
Integrating Schema Registry with Client Applications

6 min
Danica Fine

Senior Developer Advocate (Presenter)

In this module, you will put what you have learned so far into practice by working with client applications.

Schema Registry integrates with several client types. First you will see how to use the Confluent CLI and the console Kafka producer and Kafka consumer clients that ship with Schema Registry. From there, you will learn about KafkaProducer and KafkaConsumer clients, Kafka Streams, and ksqlDB.

This module will wrap up with a discussion on how to use the Gradle and Maven plugins to test your schema.

Confluent CLI Producer

sr101-m7-01

Here is an example that uses the Confluent CLI to produce to a topic using the schema associated with the topic. A few things to note:

  • The value-format of Protobuf is provided because the command would otherwise assume the default format, Avro
  • You provide the path to the schema definition using the --schema parameter
  • To produce a record from the command line you provide the record data in JSON format

In general, using the Confluent CLI is a great way to quickly test and prototype.

Confluent CLI Consumer

sr101-m7-02

In this example, the Confluent CLI is used to consume records from a topic. Because the records were serialized using the Protobuf format, you need to provide the associated value format, which tells the consumer which deserializer to use for the record value. Just like producing from the CLI, consuming from the CLI is a fantastic way to quickly test and prototype.

Console Producer

sr101-m7-03

The Schema Registry project located on GitHub ships with a console producer and consumer for each of the supported serialization types: Protobuf, Avro, and JSON Schema. Here is an example using the Protobuf console producer. One difference between the console producer and the Confluent CLI produce action is how you provide the schema file. With the console producer, you provide your schema as part of the command. With the CLI, you provide the path to a file that contains your schema. In both cases you provide the schema in JSON format regardless of the serialization format you use.

You can also use the console producer and consumer with Confluent Cloud. To do so, you must supply the Schema Registry api-key and api-secret configs.

Console Consumer

sr101-m7-04

The console consumer completes the picture for working from the command line.

When choosing between the console producer or consumer and the Confluent CLI, keep in mind that the CLI can be installed quickly from the internet, while the console producer and consumer require checking out the Schema Registry project from GitHub. At the time of this writing, the CLI doesn’t offer everything the console producer and consumer implementations offer.

Common Client Configuration Settings

sr101-m7-05

Now let’s look at integrating Schema Registry with the KafkaProducer and KafkaConsumer clients. Before diving into the specifics, there are some common configuration settings you will always provide whether you are using a producer or a consumer.

The first is the location of the Schema Registry instance, which you provide via the schema.registry.url setting. In this example, the URL points to a Schema Registry instance in Confluent Cloud. The next two settings in this example are required if you are connecting to a secured Schema Registry instance. The basic.auth credentials shown here are what you would use when connecting to Schema Registry in Confluent Cloud.
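
As a rough sketch, here is how those common settings might be gathered in Java. The class name, endpoint URL, and credential placeholders are illustrative, not taken from the course code.

    import java.util.Properties;

    public class SchemaRegistryConfig {

        // Common Schema Registry settings shared by producers and consumers.
        // Replace the placeholder URL and credentials with your own values.
        public static Properties baseProps() {
            Properties props = new Properties();
            props.put("schema.registry.url", "https://<your-sr-endpoint>.confluent.cloud");

            // Needed only when connecting to a secured Schema Registry, such as Confluent Cloud
            props.put("basic.auth.credentials.source", "USER_INFO");
            props.put("basic.auth.user.info", "<SR_API_KEY>:<SR_API_SECRET>");
            return props;
        }
    }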

KafkaProducer

sr101-m7-06

Integrating Schema Registry with a KafkaProducer is simply a matter of using a Schema Registry-aware serializer. Once you provide the Schema Registry URL shown on the previous slide, the serializer handles everything else: it retrieves the schema and serializes the record into the correct format. In fact, the producer and consumer clients are unaware of Schema Registry; they simply call the correct interface method, in this case serializer.serialize, and the serializer instance handles the rest.
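
To make this concrete, here is a rough sketch of a producer configured with a Schema Registry-aware Avro serializer. The purchases topic and the generated Purchase class are hypothetical stand-ins for your own schema, and the bootstrap servers and Schema Registry URL are placeholders.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import io.confluent.kafka.serializers.KafkaAvroSerializer;

    public class ProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-servers>");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            // The Schema Registry-aware serializer handles schema lookup and Avro serialization
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
            props.put("schema.registry.url", "https://<your-sr-endpoint>.confluent.cloud");

            // Purchase is a hypothetical class generated from an Avro schema
            Purchase purchase = new Purchase("item-1", 9.99);

            try (KafkaProducer<String, Purchase> producer = new KafkaProducer<>(props)) {
                // send() triggers serializer.serialize under the hood
                producer.send(new ProducerRecord<>("purchases", purchase.getItem(), purchase));
            }
        }
    }

Note that only the value serializer is Schema Registry-aware here; the record key is a plain string.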

Now let’s take a look at the life cycle of a schema with the Kafka clients.

Schema Lifecycle

sr101-m7-07

While it is true that a Schema Registry-aware serializer will retrieve the schema from Schema Registry, it does so only when the schema is not already stored in its local cache. This diagram shows how the client interacts with Schema Registry from the perspective of the producer.

  1. The producer attempts to serialize a record and calls serializer.serialize.
  2. This triggers the serializer to check its local cache for the schema file, but it doesn’t find it.
  3. The serializer then queries Schema Registry and retrieves the schema and ID and stores it in its local cache.
  4. Now the serializer serializes the record and stores the ID as part of the byte payload sent to Kafka.

When the consumer attempts to deserialize the record, it extracts the schema ID from the bytes that it receives from Kafka and looks in its cache for the schema. If it’s not there it follows the same retrieval process. The most important thing to note here is that unless the schema ID changes due to a schema change, the clients only connect with Schema Registry once to retrieve the schema file.
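
Under the hood, the serializers and deserializers rely on a caching Schema Registry client to perform that one-time lookup. The sketch below is not what the serializer literally runs, but it illustrates the kind of request made the first time a schema is needed; the purchases-value subject name and the URL are hypothetical.

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

    public class SchemaLookupExample {
        public static void main(String[] args) throws Exception {
            // CachedSchemaRegistryClient keeps schemas it has fetched in a local cache;
            // the second argument is the cache capacity
            SchemaRegistryClient client =
                new CachedSchemaRegistryClient("https://<your-sr-endpoint>.confluent.cloud", 100);

            // Retrieve the latest schema registered under the topic's value subject
            SchemaMetadata metadata = client.getLatestSchemaMetadata("purchases-value");
            System.out.println("schema id: " + metadata.getId());
            System.out.println("schema: " + metadata.getSchema());
        }
    }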

Now let’s move on to how you would integrate Schema Registry with a KafkaConsumer.

KafkaConsumer

sr101-m7-08

Integrating Schema Registry with a Kafka consumer is much the same as with a producer, except that you provide a deserializer instead. With the consumer, however, there are a couple of extra configurations you need to provide.

sr101-m7-09

All of the serializers supported by Schema Registry generate objects from the schemas. You saw this in a previous module and did an exercise where you generated domain objects from a schema. Part of the object hierarchy includes a generic type that contains common methods that all generated objects will share. The specific object type generated contains the methods for retrieving the specific fields contained in the schema.

Because of this object structure, you need to tell the deserializer whether it should return the specific object or the generic object type; a sample consumer configuration follows the list below.

  • For Avro, you would set the specific.avro.reader configuration to true
  • For Protobuf you provide the specific class name
  • For JSON Schema you provide the class name
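
Continuing the hypothetical Avro example from the producer sketch, a consumer that returns the generated Purchase type might be configured like this; the topic, group ID, and class names are illustrative.

    import java.time.Duration;
    import java.util.List;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializer;

    public class ConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<bootstrap-servers>");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "purchases-consumer");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            // The Schema Registry-aware deserializer looks up the schema by the ID in each record
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
            props.put("schema.registry.url", "https://<your-sr-endpoint>.confluent.cloud");
            // Return the generated Purchase type instead of GenericRecord
            props.put("specific.avro.reader", true);

            try (KafkaConsumer<String, Purchase> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(List.of("purchases"));
                ConsumerRecords<String, Purchase> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(record -> System.out.println(record.value()));
            }
        }
    }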

sr101-m7-10

Generated objects from Avro extend both the SpecificRecord and GenericRecord types.

For Protobuf, the specific type is Message and the generic type is DynamicMessage.

The JSON Schema deserializer can return an instance of a specific Java class, or an instance of JsonNode.

For the most part, you will want to use the specific object types, since it’s much easier to access the fields. When building an application, after you’ve generated the domain objects you can use the code completion of your IDE to discover the field names to use.

In some cases you might find that you only know the topic contains Avro or Protobuf serialized records, but you don’t know the exact type. In those cases, you want to use the Generic types.
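
As a quick illustration of the difference, here is a rough sketch comparing field access through a specific record and a generic one; the Purchase class and its item field are hypothetical.

    import org.apache.avro.generic.GenericRecord;

    public class RecordAccessExample {

        // Specific: typed accessors generated from the schema (Purchase is hypothetical)
        static String itemFromSpecific(Purchase purchase) {
            return purchase.getItem();
        }

        // Generic: fields are looked up by name and returned as Object
        static String itemFromGeneric(GenericRecord record) {
            return record.get("item").toString();
        }
    }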

ksqlDB

sr101-m7-11

With ksqlDB it’s even easier. You simply set VALUE_FORMAT to AVRO, PROTOBUF, or JSON_SR, and ksqlDB automatically retrieves (reads) and registers (writes) schemas as needed. This spares you from defining columns and data types manually in CREATE statements and from manual interaction with Schema Registry. Before using schema inference in ksqlDB, make sure that Schema Registry is up and running and ksqlDB is configured to use it.
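
For example, using the ksqlDB Java client you can declare a stream over a topic without listing any columns and let ksqlDB pull the column definitions from the registered schema. This is a loose sketch; the host, topic, and stream names are hypothetical.

    import io.confluent.ksql.api.client.Client;
    import io.confluent.ksql.api.client.ClientOptions;

    public class SchemaInferenceExample {
        public static void main(String[] args) throws Exception {
            ClientOptions options = ClientOptions.create()
                .setHost("<ksqldb-host>")
                .setPort(8088);
            Client client = Client.create(options);

            // No columns are declared; ksqlDB infers them from the Protobuf value schema
            // registered in Schema Registry for the (hypothetical) purchases topic
            String sql =
                "CREATE STREAM purchases WITH (KAFKA_TOPIC='purchases', VALUE_FORMAT='PROTOBUF');";
            client.executeStatement(sql).get();

            client.close();
        }
    }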


Integrating Schema Registry with Client Applications

Hi, I'm Danica Fine. In this Schema Registry module, let's put what we've learned so far into practice by working with client applications. Schema Registry integrates with several client types. We'll start by showing how to use the Confluent CLI and the console producer and consumer clients that ship with Schema Registry. From there, we'll move on to the KafkaProducer and KafkaConsumer clients, Kafka Streams, and ksqlDB.

Here's an example using the Confluent CLI to produce events to a topic using the schema associated with the topic. A few things to note. First, notice how we provide the value-format of "protobuf", since otherwise the command would expect the schema to be using the default value of "avro". Also note that you provide the path to the schema definition using the schema parameter. To produce a record from the command line, you provide the record data in JSON format. In general, the Confluent CLI is a great way to quickly test and prototype your schemas.

Next, let's see how the Confluent CLI is used to consume records from a topic. In this case, the records were serialized using the Protobuf format. So again, you need to provide the associated value format. It tells the consumer which deserializer to use for the record value. Just like producing from the CLI, consuming from the CLI is a fantastic way to quickly test and prototype.

If you've poked around the Schema Registry project located on GitHub, you may have noticed that it ships with a console producer and consumer for each of the supported serialization types: Protobuf, Avro, and JSON Schema. Let's take a look at an example using the Protobuf console producer. One difference between the console producer and the Confluent CLI produce action is how you provide the schema file. With the console producer, you provide your schema as part of the command, while with the CLI you provide the path to a file that contains your schema. In both cases, you provide the schema in JSON format regardless of the serialization format you use. You can also use the console producer and consumer with Confluent Cloud. To do so, you need to supply the Schema Registry API key and API secret configs in addition to your other parameters. And to round out the experience of working from the command line, we have the console consumer.

All right, we've seen the command line. So now let's look at integrating Schema Registry with the KafkaProducer and KafkaConsumer clients. But before diving into the specifics, there are some common configuration settings you'll always provide whether you're using a producer or a consumer. So let's take a look at those. The first is the location of the Schema Registry instance. We provide this using the schema.registry.url parameter. In this example, the URL points to a Schema Registry instance in Confluent Cloud. The next two settings in this example are required if you're connecting to a secured Schema Registry instance. The basic.auth credentials shown here are what you would use when connecting to Schema Registry in Confluent Cloud.

Integrating Schema Registry with a KafkaProducer is simply a matter of using a Schema Registry-aware serializer. By providing the Schema Registry URL we saw in a previous slide, the serializer will handle the rest. It retrieves the schema and serializes the record into the correct format. In fact, the producer and consumer clients are unaware of Schema Registry; they simply call the correct interface method, in this case the serializer.serialize method. The serializer instance handles the rest.

Now, let's take a look at the life cycle of a schema with the Kafka clients. While it is true that a Schema Registry-aware serializer will retrieve a schema from Schema Registry, it only does this when the schema has not already been stored in its local cache. In this diagram, we can see how the client interaction takes place with Schema Registry from the perspective of the producer. First, the producer attempts to serialize a record and calls serializer.serialize. This triggers the serializer to check its local cache for the schema file, but it doesn't find it. The serializer then queries Schema Registry and retrieves the schema and ID and stores it in its local cache. Now, the serializer serializes the record and stores the ID as part of the byte payload sent to Kafka. It's a very similar process for consumers. When the consumer attempts to deserialize the record, it first extracts the schema ID from the bytes that it receives from Kafka and looks in its local cache for the schema. If it's not there, it follows the same retrieval process. The most important thing to note here is unless the schema ID changes due to a schema change, the clients only connect with Schema Registry once to retrieve the schema file.

Now let's move on to see how you would integrate Schema Registry with a KafkaConsumer. Integrating Schema Registry with a Kafka consumer is much the same as you would do with a producer, but in this case you provide a deserializer instead. With consumers, there are a couple extra configurations you need to provide though. All of the serializers supported by Schema Registry generate objects from the schemas. You saw this in a previous module and did an exercise where you generated domain objects from a schema. Part of the object hierarchy includes a generic type that contains common methods that all generated objects will share. The specific object type generated contains the methods for retrieving the specific fields contained in the schema. Because of this object structure, you need to tell the deserializer if it should return the specific object or the generic object type. For Avro, you would set the specific.avro.reader configuration to true. For Protobuf, you provide the specific class name. For JSON Schema, you provide the class name. Generated objects from Avro extend both the SpecificRecord and GenericRecord types. For Protobuf, the specific type is Message and the generic type is DynamicMessage. The JSON Schema deserializer can return an instance of a specific Java class or an instance of JsonNode. For the most part, you will always want to use the specific object types since it's much easier to access the fields. When building an application, after you've generated the domain objects, you can use the code completion of your IDE to discover the field names to use. In some cases you might find that you only know the topic contains Avro or Protobuf serialized records, but you don't know the exact type. In those cases, you definitely want to use the generic types.

With ksqlDB, it's even easier. You simply provide the VALUE_FORMAT of AVRO, PROTOBUF, or JSON_SR and ksqlDB automatically retrieves and registers schemas as needed. This spares you from defining columns and data types manually in CREATE statements and from manual interaction with Schema Registry. Before using schema inference in ksqlDB, make sure that Schema Registry is up and running and ksqlDB is configured to use it.

Knowing how to use Schema Registry within client applications is pretty important. To start applying your knowledge, join me in the upcoming hands-on. See you there.