Senior Software Engineer (Presenter)
Integration Architect (Author)
If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module’s code can be found in the source file java/io/confluent/developer/ktable/KTableExample.java.
This exercise features the KTable version of the Streams application shown in the Basic Operations exercise.
Start by creating a variable to store the string that we want to filter on:
final String orderNumberStart = "orderNumber-";
Now create the KTable instance. Note that you call builder.table instead of builder.stream. With the Materialized configuration object, you provide a name for the KTable's state store, which is required for the table to be materialized. The table uses caching, so it emits only the latest record for each key when a commit occurs (every 30 seconds by default) or when the cache fills up (10 MB by default).
KTable<String, String> firstKTable = builder.table(inputTopic,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ktable-store")
Add SerDes for the key and value on your Materialized object:
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
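The commit interval and cache size mentioned above are ordinary Kafka Streams configuration properties. As a minimal sketch, assuming the streamsProps object used later in this exercise is a java.util.Properties, you could adjust them as shown here. The property names are written as plain strings so the snippet compiles without the Kafka libraries; cache.max.bytes.buffering is the name used by older Kafka Streams releases (newer releases prefer statestore.cache.max.bytes), and the class name, method name, and the "fast" values are illustrative assumptions, not part of the course code.

```java
import java.util.Properties;

public class KTableCacheSettings {

    // Builds a Properties object holding the two settings that control
    // when a cached KTable forwards its updates downstream.
    public static Properties cacheSettings(long commitIntervalMs, long cacheBytes) {
        Properties props = new Properties();
        // How often Kafka Streams commits, which also flushes cached updates.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        // Memory available for record caching; 0 disables caching.
        props.put("cache.max.bytes.buffering", Long.toString(cacheBytes));
        return props;
    }

    public static void main(String[] args) {
        // The defaults described in the text: 30 seconds and 10 MB.
        Properties defaults = cacheSettings(30_000L, 10L * 1024 * 1024);
        // Hypothetical values for local experiments: commit sooner, no cache.
        Properties fast = cacheSettings(5_000L, 0L);
        System.out.println("defaults: " + defaults);
        System.out.println("fast: " + fast);
    }
}
```

With caching disabled, every update to a key would be forwarded downstream rather than only the latest one, so you would expect more than one output record from this exercise.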
Add a filter operator to remove records whose value doesn't contain the orderNumberStart string:
firstKTable.filter((key, value) -> value.contains(orderNumberStart))
Map each value to the substring that follows the first hyphen:
.mapValues(value -> value.substring(value.indexOf("-") + 1))
Then filter again, keeping only records whose numeric value is greater than 1000:
.filter((key, value) -> Long.parseLong(value) > 1000)
Convert the KTable to a KStream:
.toStream()
Add a peek operation to view the keys and values coming from the table:
.peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
Write the records to a topic:
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
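To see what the filter, mapValues, and filter steps do to a single value, it can help to run the same logic outside Kafka. The following is a minimal sketch in plain Java; the class name, the process method, and the sample input strings are hypothetical and are not part of the course repository.

```java
import java.util.Optional;

public class OrderValueLogic {

    static final String ORDER_NUMBER_START = "orderNumber-";

    // Applies the same three steps as the topology to one value.
    // Returns the extracted number, or empty if a filter drops the record.
    public static Optional<String> process(String value) {
        // First filter: keep only values containing the order number prefix.
        if (!value.contains(ORDER_NUMBER_START)) {
            return Optional.empty();
        }
        // mapValues: keep everything after the first hyphen.
        String number = value.substring(value.indexOf("-") + 1);
        // Second filter: keep only numbers greater than 1000.
        if (Long.parseLong(number) > 1000) {
            return Optional.of(number);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(process("orderNumber-8400")); // prints Optional[8400]
        System.out.println(process("orderNumber-1000")); // prints Optional.empty
        System.out.println(process("bogus-9999"));       // prints Optional.empty
    }
}
```

Because the substring is taken after the first hyphen, a value with a hyphen before the prefix would leave non-numeric text in place and Long.parseLong would throw a NumberFormatException.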
Create a KafkaStreams object and run the topic data helper utility:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();
Finally, start the application:
kafkaStreams.start();
Now you can run the KTable example with this command:
./gradlew runStreams -Pargs=ktable
Let the application run for about 40 seconds, which is long enough for the 30-second commit interval to elapse. You should see a single output record:
Outgoing record - key order-key value 8400
There's a single output result because every record in the sample data for this exercise has the same key, so the KTable emits only the latest update for that key.
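The single-result behavior can be illustrated without Kafka by collapsing a sequence of updates to the latest value per key. This is only a rough sketch of the idea, not how the KTable cache is implemented; the class name and the input values other than the final one are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class LatestPerKeySketch {

    // Collapses (key, value) updates so that each key maps to its most
    // recent value, roughly what a cached table holds between commits.
    public static Map<String, String> latestPerKey(List<String[]> updates) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] update : updates) {
            // A later update for the same key overwrites the earlier one.
            table.put(update[0], update[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        // Hypothetical input: three updates that all share one key.
        List<String[]> updates = List.of(
                new String[] {"order-key", "orderNumber-1001"},
                new String[] {"order-key", "orderNumber-5000"},
                new String[] {"order-key", "orderNumber-8400"});
        // prints {order-key=orderNumber-8400}
        System.out.println(latestPerKey(updates));
    }
}
```

A KStream reading the same topic would treat each of the three records as an independent event, which is the contrast with the Basic Operations exercise.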
This exercise features the KTable version of the first Streams application. Let's start by creating a variable to store the string that we want to filter on. Now, create the KTable instance. Note that you call builder.table versus builder.stream. With the Materialized config object here, you need to provide a name for the KTable in order for it to be materialized. It will use caching and only emit the latest records for each key after a commit, which is 30 seconds, or when the cache is full at 10 megabytes. Now, we will add the Serde for the key and the Serde for the value. Next, add a filter operator, removing records that don't contain the order number variable value. Next, you'll map the values by taking a substring. Then, you'll filter again, taking out records where the number value of the string is less than or equal to 1000. Here, you want to convert the KTable, aka the update stream, to a KStream, aka an event stream, and add a peek operation to view the keys and values from the table. Finally, write the records to a topic. Now, create the KafkaStreams object and run the topic data helper utility. With this, you can now start the application. You should let this application run for about 40 seconds, and you'll see one result output, the latest update for the event with a given key.