This exercise is the culmination of a pipeline project that takes a stream of ratings events, filters it, and enriches it with customer information streamed from a database.
For the final step, we will stream the enriched data out to Elasticsearch, where it can be used to build a dashboard. You need an Elasticsearch instance created as described in the first exercise, and it must be accessible from the internet.
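Before configuring the connector, it is worth confirming that your Elasticsearch endpoint is actually reachable. A minimal sketch using curl, assuming the `$ES_USER`, `$ES_PW`, and `$ES_ENDPOINT` environment variables hold the credentials and endpoint URL used later in this exercise:

```bash
# A successful response returns the cluster name and version details;
# a timeout or connection error means the instance is not reachable
# from your network.
curl -u $ES_USER:$ES_PW $ES_ENDPOINT
```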
In Confluent Cloud, click on the Connectors link, click Add connector, and search for the "Elasticsearch Service Sink" connector.
Click on the tile to create the sink connector.
Configure the connector as follows. You can leave blank any options that are not specified below.
| Setting | Value |
|---------|-------|
| **Which topics do you want to get data from?** | |
| topics | `ratings-enriched` |
| **Input messages** | |
| Input message format | AVRO |
| **Kafka Cluster credentials** | |
| Kafka API Key | Use the same API details as you created for the Datagen connector previously. You can create a new API key if necessary, but the number of API keys per account is limited, so for the purposes of this exercise it's best to re-use one if you can. |
| Kafka API Secret | |
| **How should we connect to your Elasticsearch Service?** | |
| Connection URI | These values will depend on where your Elasticsearch instance is and how you have configured it. Elasticsearch needs to be open to inbound connections from the internet. |
| Connection username | |
| Connection password | |
| **Data Conversion** | |
| Type name | `_doc` |
| Key ignore | `true` |
| Schema ignore | `true` |
| **Connection Details** | |
| Batch size | 5 |
| **Number of tasks for this connector** | |
| Tasks | 1 |
Click Next to test the connection and validate the configuration.
On the next screen, the JSON configuration should be similar to that shown below. If it is not, return to the previous screen to amend it as needed.
{ "name": "ElasticsearchSinkConnector_0", "config": { "topics": "ratings-enriched", "input.data.format": "AVRO", "connector.class": "ElasticsearchSink", "name": "ElasticsearchSinkConnector_0", "kafka.api.key": "****************", "kafka.api.secret": "***************************", "connection.url": "https://es-host:port", "connection.username": "elastic", "connection.password": "************************", "type.name": "_doc", "key.ignore": "true", "schema.ignore": "true", "batch.size": "5", "tasks.max": "1" } }
Click Launch.
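If you prefer working from the command line, the same JSON can be used to create the connector with the Confluent CLI. A sketch, assuming you have saved the configuration above to a file; the cluster and environment IDs shown are placeholders for your own:

```bash
# Save the JSON configuration above to elasticsearch-sink.json, then create
# the connector (lkc-abc123 and env-xyz789 are hypothetical IDs; use your own)
confluent connect cluster create \
  --config-file elasticsearch-sink.json \
  --cluster lkc-abc123 \
  --environment env-xyz789
```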
After a few moments, the connector will be provisioned, and shortly thereafter you should see that it is "Running" (alongside the existing connectors that you created in previous exercises).
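You can also check the connector's status from the Confluent CLI. A sketch; the exact output columns may vary between CLI versions:

```bash
# List connectors in the cluster and confirm the Elasticsearch sink is RUNNING
confluent connect cluster list
```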
In Elasticsearch, check that data has been received in the index. You can do this using the REST API or with Kibana itself. Here's an example using curl:

```bash
curl -u $ES_USER:$ES_PW $ES_ENDPOINT/_cat/indices/ratings\*\?v=true
```
You should see a response from Elasticsearch that looks like this:

```
health status index            uuid                   pri rep docs.count […]
green  open   ratings-enriched Wj-o_hEwR8ekHSF7M7aVug 1   1   101091     […]
```
Note that the `docs.count` value should be above zero.
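To inspect the documents themselves, you can query the index directly with the standard Elasticsearch search API. A sketch, using the same environment variables as above:

```bash
# Fetch a single document from the index to verify the enriched fields
curl -u $ES_USER:$ES_PW "$ES_ENDPOINT/ratings-enriched/_search?size=1&pretty"
```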
You can now use the data. In our example, we're streaming it to Elasticsearch so that we can build an operational dashboard using Kibana. The following assumes that you are familiar with the use of Kibana.

- In Kibana, create an index pattern for the `ratings-enriched` index, with `RATING_TS` as the time field (a scripted alternative is sketched after this list).
- Use the Discover view to explore the data and its characteristics.
- Create visualizations to build a dashboard showing relevant details in the data.
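If you would rather script the index pattern creation than click through the Kibana UI, recent Kibana versions expose an index pattern HTTP API. A sketch, assuming Kibana 7.12 or later, and that the hypothetical `$KIBANA_ENDPOINT` variable points at your Kibana instance and accepts the same credentials; the exact API path varies between Kibana versions:

```bash
# Create an index pattern for ratings-enriched with RATING_TS as the time field
# ($KIBANA_ENDPOINT is a placeholder for your Kibana URL)
curl -u $ES_USER:$ES_PW -X POST "$KIBANA_ENDPOINT/api/index_patterns/index_pattern" \
  -H 'kbn-xsrf: true' -H 'Content-Type: application/json' \
  -d '{"index_pattern": {"title": "ratings-enriched", "timeFieldName": "RATING_TS"}}'
```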
When you have finished the exercises in this course, make sure you use the Confluent Cloud UI or CLI to destroy all the resources you created, and verify that they are destroyed, to avoid unexpected charges.
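A sketch of the teardown using the Confluent CLI; the connector and cluster IDs below are placeholders for your own:

```bash
# List and delete the connectors you created
confluent connect cluster list
confluent connect cluster delete lcc-abc123

# Deleting the Kafka cluster itself removes its topics and remaining resources
confluent kafka cluster delete lkc-abc123
```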