In this hands-on exercise you will integrate Kafka producer and consumer clients with Schema Registry. First you will configure them to use Schema Registry, and then you will produce and consume some records. A Kafka producer and consumer have already been created in the schema-registry project; you just need to configure them for Schema Registry. Note that we will focus on the Schema Registry-specific configurations; the other required configurations have already been set up for you. Let’s start with the producer.
Open the ProducerApp class, found in the schema-registry project, and locate its serializer-related configuration. Notice that auto.register.schemas is set to false since we already registered the schema manually, following best practice.
Next is the most important config: schema.registry.url specifies where Schema Registry is located, so that the Schema Registry-aware serializer knows where to connect. Locate schema.registry.url, follow the instructional text, and replace it and the surrounding < > with the corresponding value for your Confluent Cloud environment.
When basic.auth.credentials.source is set to USER_INFO, you need to set the basic.auth.user.info value to <SR API key>:<SR API secret> to enable the connection to Schema Registry in Confluent Cloud.
Locate basic.auth.user.info, follow the instructional text, and replace it and the surrounding < > with the corresponding value for your Confluent Cloud environment.
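Putting these producer settings together, here is a minimal sketch of the Schema Registry-related configuration. The property keys are the standard Confluent serializer configs named above; the class name ProducerSchemaRegistryConfig and the angle-bracket placeholders are illustrative assumptions, not the exercise’s exact code.

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

public class ProducerSchemaRegistryConfig {
    public static Properties build() {
        Properties props = new Properties();
        // Standard serializer settings; the value serializer is the
        // Schema Registry-aware Protobuf serializer.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
        // Where the serializer finds Schema Registry.
        props.put("schema.registry.url", "<Schema Registry URL>");
        // Authenticate to Schema Registry in Confluent Cloud with an SR API key.
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");
        // The schema was registered manually, so disable auto-registration.
        props.put("auto.register.schemas", false);
        return props;
    }
}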
Next, let’s work on the consumer, starting with the key and value deserializer configuration.
Open the ConsumerApp class, found in the schema-registry project. A consumer that uses KafkaProtobufDeserializer needs to be provided with the specific Protobuf class expected during deserialization. Notice that this has already been set for you to the generated Protobuf class for the exercise’s purchase records.
Let’s now configure the consumer to connect to Schema Registry in Confluent Cloud.
As you did for the producer, set schema.registry.url and basic.auth.user.info to the corresponding values for your Confluent Cloud environment.
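As a sketch, the consumer side mirrors the producer’s Schema Registry settings and adds the specific Protobuf type config. The fully qualified class name io.confluent.demo.Purchase is a hypothetical stand-in for the generated class in your project.

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializerConfig;

public class ConsumerSchemaRegistryConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
        // The generated Protobuf class to deserialize record values into.
        // "io.confluent.demo.Purchase" is a hypothetical name; use your project's class.
        props.put(KafkaProtobufDeserializerConfig.SPECIFIC_PROTOBUF_VALUE_TYPE,
                "io.confluent.demo.Purchase");
        // Same Schema Registry connection settings as the producer.
        props.put("schema.registry.url", "<Schema Registry URL>");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");
        return props;
    }
}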
Now let’s run the producer and consumer. The
build.gradle file includes a couple of tasks to run both of them; let’s start with the producer.
In a terminal window, navigate to the schema-registry project and run the producer task defined in build.gradle.
After executing the command, you will see the build run and then see the results of producing records in the console output. Once the producer finishes running, run the consumer to consume the records that were just produced.
Again, you will see the build run and then you will see the records print out to the console.
Now let’s integrate Schema Registry using the Confluent CLI. First you will need to log in using your Confluent Cloud username and password.
Run confluent login --save and provide your credentials when prompted.
To consume records that were serialized with a Schema Registry-compatible serializer, you must provide the Confluent CLI with the Schema Registry connection and authorization settings. You also need to use the
--value-format parameter to inform the CLI that the record value format is Protobuf.
Run the following command, updating the parameter values with the corresponding values for your environment:
confluent kafka topic consume proto-purchase \
  --from-beginning \
  --value-format protobuf \
  --api-key <cluster API key> \
  --api-secret <API key secret> \
  --sr-endpoint <Schema Registry URL> \
  --sr-api-key <Schema Registry API key> \
  --sr-api-secret <Schema Registry API key secret>
Now let’s see Schema Registry in action with ksqlDB.
Note: For this part of the exercise you will need to create a ksqlDB cluster if you have not already done so. When prompted, select the Global access option.
In the ksqlDB view, click on the ksqlDB cluster link in the Cluster name column.
To create a stream named
purchase_events, enter the following in the Editor view:
CREATE STREAM purchase_events (
  id VARCHAR KEY,
  item VARCHAR,
  total_cost DOUBLE
) WITH (
  KAFKA_TOPIC = 'proto-purchase',
  VALUE_FORMAT = 'PROTOBUF'
);
Note: Specifying the value format of Protobuf results in ksqlDB using the appropriate Schema Registry compatible deserializer under the covers.
Set the auto.offset.reset query property to Earliest and run a query against the stream to see the previously produced records.
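For example, a query along these lines will emit the purchase records produced earlier. It is shown in ksqlDB statement form; in the Confluent Cloud editor the auto.offset.reset property is set via the query properties control rather than a SET statement, which applies to the ksqlDB CLI.

SET 'auto.offset.reset' = 'earliest';

SELECT id, item, total_cost
FROM purchase_events
EMIT CHANGES;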
This concludes the exercise.