In this hands-on exercise you will integrate Kafka producer and consumer clients with Schema Registry. First you will configure them to use Schema Registry, and then you will produce and consume some records. A Kafka producer and consumer are already created in the schema-registry project; you just need to configure them for Schema Registry. Note that we are going to focus on the Schema Registry-specific configurations; the other required configurations are already set up for you. Let’s start with the producer.
Open the ProducerApp class, found in the io.confluent.developer package, and review its Schema Registry-related configuration:

The key serializer is set to StringSerializer.class.
The value serializer is set to KafkaProtobufSerializer.class.
auto.register.schemas is set to false, since we already registered the schema manually, following best practice.

Next is the most important config: schema.registry.url specifies where Schema Registry is located so that the Schema Registry-aware serializer knows where to connect to it.
Find schema.registry.url, follow the instructional text, and replace the placeholder and its surrounding < > with the corresponding value contained in confluent.properties.
Since basic.auth.credentials.source is set to USER_INFO, you need to set the basic.auth.user.info value to <SR API key>:<SR API secret> to enable the connection to Schema Registry in Confluent Cloud.
Find basic.auth.user.info, follow the instructional text, and replace the placeholder and its surrounding < > with the corresponding value contained in confluent.properties.
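Putting those settings together, the Schema Registry-related portion of the producer configuration looks roughly like the sketch below. This is a minimal illustration rather than the exercise’s exact source; the endpoint URL and credential strings are placeholders standing in for the real values in confluent.properties.

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

Properties props = new Properties();
// Keys are plain strings; values are serialized as Protobuf by the
// Schema Registry-aware serializer.
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);
// Disable auto-registration since the schema was registered manually.
props.put("auto.register.schemas", false);
// Schema Registry connection settings -- placeholder values; copy the
// real ones from confluent.properties.
props.put("schema.registry.url", "https://psrc-xxxxx.us-east-2.aws.confluent.cloud");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");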
Next, let’s work on the consumer, starting with the key and value deserializer configuration.
Open the ConsumerApp class, found in the io.confluent.developer package, and review its configuration:

The key deserializer is set to StringDeserializer.class.
The value deserializer is set to KafkaProtobufDeserializer.class.
A consumer that uses KafkaProtobufDeserializer needs to be provided with the specific Protobuf class expected for deserialization. Notice that this has been set to Purchase.class.
Let’s now configure the consumer to connect to Schema Registry in Confluent Cloud:

Set schema.registry.url to the corresponding value contained in confluent.properties.
Set basic.auth.user.info to the corresponding value contained in confluent.properties.
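For reference, the consumer side looks roughly like this, with the same caveats: the endpoint and credentials are placeholders for the values in confluent.properties, and Purchase is the project’s generated Protobuf class.

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);
// Return the generated Purchase class instead of a generic DynamicMessage.
props.put("specific.protobuf.value.type", Purchase.class);
// Schema Registry connection -- same placeholder convention as the producer.
props.put("schema.registry.url", "https://psrc-xxxxx.us-east-2.aws.confluent.cloud");
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");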
Now let’s run the producer and consumer. The build.gradle file includes a couple of tasks to run both of them; let’s start with the producer.
Open a terminal in the schema-registry project and run the command ./gradlew runProducer. You will see the build run and then the results of producing records in the console output. Once the producer finishes running, run the consumer to consume the records that were just produced.
Run the command ./gradlew runConsumer. Again, you will see the build run, and then you will see the records printed to the console.
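Note that nothing about the produce call itself is Schema Registry-specific; the integration lives entirely in the serializer configuration. Here is a rough sketch of what the producer does, assuming the props from the earlier sketch are in scope; the field names are illustrative (setItem and setTotalCost mirror the item and total_cost columns you will see in the ksqlDB step) and may not match the project’s generated code exactly.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

KafkaProducer<String, Purchase> producer = new KafkaProducer<>(props);
// Build a Purchase via its generated Protobuf builder (field names illustrative).
Purchase purchase = Purchase.newBuilder()
        .setItem("apple")
        .setTotalCost(1.99)
        .build();
// The serializer looks up the schema in Schema Registry and embeds its ID
// in the serialized record.
producer.send(new ProducerRecord<>("proto-purchase", "customer-123", purchase));
producer.flush();
producer.close();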
Now let’s integrate Schema Registry using the Confluent CLI. First you will need to log in using your Confluent Cloud username and password: run the command confluent login --save and provide your credentials when prompted.

To consume records that were serialized with a Schema Registry-compatible serializer, you must provide the Confluent CLI with the Schema Registry connection and authorization settings. You also need to use the --value-format parameter to inform the CLI that the record value format is Protobuf.
Run the following command, updating the parameter values with the values contained in confluent.properties:
confluent kafka topic consume proto-purchase \
--from-beginning \
--value-format protobuf \
--api-key <cluster API key> \
--api-secret <API key secret> \
--sr-endpoint <Schema Registry url> \
--sr-api-key <Schema Registry API key> \
--sr-api-secret <Schema Registry API key secret>
Now let’s see Schema Registry in action with ksqlDB. In the Confluent Cloud console, navigate to the schema-registry-101 cluster.

Note: For this part of the exercise you will need to create a ksqlDB cluster if you have not already done so. When prompted, select the Global access option.
In the ksqlDB view, click on the ksqlDB cluster link in the Cluster name column.
To create a stream named purchase_events, enter the following in the Editor view:
CREATE STREAM purchase_events (
id VARCHAR KEY,
item VARCHAR,
total_cost DOUBLE)
WITH (KAFKA_TOPIC = 'proto-purchase',
VALUE_FORMAT = 'PROTOBUF');
Note: Specifying the value format of Protobuf results in ksqlDB using the appropriate Schema Registry-compatible deserializer under the covers.
Set the auto.offset.reset query property to Earliest so that queries read the proto-purchase topic from the beginning.

This concludes the exercise.