Get Started Free
Screen Shot 2021-07-23 at 4.13.50 PM

Sophie Blee-Goldman

Senior Software Engineer (Presenter)

bill-bejeck

Bill Bejeck

Integration Architect (Author)

KTable

This module defines the KTable, explains how it differs from a KStream, and covers its basic operations, as well as its GlobalKTable variant.

Update Streams

The module Basic Operations defined event streams and mentioned that keys across records in event streams are completely independent of one another, even if they are identical.

Update Streams are the exact opposite: if a new record comes in with the same key as an existing record, the existing record will be overwritten.

update-streams

This means that when using a KTable, keys are required, although they aren't required when using a KStream. By overwriting records, a KTable creates a completely different data structure from a KStream, even given the same source records.

Defining a KTable

To define a KTable, you use a StreamsBuilder, as with a KStream, but you call builder.table instead of builder.stream. With the builder.table method, you provide an inputTopic, along with a Materialized configuration object specifying your SerDes (this replaces the Consumed object that you use with a KStream):

 StreamsBuilder builder = new StreamsBuilder();
 KTable<String, String> firstKTable = 
    builder.table(inputTopic, 
    Materialized.with(Serdes.String(), Serdes.String()));

KTable Operations

The KTable API has operations similar to those of the KStream API, including mapping and filtering.

Mapping

ktable-mapping

As with KStream, mapValues transforms values and map lets you transform both keys and values.

firstKTable.mapValues(value -> ..)
firstKTable.map((key,value) -> ..)

Filtering

As with KStream, the filter operation lets you supply a predicate, and only records that match the predicate are forwarded to the next node in the topology:

firstKTable.filter((key, value) -> ..)

ktable-filtering

GlobalKTable

A GlobalKTable is built using the GlobalKTable method on the StreamBuilder. As with a regular KTable, you pass in a Materialized configuration with the SerDes:

 StreamsBuilder builder = new StreamsBuilder();
 GlobalKTable<String, String> globalKTable = 
    builder.globalTable(inputTopic, 
    Materialized.with(Serdes.String(), Serdes.String()));

The main difference between a KTable and a GlobalKTable is that a KTable shards data between Kafka Streams instances, while a GlobalKTable extends a full copy of the data to each instance. You typically use a GlobalKTable with lookup data. There are also some idiosyncrasies regarding joins between a GlobalKTable and a KStream; we’ll cover these later in the course.

Use the promo code STREAMS101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.