If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file
This exercise features the KTable version of the Streams application shown in the Basic Operations exercise.
Start by creating a variable to store the string that we want to filter on:
final String orderNumberStart = "orderNumber-";
Now create the KTable instance. Note that you call builder.table instead of builder.stream. Also, with the Materialized configuration object, you need to provide a name for the KTable in order for it to be materialized. The table uses caching and emits only the latest record for each key after a commit (every 30 seconds by default) or when the cache is full (10 MB by default).
KTable<String, String> firstKTable = builder.table(inputTopic,
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ktable-store")
Add SerDes for the key and value on your Materialized object:
    .withKeySerde(Serdes.String())
    .withValueSerde(Serdes.String()));
Add a filter operator for removing records that don't contain the order number variable value:
firstKTable.filter((key, value) -> value.contains(orderNumberStart))
Map the values by taking a substring:
.mapValues(value -> value.substring(value.indexOf("-") + 1))
Then filter again by taking out records where the number value of the string is less than or equal to 1000:
.filter((key, value) -> Long.parseLong(value) > 1000)
Convert the KTable to a KStream:
.toStream()
Add a peek operation to view the key values from the table:
.peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
Write the records to a topic:
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
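The value logic in the chain above (filter, mapValues, filter) can be checked in isolation before running it against Kafka. The following is a minimal plain-Java sketch with no Kafka dependency; the class name OrderValueLogic, the process method, and the sample values are hypothetical and not part of the course code.

```java
import java.util.List;
import java.util.Optional;

// Plain-Java sketch of the per-value logic in the KTable chain; no Kafka involved.
final class OrderValueLogic {
    static final String ORDER_NUMBER_START = "orderNumber-";

    // Mirrors filter -> mapValues -> filter for a single value.
    // Returns an empty Optional when the record would be dropped.
    static Optional<String> process(String value) {
        // First filter: keep only values that contain the order number prefix
        if (!value.contains(ORDER_NUMBER_START)) {
            return Optional.empty();
        }
        // mapValues: keep everything after the first hyphen
        String number = value.substring(value.indexOf("-") + 1);
        // Second filter: keep only numbers greater than 1000
        return Long.parseLong(number) > 1000
                ? Optional.of(number)
                : Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical sample values, not taken from the course data
        for (String v : List.of("orderNumber-1001", "orderNumber-999", "bogus-5000")) {
            System.out.println(v + " -> " + process(v).orElse("(dropped)"));
        }
    }
}
```

Only the first sample value survives all three steps. Note that the substring step relies on the first hyphen being the one in the prefix and on the remainder being numeric; a value such as "x-orderNumber-12" would pass the first filter and then make Long.parseLong throw a NumberFormatException.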
Create the KafkaStreams object and run the topic data helper utility:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();
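The exercise does not show how streamsProps is built. As a rough sketch, assuming a local broker, it could look like the following. The class name, the application ID, and the broker address are assumptions for illustration; the course supplies its own configuration. The last two entries spell out the Kafka Streams defaults behind the 30-second commit and 10 MB cache mentioned earlier (newer Kafka releases deprecate cache.max.bytes.buffering in favor of statestore.cache.max.bytes).

```java
import java.util.Properties;

// Sketch of a Kafka Streams configuration using plain string keys.
final class StreamsPropsSketch {
    static Properties buildProps() {
        Properties props = new Properties();
        // Assumed values for a local setup; replace with your own
        props.setProperty("application.id", "ktable-application");
        props.setProperty("bootstrap.servers", "localhost:9092");
        // Defaults stated explicitly: commit every 30 seconds, 10 MB record cache
        props.setProperty("commit.interval.ms", "30000");
        props.setProperty("cache.max.bytes.buffering", "10485760");
        return props;
    }

    public static void main(String[] args) {
        buildProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Lowering either value makes the KTable forward updates sooner, at the cost of more downstream records.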
Finally, start the application:
kafkaStreams.start();
Let the application run for about 40 seconds. You should see a single result in the output: the latest update for the event with a given key.
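Why only one result appears can be illustrated with a toy model: updates for the same key overwrite each other in a cache, and only the surviving value is forwarded when the cache is flushed. This is a deliberately simplified plain-Java sketch, not how Kafka Streams implements its cache; the class LatestPerKeyCache and the sample records are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of KTable caching: later updates replace earlier ones per key,
// and only what remains is emitted on flush.
final class LatestPerKeyCache {
    private final Map<String, String> cache = new LinkedHashMap<>();

    void put(String key, String value) {
        cache.put(key, value); // a later update replaces the earlier one
    }

    // Stands in for a commit or a full cache: emit what is left, then clear
    Map<String, String> flush() {
        Map<String, String> emitted = new LinkedHashMap<>(cache);
        cache.clear();
        return emitted;
    }

    public static void main(String[] args) {
        LatestPerKeyCache table = new LatestPerKeyCache();
        // Hypothetical records that all share one key
        table.put("order-key", "orderNumber-1001");
        table.put("order-key", "orderNumber-5000");
        table.put("order-key", "orderNumber-8400");
        System.out.println(table.flush()); // prints {order-key=orderNumber-8400}
    }
}
```

Three updates go in, but only the last one comes out, which matches the single record the exercise produces once the commit interval has passed.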