Course: Kafka Streams 101

Serialization

5 min
Sophie Blee-Goldman, Software Engineer II (Course Presenter)
Bill Bejeck, Integration Architect (Course Author)

Serialization is a general term that covers two complementary operations: serializing and deserializing.

When you push an array of bytes through a deserializer, it gives you an object on the other end:

[Diagram: deserializer — bytes in, object out]

A serializer is just the opposite—you give it an object, and it returns an array of bytes:

[Diagram: serializer — object in, bytes out]

Serialization is important for Apache Kafka® because a Kafka broker only works with bytes. Kafka stores records as bytes, and when a fetch request comes in from a consumer, Kafka returns records as bytes. The broker really knows nothing about its records; it just appends them to the end of a file, and that's the end of it.

Kafka Streams

To bring data into Kafka Streams, you provide SerDes for your topic’s key and value in the Consumed configuration object.

StreamsBuilder builder = new StreamsBuilder();
KStream<String, CustomObject> stream = builder.stream("topic",
    Consumed.with(Serdes.String(), customObjectSerde));

A SerDes is a convenience wrapper that bundles the serializer and the deserializer for a given type.

Similarly, when you write out from Kafka Streams, you have to provide a SerDes to serialize your data:

KStream<String, CustomObject> modifiedStream =
    stream.filter((key, value) -> value.startsWith("ID5"))
          .mapValues(value -> new CustomObject(value));

modifiedStream.to("output-topic", Produced.with(Serdes.String(), customObjectSerde));

(Note that with state stores, you would use a Materialized to provide the SerDes.)
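As a non-runnable sketch of that state-store case (assuming a `stream` grouped by key, with `customObjectSerde` defined elsewhere as in the snippets above):

```java
// Sketch only: Grouped supplies SerDes for the repartition step,
// Materialized supplies SerDes for the state store backing the count.
KTable<String, Long> counts = stream
    .groupByKey(Grouped.with(Serdes.String(), customObjectSerde))
    .count(Materialized.with(Serdes.String(), Serdes.Long()));
```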

Custom SerDes

To create a custom SerDes, use the factory method Serdes.serdeFrom and pass both a serializer instance and a deserializer instance:

Serde<T> serde = Serdes.serdeFrom(new CustomSerializer<T>(),
    new CustomDeserializer<T>());

Creating your own serializer and deserializer is actually not that difficult. You just need to implement the Serializer and Deserializer interfaces from the org.apache.kafka.common.serialization package; consult the documentation for details.
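Interface plumbing aside, the heart of a custom SerDes is a pair of inverse byte conversions. Here is a self-contained sketch of that core logic for a hypothetical CustomObject type (the delimited-string wire format is purely illustrative; the static methods stand in for the bodies of Serializer.serialize and Deserializer.deserialize):

```java
import java.nio.charset.StandardCharsets;

public class CustomSerdeSketch {
    // Hypothetical payload type used throughout this sketch.
    record CustomObject(String id, long count) {}

    // What you'd put inside Serializer<CustomObject>.serialize(topic, data):
    // encode the fields into a byte array (here, a simple delimited string).
    static byte[] serialize(CustomObject obj) {
        if (obj == null) return null; // Kafka convention: null stays null
        String wire = obj.id() + "|" + obj.count();
        return wire.getBytes(StandardCharsets.UTF_8);
    }

    // What you'd put inside Deserializer<CustomObject>.deserialize(topic, bytes):
    // reverse the encoding exactly.
    static CustomObject deserialize(byte[] bytes) {
        if (bytes == null) return null;
        String wire = new String(bytes, StandardCharsets.UTF_8);
        int sep = wire.lastIndexOf('|');
        return new CustomObject(wire.substring(0, sep),
                                Long.parseLong(wire.substring(sep + 1)));
    }

    public static void main(String[] args) {
        CustomObject original = new CustomObject("ID5-abc", 42L);
        CustomObject roundTrip = deserialize(serialize(original));
        System.out.println(roundTrip.equals(original)); // prints "true"
    }
}
```

Whatever format you choose, the essential contract is that deserialize(serialize(x)) returns an object equal to x.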

Pre-Existing SerDes

Out of the box, Kafka Streams includes SerDes for String, Integer, Double, Long, Float, Bytes, ByteArray, and ByteBuffer types.
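Besides passing these built-in SerDes explicitly via Consumed/Produced, you can set them as topology-wide defaults in the Streams configuration. A minimal sketch (the class names are the nested classes of org.apache.kafka.common.serialization.Serdes; the property keys are the standard Kafka Streams config names):

```java
import java.util.Properties;

public class DefaultSerdeConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Defaults applied wherever Consumed/Produced don't override them
        props.put("default.key.serde",
                  "org.apache.kafka.common.serialization.Serdes$StringSerde");
        props.put("default.value.serde",
                  "org.apache.kafka.common.serialization.Serdes$LongSerde");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(streamsProps().getProperty("default.key.serde"));
    }
}
```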

Avro, Protobuf, or JSON Schema

If you're working with Avro, Protobuf or JSON Schema, there are also SerDes available from Schema Registry:

Avro

Depending on whether you are using a specific or generic object, you can use either SpecificAvroSerde or GenericAvroSerde.

SpecificAvroSerde serializes from a specific type, and when you deserialize, you get a specific type back. GenericAvroSerde returns a type-agnostic object, and you access fields in a manner similar to how you retrieve elements from a HashMap.
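Unlike the built-in SerDes, these need to know where Schema Registry lives before use. A non-runnable configuration sketch (assuming the Confluent kafka-streams-avro-serde dependency on the classpath; the registry URL is a placeholder):

```java
// Sketch: a GenericAvroSerde must be configured with the registry location.
final Serde<GenericRecord> valueSerde = new GenericAvroSerde();
valueSerde.configure(
    Map.of("schema.registry.url", "http://localhost:8081"), // placeholder URL
    false); // false = configure as a value SerDes, not a key SerDes
```

The same configure-before-use pattern applies to the Protobuf and JSON Schema SerDes described below.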

Protobuf

Like the other examples, KafkaProtobufSerde encapsulates a serializer and deserializer for Protobuf-encoded data. It has methods that allow you to retrieve a SerDes as needed.

JSON Schema

KafkaJsonSchemaSerde encapsulates a serializer and deserializer for JSON-formatted data. It also has methods that allow you to retrieve a SerDes as needed.
