Course: Building Data Pipelines with Apache Kafka® and Confluent

Hands On: Streaming Data from Kafka to Elasticsearch with Kafka Connect


Tim Berglund

VP Developer Relations

Robin Moffatt

Principal Developer Advocate (Author)


This exercise is the culmination of a pipeline project that takes streams of ratings events, filters them, and enriches them with customer information streamed from a database.

Data Lineage

For the final step, we will stream the enriched data to Elasticsearch, where it can be used to build a dashboard. You need an Elasticsearch instance, created as described in the first exercise, and it must be accessible from the internet.
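Before you configure the connector, it’s worth confirming that the instance is reachable. Here’s an optional sketch using curl; the ES_ENDPOINT, ES_USER, and ES_PW environment variables are assumed placeholders for your own connection details (they are reused in the checks later in this exercise):

    # Assumed placeholders: set these to match your Elasticsearch deployment
    export ES_ENDPOINT="https://es-host:port"
    export ES_USER="elastic"
    export ES_PW="<your password>"

    # A JSON response with the cluster name and version confirms connectivity
    curl -u $ES_USER:$ES_PW $ES_ENDPOINT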

  1. In Confluent Cloud, click on the Connectors link, click Add connector, and search for the "Elasticsearch Service Sink" connector.

    Elasticsearch sink connector in Confluent Cloud

    Click on the tile to create the sink connector.

  2. Configure the connector as follows. You can leave blank any options that are not specified below.

    Which topics do you want to get data from?

      topics: ratings-enriched

    Input messages

      Input message format: AVRO

    Kafka Cluster credentials

      Kafka API Key and Kafka API Secret: use the same API details as you created for the Datagen connector previously. You can create a new API key if necessary, but the number of API keys you can create is limited, so for the purposes of this exercise it’s best to re-use the existing one if you can.

    How should we connect to your Elasticsearch Service?

      Connection URI, Connection username, and Connection password: these values depend on where your Elasticsearch instance is and how you have configured it. Elasticsearch needs to be open to inbound connections from the internet.

    Data Conversion

      Type name: _doc
      Key ignore: true
      Schema ignore: true

    Connection Details

      Batch size: 5 (this setting is suitable only for this exercise; in practice you would leave it at the default or set it much higher for performance reasons)

    Number of tasks for this connector

      Tasks: 1

    Click Next to test the connection and validate the configuration.

  3. On the next screen, the JSON configuration should be similar to that shown below. If it is not, return to the previous screen to amend it as needed.

    {
      "name": "ElasticsearchSinkConnector_0",
      "config": {
        "topics": "ratings-enriched",
        "input.data.format": "AVRO",
        "connector.class": "ElasticsearchSink",
        "name": "ElasticsearchSinkConnector_0",
        "kafka.api.key": "****************",
        "kafka.api.secret": "***************************",
        "connection.url": "https://es-host:port",
        "connection.username": "elastic",
        "connection.password": "************************",
        "type.name": "_doc",
        "key.ignore": "true",
        "schema.ignore": "true",
        "batch.size": "5",
        "tasks.max": "1"
      }
    }
    

    Click Launch.
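    Alternatively, if you prefer the command line, recent versions of the Confluent CLI can create a connector from a configuration file. This is a sketch only; the exact command and flags depend on your CLI version, and elasticsearch-sink.json is assumed to contain the JSON shown above:

    # Assumes you are logged in and have selected the right environment
    # and cluster; check `confluent connect --help` for your version's syntax
    confluent connect cluster create --config-file elasticsearch-sink.json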

  4. After a few moments, the connector will be provisioned and shortly thereafter you should see that it is "Running" (alongside the existing connectors that you created in previous exercises):

    All three connectors running

  5. In Elasticsearch, check that data has been received in the index. You can do this using the REST API or with Kibana itself. Here’s an example using curl to do it:

    curl -u $ES_USER:$ES_PW $ES_ENDPOINT/_cat/indices/ratings\*\?v=true

You should see a response from Elasticsearch that looks like this:

    health status index            uuid                   pri rep docs.count […]
    green  open   ratings-enriched Wj-o_hEwR8ekHSF7M7aVug   1   1     101091 […]
    

    Note that the docs.count value should be above zero.
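    To inspect the documents themselves rather than just the count, you can query the index directly. Here’s an optional sketch using the standard Elasticsearch search API:

    # Fetch a single document from the index to check the enriched fields
    curl -u $ES_USER:$ES_PW "$ES_ENDPOINT/ratings-enriched/_search?size=1"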

  6. You can now use the data. In our example, we’re streaming it to Elasticsearch in order to build an operational dashboard using Kibana. The following assumes that you are familiar with the use of Kibana.

    • In Kibana, create an index pattern for the ratings-enriched index, with RATING_TS as the time field.

      Creating a Kibana index pattern

    • Use the Discover view to explore the data and its characteristics.

      Kibana Discover view

    • Create visualizations to build a dashboard showing relevant details in the data.

      Kibana Dashboard
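    If you prefer to script the index pattern from the first bullet rather than create it in the Kibana UI, newer Kibana versions (7.11 and later) expose an index patterns REST API. This is a sketch only; the endpoint and payload can vary by Kibana version, and KIBANA_ENDPOINT is an assumed placeholder for your Kibana URL:

    # Create an index pattern for ratings-enriched with RATING_TS as the time field
    curl -u $ES_USER:$ES_PW -X POST "$KIBANA_ENDPOINT/api/index_patterns/index_pattern" \
      -H 'kbn-xsrf: true' -H 'Content-Type: application/json' \
      -d '{"index_pattern": {"title": "ratings-enriched", "timeFieldName": "RATING_TS"}}'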

Note

Make sure that when you have finished the exercises in this course you use the Confluent Cloud UI or CLI to destroy all the resources you created. Verify they are destroyed to avoid unexpected charges.
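For example, here’s a sketch of connector cleanup with the Confluent CLI (the exact commands depend on your CLI version, and the connector ID is a placeholder you would look up first):

    # List the connectors in the cluster, then delete each one by its ID
    confluent connect cluster list
    confluent connect cluster delete <connector-id>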
