Course: Kafka Streams 101

Hands On: KTable

Sophie Blee-Goldman, Software Engineer II (Course Presenter)
Bill Bejeck, Integration Architect (Course Author)

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/ktable/KTableExample.java.

This exercise features the KTable version of the Streams application shown in the Basic Operations exercise.
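As background for the steps that follow, here is a minimal plain-Java sketch of what the KTable version of the application computes. It is a simplified model, not the Kafka Streams API: it assumes the table keeps only the most recent value per key, and then applies the same contains, substring, and greater-than-1000 logic used in this exercise. The class name, method name, and sample records are hypothetical, and the real library also handles details (such as deletes and cache flushing) that this model ignores.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of table semantics; this is NOT the Kafka Streams API.
class KTableSemanticsSketch {

    // Keeps the latest value per key, then applies filter -> mapValues -> filter.
    // Each input record is a two-element array: {key, value}.
    static List<String> latestMatchingOrders(List<String[]> records) {
        // A table holds one value per key: later records overwrite earlier ones.
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : records) {
            table.put(record[0], record[1]);
        }

        final String orderNumberStart = "orderNumber-";
        List<String> output = new ArrayList<>();
        for (Map.Entry<String, String> entry : table.entrySet()) {
            String value = entry.getValue();
            // First filter: keep values that contain the prefix.
            if (!value.contains(orderNumberStart)) {
                continue;
            }
            // mapValues: keep only the text after the first dash.
            String number = value.substring(value.indexOf("-") + 1);
            // Second filter: keep numbers greater than 1000.
            if (Long.parseLong(number) > 1000) {
                output.add(entry.getKey() + "=" + number);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Hypothetical sample data: three updates for the same key.
        List<String[]> records = List.of(
            new String[] {"order-key", "orderNumber-1001"},
            new String[] {"order-key", "orderNumber-5000"},
            new String[] {"order-key", "orderNumber-8400"});
        System.out.println(latestMatchingOrders(records)); // prints [order-key=8400]
    }
}
```

Because all three sample updates share one key, only the last one survives in the model, which mirrors why a table with caching can emit a single result where a stream would emit one per record.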

  1. Start by creating a variable to store the string that we want to filter on:

    final String orderNumberStart = "orderNumber-";
  2. Now create the KTable instance. Note that you call builder.table instead of builder.stream. With the Materialized configuration object, you need to provide a name for the KTable in order for it to be materialized. The KTable uses caching, so it emits only the latest record for each key when the cache is flushed: on commit (every 30 seconds by default) or when the cache is full (10 MB by default).

    KTable<String, String> firstKTable = builder.table(inputTopic,
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ktable-store")
  3. Add SerDes for the key and value on your Materialized object:

    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.String()));
  4. Add a filter operator to drop records whose value doesn't contain the orderNumberStart string:

    firstKTable.filter((key, value) -> value.contains(orderNumberStart))
  5. Map the values by taking a substring:

    .mapValues(value -> value.substring(value.indexOf("-") + 1))
  6. Then filter again, keeping only records whose numeric value is greater than 1000:

    .filter((key, value) -> Long.parseLong(value) > 1000)
  7. Convert the KTable to a KStream:

    .toStream()
  8. Add a peek operation to print each key and value coming out of the table:

    .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
  9. Write the records to a topic:

    .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
  10. Create a KafkaStreams object and run the topic data helper utility:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
    TopicLoader.runProducer();
  11. Finally, start the application:

    kafkaStreams.start(); 

    Let the application run for about 40 seconds, long enough for the 30-second commit interval to elapse. You should see a single output record: the latest update for the given key.
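The wait in the last step follows from the commit interval and cache size mentioned in step 2. As a rough sketch, those two settings could be overridden in the properties passed to KafkaStreams. The keys commit.interval.ms and cache.max.bytes.buffering are Kafka Streams configuration names, but the values below, the class name, and the method name are illustrative assumptions rather than the course's configuration, and newer Kafka releases may prefer a differently named cache setting, so check the StreamsConfig documentation for your version.

```java
import java.util.Properties;

// Hypothetical helper; the course loads its own streamsProps from a properties file.
class StreamsCacheConfigSketch {

    static Properties fasterEmitProps() {
        Properties streamsProps = new Properties();
        // Commit (and flush the cache) every 5 seconds; the default is 30000 ms.
        streamsProps.setProperty("commit.interval.ms", "5000");
        // Set the record cache size to 0 bytes; the default is 10485760 (10 MB).
        // With no cache, every update is expected to be forwarded downstream.
        streamsProps.setProperty("cache.max.bytes.buffering", "0");
        return streamsProps;
    }

    public static void main(String[] args) {
        Properties props = fasterEmitProps();
        System.out.println(props.getProperty("commit.interval.ms"));        // prints 5000
        System.out.println(props.getProperty("cache.max.bytes.buffering")); // prints 0
    }
}
```

With settings like these merged into streamsProps, results should appear sooner, and with the cache disabled you would likely see more than one output record per key instead of only the latest update.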
