Course: Kafka Streams 101

Serialization

5 min

Sophie Blee-Goldman

Senior Software Engineer (Presenter)


Bill Bejeck

Integration Architect (Author)

Serialization

Serialization is a general term that covers two complementary operations: serializing and deserializing.

When you push an array of bytes through a deserializer, it gives you an object on the other end:


A serializer is just the opposite—you give it an object, and it returns an array of bytes:


Serialization is important for Apache Kafka® because a Kafka broker only works with bytes. Kafka stores records as bytes, and when a fetch request comes in from a consumer, Kafka returns records as bytes. The broker really knows nothing about its records; it just appends them to the end of a file, and that's the end of it.

Kafka Streams

To bring data into Kafka Streams, you provide SerDes for your topic’s key and value in the Consumed configuration object.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyObject> stream = builder.stream("topic",
    Consumed.with(Serdes.String(), customObjectSerde));

A SerDes is a convenience wrapper that bundles the serializer and the deserializer for a given type.
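
For example, the built-in String SerDes bundles both halves, and you can pull either one back out. A minimal sketch using the standard Serdes factory and the Serde interface's accessor methods:

Serde<String> stringSerde = Serdes.String();
Serializer<String> serializer = stringSerde.serializer();       // object -> bytes
Deserializer<String> deserializer = stringSerde.deserializer(); // bytes -> object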

Similarly, when you write out from Kafka Streams, you have to provide a SerDes to serialize your data:

KStream<String, CustomObject> modifiedStream =
    stream.filter((key, value) -> value.startsWith("ID5"))
          .mapValues(value -> new CustomObject(value));

modifiedStream.to("output-topic", Produced.with(Serdes.String(), customObjectSerde));

(Note that with state stores, you would use a Materialized to provide the SerDes.)
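
For instance, a count aggregation materialized to a state store could wire in its SerDes like this (a sketch; the store name "counts-store" is a made-up example):

KTable<String, Long> counts = stream
    .groupByKey()
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.Long()));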

Custom SerDes

To create a custom SerDes, use the factory method Serdes.serdeFrom and pass both a serializer instance and a deserializer instance:

Serde<T> serde = Serdes.serdeFrom(new CustomSerializer<T>(),
    new CustomDeserializer<T>());

Creating your own serializer and deserializer is actually not that difficult; consult the documentation for details. You just need to implement the Serializer and Deserializer interfaces from the org.apache.kafka.common.serialization package.
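
As an illustrative sketch, here is what that could look like for a hypothetical MyObject type; the toJsonBytes and fromJsonBytes helpers are assumptions, standing in for whatever byte format you choose:

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class MyObjectSerializer implements Serializer<MyObject> {
    @Override
    public byte[] serialize(String topic, MyObject data) {
        // toJsonBytes is a hypothetical helper on MyObject
        return data == null ? null : data.toJsonBytes();
    }
}

public class MyObjectDeserializer implements Deserializer<MyObject> {
    @Override
    public MyObject deserialize(String topic, byte[] bytes) {
        // fromJsonBytes is the hypothetical inverse of toJsonBytes
        return bytes == null ? null : MyObject.fromJsonBytes(bytes);
    }
}

Since the interfaces provide default implementations for configure and close, serialize and deserialize are the only methods you must implement.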

Pre-Existing SerDes

Out of the box, Kafka Streams includes SerDes for String, Integer, Double, Long, Float, Bytes, ByteArray, and ByteBuffer types.
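
Each of these is exposed as a static factory method on the Serdes class:

Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();
Serde<byte[]> byteArraySerde = Serdes.ByteArray();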

Avro, Protobuf, or JSON Schema

If you're working with Avro, Protobuf, or JSON Schema, there are also SerDes available from Schema Registry:

Avro

Depending on whether you are using a specific or generic object, you can use either SpecificAvroSerde or GenericAvroSerde.

SpecificAvroSerde serializes from a specific type, and when you deserialize, you get a specific type back. GenericAvroSerde returns a type-agnostic object, and you access fields in a manner similar to how you retrieve elements from a HashMap.
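
As a sketch, here is one way to configure a GenericAvroSerde and read a field; the topic name, the "title" field, and the Schema Registry URL are made up for illustration:

GenericAvroSerde valueSerde = new GenericAvroSerde();
// The second argument is false because this serde is for record values, not keys
valueSerde.configure(
    Map.of("schema.registry.url", "http://localhost:8081"), false);

KStream<String, GenericRecord> movies =
    builder.stream("movies", Consumed.with(Serdes.String(), valueSerde));
// Field access on a GenericRecord is HashMap-like
movies.mapValues(record -> record.get("title").toString());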

Protobuf

Like the other examples, KafkaProtobufSerde encapsulates a serializer and deserializer for Protobuf-encoded data. It has methods that allow you to retrieve a SerDes as needed.
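
A minimal configuration sketch, assuming a generated Protobuf class MyRecord and a locally running Schema Registry:

KafkaProtobufSerde<MyRecord> protobufSerde = new KafkaProtobufSerde<>(MyRecord.class);
protobufSerde.configure(
    Map.of("schema.registry.url", "http://localhost:8081"), false);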

JSON Schema

KafkaJsonSchemaSerde encapsulates a serializer and deserializer for JSON-formatted data. It also has methods that allow you to retrieve a SerDes as needed.
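
And the equivalent sketch for JSON Schema, assuming a plain POJO named MyEvent:

KafkaJsonSchemaSerde<MyEvent> jsonSchemaSerde = new KafkaJsonSchemaSerde<>(MyEvent.class);
jsonSchemaSerde.configure(
    Map.of("schema.registry.url", "http://localhost:8081"), false);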


Transcript

Hi, I'm Sophie Blee-Goldman with Confluent, and this module will cover data serialization.

So, why is serialization important? Well, Kafka brokers really only know records and events as bytes. The consumer receives bytes from the broker, the producer sends bytes to the broker, and the broker has no notion of the individual types of these events. So it's important for Kafka Streams, or the consumer, or the producer, to know how to convert the actual useful types (your objects, whatever you are using in your application) into bytes that can be sent across the network or put into a state store.

Now, there are two things that we mean when we say "data serialization." There is serialization, which is converting from an object to bytes. You might have something like a string or an integer, or a more complicated custom data type, and serialization just specifies how you convert that into the zeros and ones that can be sent to the broker or stored in a state store that Kafka Streams uses. Similarly, on the other end, we have the bytes, and we want to know how to get some useful information or objects out of those bytes. This is what we call deserialization. So deserialization just specifies how to get these ones and zeros back into a useful, usable form.

Now, with Kafka Streams, we actually work with SerDes. A SerDes is not itself a new thing; it's just a convenience wrapper around a serializer and a deserializer. It's useful because you don't have to specify both the serializer and the deserializer for objects of the same type, which is never going to change. You also don't have to worry about when to specify a serializer and when to specify a deserializer. You can just specify a SerDes everywhere in Kafka Streams, and it will handle calling the correct one when it needs to.

Now, we've seen this before with the Consumed object. For example, when creating a KStream, you pass in this Consumed object, and you might notice that it takes in the SerDes. The first argument to the Consumed.with factory method is Serdes.String(), and that's just a built-in SerDes specifying that the key in that topic is actually of string type. The value is this custom object SerDes. So whenever you're creating a stream over a topic with some more complicated type, such as your custom MyObject, you need to pass in a custom SerDes that you yourself will define.

Similarly, for sink nodes you have this Produced.with config object, and again you specify the SerDes that you saw before. So for creating a KStream for a source node, you use Consumed.with; for sink nodes, producing to an output topic, you use this Produced object, and that also takes in a key and a value SerDes. Again, the key in this example is a string, and the value would be this custom object SerDes. And for state stores, you will use a Materialized object to specify the SerDes. That is useful when you are writing them to an actual state store on local disk.

So how do you create a SerDes? Creating a custom SerDes is just a matter of creating the serializer and the deserializer that it wraps; as we said, it's just a convenience object. To do that, you can just pass the custom serializer and deserializer into the Serdes.serdeFrom convenience method, and this will give you your SerDes.

So how do you get a custom serializer and a custom deserializer? Well, all you have to do there is implement the Serializer or Deserializer interface of the Kafka client library and tell it how to convert your custom object into bytes, or bytes into your custom object. Now, luckily Kafka provides a lot of out-of-the-box SerDes, so you don't have to write this every time. Often you'll be working with strings or number types like integer or long, or even raw bytes or byte arrays; anything that is included in the pre-existing, out-of-the-box SerDes you can just use, and you don't have to worry about writing it yourself.

Now, that only covers some relatively simple types, but there are many more advanced types for which you can still avoid having to write the serialization. That's why Schema Registry provides several out-of-the-box Avro, Protobuf, and JSON Schema SerDes that you can use in Kafka Streams, to avoid having to write this yourself while using these predefined formats that you're probably using anyway for your data in Kafka.

That's serialization in Kafka Streams.