Senior Developer Advocate (Presenter)
In this hands-on exercise you will integrate Kafka producer and consumer clients with Schema Registry. First you will configure them to use Schema Registry, and then you will produce and consume some records. A Kafka producer and a consumer have already been created in the schema-registry project; you just need to configure them for Schema Registry. Note that we are going to focus on the Schema Registry-specific configurations; the other required configurations are already set up for you. Let’s start with the producer.
Next is the most important config: the one specifying where Schema Registry is located, so that the Schema Registry-aware serializer knows where to connect.
Since basic.auth.credentials.source is set to USER_INFO, you need to set the basic.auth.user.info value to <SR API key>:<SR API secret> to enable the connection to Schema Registry in Confluent Cloud.
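As a rough sketch, the relevant producer properties might look like the following. The property names are those used by Confluent's KafkaProtobufSerializer; the String key serializer is an assumption, and the endpoint and credential values are placeholders you would copy from confluent.properties.

// Producer configuration sketch (placeholder values)
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // assumed String keys
props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
// Tell the Schema Registry-aware serializer where Schema Registry lives:
props.put("schema.registry.url", "<Schema Registry URL>");
// Authenticate to Schema Registry in Confluent Cloud:
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");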
Next, let’s work on the consumer, starting with the key and value deserializer configuration.
A consumer that uses KafkaProtobufDeserializer needs to be provided with the specific Protobuf class expected for deserialization. Notice that this has been set to Purchase.class.
Let’s now configure the consumer to connect to Schema Registry in Confluent Cloud.
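The consumer side mirrors the producer. As a rough sketch (again with placeholder values, and a String key deserializer assumed):

// Consumer configuration sketch (placeholder values)
Properties props = new Properties();
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // assumed String keys
props.put("value.deserializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer");
// The specific Protobuf class the deserializer should return:
props.put("specific.protobuf.value.type", Purchase.class);
// Schema Registry connection and credentials, same as the producer:
props.put("schema.registry.url", "<Schema Registry URL>");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");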
Now let’s run the producer and consumer. The build.gradle file includes a couple of tasks to run both of them; let’s start with the producer.
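For example, if the producer task is named runProducer (a hypothetical name; check build.gradle for the actual task names), the command would look like this:

./gradlew runProducer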
After executing the command, you will see the build run and then see the results of producing records in the console output. Once the producer finishes running, run the consumer to consume the records that were just produced.
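Again assuming a hypothetical task name of runConsumer:

./gradlew runConsumer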
Again, you will see the build run and then you will see the records print out to the console.
Now let’s integrate Schema Registry using the Confluent CLI. First you will need to log in using your Confluent Cloud username and password.
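For example:

confluent login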
To consume records that were serialized with a Schema Registry-compatible serializer, you must provide the Confluent CLI with the Schema Registry connection and authorization settings. You also need to use the --value-format parameter to inform the CLI that the record value format is Protobuf.
Run the following command (replacing the placeholder values with the corresponding values contained in confluent.properties):
confluent kafka topic consume proto-purchase \
--from-beginning \
--value-format protobuf \
--api-key <cluster API key> \
--api-secret <API key secret> \
--sr-endpoint <Schema Registry url> \
--sr-api-key <Schema Registry API key> \
--sr-api-secret <Schema Registry API key secret>
Now let’s see Schema Registry in action with ksqlDB.
Note: For this part of the exercise you will need to create a ksqlDB cluster if you have not already done so. When prompted, select the Global access option.
In the ksqlDB view, click on the ksqlDB cluster link in the Cluster name column.
To create a stream named purchase_events, enter the following in the Editor view:
CREATE STREAM purchase_events (
    id VARCHAR KEY,
    item VARCHAR,
    total_cost DOUBLE)
WITH (KAFKA_TOPIC = 'proto-purchase',
      VALUE_FORMAT = 'PROTOBUF');
Note: Specifying a value format of PROTOBUF results in ksqlDB using the appropriate Schema Registry-compatible deserializer under the covers.
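As a quick check that the Protobuf values are being deserialized, you could also derive a filtered stream from purchase_events. The stream name and filter condition below are just illustrative:

CREATE STREAM large_purchases AS
    SELECT *
    FROM purchase_events
    WHERE total_cost > 100
    EMIT CHANGES;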
This concludes the exercise.
Hi, I'm Danica Fine. Welcome back to another hands-on Schema Registry exercise. Let's go through the process of integrating the Confluent Schema Registry with Java producer and consumer clients, as well as with ksqlDB. With Java, the first step is to configure the clients to use the correct key and value serializers. You'll also want to configure the properties the Java clients will use to establish a connection with the Confluent Cloud Schema Registry. Once these steps are completed, you can run the Java producer and consumer and verify that records are successfully produced to the Kafka topic in Confluent Cloud and consumed from that topic. In the final part of the exercise, you'll also run a KSQL query that creates a filtered stream of events from the Protobuf serialized topic. Once you're done, join me in the next module.