course: Schema Registry 101

Hands On: Integrate Schema Registry with Clients

Danica Fine

Senior Developer Advocate (Presenter)

In this hands-on exercise, you will integrate Kafka producer and consumer clients with Schema Registry. First you will configure them to use Schema Registry, and then you will produce and consume some records. A Kafka producer and consumer are already created in the schema-registry project; you just need to configure them for Schema Registry. Note that we are going to focus on the Schema Registry-specific configurations; the other required configurations are already set up for you. Let’s start with the producer.

  1. Open the ProducerApp found in the io.confluent.developer package.

[Image: sr101-m8-01]

  1. Set the key serializer by replacing the instructional text and surrounding < > with StringSerializer.class.
  2. Set the value serializer by replacing the instructional text and surrounding < > with KafkaProtobufSerializer.class.
  3. Note that schema auto-registration is set to false since we already registered the schema manually following best practice.

[Image: sr101-m8-02]

Next is the most important configuration: it specifies where Schema Registry is located, so that the Schema Registry-aware serializer knows where to connect.

  1. To set the schema.registry.url, follow the instructional text and replace it and surrounding < > with the corresponding value contained in confluent.properties.

Since the basic.auth.credentials.source is set to USER_INFO, you need to set the basic.auth.user.info value to <SR API key>:<SR API secret> to enable the connection to Schema Registry in Confluent Cloud.

  1. To set the basic.auth.user.info, follow the instructional text and replace it and surrounding < > with the corresponding value contained in confluent.properties.
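
Putting those producer settings together, here is a minimal sketch of what the Schema Registry-related configuration looks like. It uses the plain property-key strings and placeholder endpoint and credential values; in the ProducerApp itself these values are read from confluent.properties rather than hard-coded, so treat this as an illustration, not the exercise code.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    import io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer;

    public class ProducerConfigSketch {

        // Schema Registry-related producer settings from the steps above.
        // The endpoint and credentials below are placeholders; the exercise reads
        // them from confluent.properties instead of hard-coding them.
        public static Properties schemaRegistryProducerConfig() {
            Properties props = new Properties();

            // Key and value serializers.
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaProtobufSerializer.class);

            // The schema was registered manually, so auto-registration stays disabled.
            props.put("auto.register.schemas", false);

            // Where the Schema Registry-aware serializer connects, and how it authenticates.
            props.put("schema.registry.url", "https://<Schema Registry endpoint>");
            props.put("basic.auth.credentials.source", "USER_INFO");
            props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");

            return props;
        }
    }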

Next, let’s work on the consumer starting with the key and value deserializer configuration.

  1. Open the ConsumerApp found in the io.confluent.developer package.
  2. Set the key deserializer by replacing the instructional text and surrounding < > with StringDeserializer.class.
  3. Set the value deserializer by replacing the instructional text and surrounding < > with KafkaProtobufDeserializer.class.

A consumer that uses KafkaProtobufDeserializer needs to be provided with the specific Protobuf class expected for deserialization. Notice that this has been set to Purchase.class.

Let’s now configure the consumer to connect to Schema Registry in Confluent Cloud.

  1. Set the schema.registry.url to the corresponding value contained in confluent.properties.
  2. Set the basic.auth.user.info to the corresponding value contained in confluent.properties.
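
For reference, here is a comparable sketch of the consumer side. It assumes Purchase is the Protobuf-generated class already present in the project (its import is omitted here), and, as with the producer, the endpoint and credential values are placeholders that come from confluent.properties in the actual ConsumerApp.

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;

    public class ConsumerConfigSketch {

        // Schema Registry-related consumer settings from the steps above.
        public static Properties schemaRegistryConsumerConfig() {
            Properties props = new Properties();

            // Key and value deserializers.
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaProtobufDeserializer.class);

            // Deserialize record values into the specific generated Protobuf class.
            props.put("specific.protobuf.value.type", Purchase.class);

            // Schema Registry connection and authentication (values from confluent.properties).
            props.put("schema.registry.url", "https://<Schema Registry endpoint>");
            props.put("basic.auth.credentials.source", "USER_INFO");
            props.put("basic.auth.user.info", "<SR API key>:<SR API secret>");

            return props;
        }
    }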

Now let’s run the producer and consumer. The build.gradle file includes a couple of tasks to run both of them; let’s start with the producer.

  1. Open a terminal window at the root of the schema-registry project and run command ./gradlew runProducer.

After executing the command, you will see the build run and then see the results of producing records in the console output. Once the producer finishes running, run the consumer to consume the records that were just produced.

  1. Run command ./gradlew runConsumer.

Again, you will see the build run and then you will see the records print out to the console.

  1. Enter Ctrl+C to stop the consumer application.

Now let’s integrate Schema Registry using the Confluent CLI. First you will need to log in using your Confluent Cloud user and password.

  1. Run command confluent login --save and provide your credentials when prompted.

To consume records that were serialized with a Schema Registry compatible serializer, you must provide the Confluent CLI with the Schema Registry connection and authorization settings. You also need to use the --value-format parameter to inform the CLI that the record value format is Protobuf.

  1. Run the following command, updating the parameter placeholders with the corresponding values contained in confluent.properties:

    confluent kafka topic consume proto-purchase \
        --from-beginning \
        --value-format protobuf \
        --api-key <cluster API key> \
        --api-secret <API key secret> \
        --sr-endpoint <Schema Registry url> \
        --sr-api-key <Schema Registry API key> \
        --sr-api-secret <Schema Registry API key secret>

The underlying console consumer will retrieve the schema and use it to deserialize the record.

  1. Enter Ctrl+C to stop the consumer application.

Now let’s see Schema Registry in action with ksqlDB.

  1. Return to the Confluent Cloud Console and navigate to the schema-registry-101 cluster.
  2. Click ksqlDB on the left side of the Confluent Cloud Console.

Note: For this part of the exercise you will need to create a ksqlDB cluster if you have not already done so. When prompted, select the Global access option.

  1. In the ksqlDB view, click on the ksqlDB cluster link in the Cluster name column.

  2. To create a stream named purchase_events, enter the following in the Editor view:

    CREATE STREAM purchase_events (
        id VARCHAR KEY,
        item VARCHAR,
        total_cost DOUBLE)
        WITH (KAFKA_TOPIC = 'proto-purchase',
        VALUE_FORMAT = 'PROTOBUF');

[Image: sr101-m8-03]

Note: Specifying the value format of Protobuf results in ksqlDB using the appropriate Schema Registry compatible deserializer under the covers.

  1. Set the auto.offset.reset query property to Earliest.
  2. Click on the Run query button.
  3. Scroll down to the results window to confirm successful creation of the stream.
  4. Navigate to the Streams view.
  5. Click on the PURCHASE_EVENTS stream you just created.
  6. Click on the Query stream button below the stream details.
  7. Scroll down and observe the query results.

This concludes the exercise.

