Serialization is a general term that covers both serializing and deserializing.
When you push an array of bytes through a deserializer, it gives you an object on the other end. A serializer is just the opposite: you give it an object, and it returns an array of bytes.
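The round trip is easy to see with strings, where serializing boils down to a UTF-8 conversion. This stdlib-only sketch mirrors what Kafka's StringSerializer and StringDeserializer do internally (the class and method names here are illustrative, not Kafka's API):

```java
import java.nio.charset.StandardCharsets;

public class RoundTrip {
    // Serializer: object -> array of bytes
    static byte[] serialize(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    // Deserializer: array of bytes -> object
    static String deserialize(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("hello kafka"); // the bytes a broker would store
        String back = deserialize(wire);        // the object a consumer would see
        System.out.println(back);
    }
}
```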
Serialization is important for Apache Kafka® because as mentioned above, a Kafka broker only works with bytes. Kafka stores records in bytes, and when a fetch request comes in from a consumer, Kafka returns records in bytes. The broker really knows nothing about its records; it just appends them to the end of a file, and that's the end of it.
To bring data into Kafka Streams, you provide SerDes for your topic’s key and value in the Consumed configuration object.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyObject> stream = builder.stream("topic",
    Consumed.with(Serdes.String(), customObjectSerde));
A SerDes is a convenience function, a wrapper around the serializer for a certain type and the deserializer for a certain type.
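Conceptually, then, a SerDes is just a pairing of the two conversion functions. A minimal stdlib-only sketch of that idea (the SimpleSerde, toBytes, and fromBytes names are invented here; Kafka's real Serde interface lives in org.apache.kafka.common.serialization and exposes serializer() and deserializer() methods) could look like:

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Illustrative only: a SerDes bundles a serializer and a deserializer
// for the same type so you can pass them around as one object.
public class SimpleSerde<T> {
    private final Function<T, byte[]> serializer;
    private final Function<byte[], T> deserializer;

    public SimpleSerde(Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    public byte[] toBytes(T value) { return serializer.apply(value); }
    public T fromBytes(byte[] bytes) { return deserializer.apply(bytes); }

    public static void main(String[] args) {
        SimpleSerde<String> stringSerde = new SimpleSerde<>(
            s -> s.getBytes(StandardCharsets.UTF_8),
            b -> new String(b, StandardCharsets.UTF_8));
        System.out.println(stringSerde.fromBytes(stringSerde.toBytes("key-42")));
    }
}
```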
Similarly, when you write out from Kafka Streams, you have to provide a SerDes to serialize your data:
KStream<String, CustomObject> modifiedStream = stream
    .filter((key, value) -> value.startsWith("ID5"))
    .mapValues(value -> new CustomObject(value));
modifiedStream.to("output-topic",
    Produced.with(Serdes.String(), customObjectSerde));
(Note that with state stores, you would use a Materialized to provide the SerDes.)
To create a custom SerDes, use the factory method Serdes.serdeFrom and pass both a serializer instance and a deserializer instance:
Serde<T> serde = Serdes.serdeFrom(
    new CustomSerializer<T>(),
    new CustomDeserializer<T>());
Creating your own serializer and deserializer is actually not that difficult: you just need to implement the Serializer and Deserializer interfaces from the org.apache.kafka.common.serialization package. Consult the documentation for the details.
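As a sketch of the conversion logic such a pair would contain (the Person type and its byte layout are invented here purely for illustration; in a real implementation this code would live inside the serialize and deserialize methods of Serializer&lt;Person&gt; and Deserializer&lt;Person&gt; from org.apache.kafka.common.serialization):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class PersonCodec {
    // Hypothetical payload type, for illustration only.
    static class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    // What a custom Serializer<Person>.serialize(topic, person) might return:
    static byte[] serialize(Person p) {
        byte[] name = p.name.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + name.length + 4)
                .putInt(name.length) // length prefix so the reader knows where the name ends
                .put(name)
                .putInt(p.age)
                .array();
    }

    // What the matching Deserializer<Person>.deserialize(topic, bytes) might do:
    static Person deserialize(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        byte[] name = new byte[buf.getInt()];
        buf.get(name);
        return new Person(new String(name, StandardCharsets.UTF_8), buf.getInt());
    }

    public static void main(String[] args) {
        Person p = deserialize(serialize(new Person("Ada", 36)));
        System.out.println(p.name + " " + p.age);
    }
}
```

In practice most teams delegate the byte layout to a format like JSON, Avro, or Protobuf rather than hand-rolling it as above.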
Out of the box, Kafka Streams includes SerDes for String, Integer, Double, Long, Float, Bytes, ByteArray, and ByteBuffer types.
If you're working with Avro, Protobuf, or JSON Schema, there are also SerDes available from Schema Registry:
Depending on whether you are using a specific or generic object, you can use either SpecificAvroSerde or GenericAvroSerde.
SpecificAvroSerde serializes from a specific type, and when you deserialize, you get a specific type back. GenericAvroSerde returns a type-agnostic object, and you access fields in a manner similar to how you retrieve elements from a HashMap.
Like the other examples, KafkaProtobufSerde encapsulates a serializer and deserializer for Protobuf-encoded data. It has methods that allow you to retrieve a SerDes as needed.
KafkaJsonSchemaSerde encapsulates a serializer and deserializer for JSON-formatted data. It also has methods that allow you to retrieve a SerDes as needed.