If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code. This module's code can be found in the source file java/io/confluent/developer/ktable/KTableExample.java.
This exercise features the KTable version of the Streams application shown in the Basic Operations exercise.
Start by creating a variable to store the string that we want to filter on:
final String orderNumberStart = "orderNumber-";
Now create the KTable instance. Note that you call builder.table instead of builder.stream; also, with the Materialized configuration object, you need to provide a name for the KTable in order for it to be materialized. The KTable uses caching and only emits the latest record for each key when its cache is flushed, which happens on commit (every 30 seconds by default) or when the cache fills (10 MB by default); a configuration sketch for tuning these values follows the table definition below.
KTable<String, String> firstKTable = builder.table(inputTopic,
Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ktable-store")
Add SerDes for the key and value on your Materialized object:
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.String()));
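By default you'll wait up to 30 seconds (or until the 10 MB cache fills) before updates are emitted downstream. If you want to experiment with those values, you can set them on the Streams configuration. This is an optional sketch, not part of the exercise code; it assumes streamsProps is the java.util.Properties object passed to KafkaStreams later in this module, and the values shown are only examples:
// Optional (not part of the exercise): tune how often the KTable cache is flushed.
// COMMIT_INTERVAL_MS_CONFIG defaults to 30 seconds; CACHE_MAX_BYTES_BUFFERING_CONFIG defaults to 10 MB.
streamsProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10_000);                    // example: commit every 10 seconds
streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 5 * 1024 * 1024L);   // example: 5 MB cache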
Add a filter operator to remove records whose value doesn't contain the orderNumberStart string:
firstKTable.filter((key, value) -> value.contains(orderNumberStart))
Map the values to the substring after the hyphen (the order number itself):
.mapValues(value -> value.substring(value.indexOf("-") + 1))
Then filter again, dropping records whose numeric value is 1000 or less:
.filter((key, value) -> Long.parseLong(value) > 1000)
Convert the KTable to a KStream:
.toStream()
Add a peek operation to view the keys and values flowing out of the table:
.peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
Write the records to a topic:
.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
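For reference, here is the complete processing code assembled from the snippets above (identifiers match the exercise source):
KTable<String, String> firstKTable = builder.table(inputTopic,
    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("ktable-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()));

firstKTable.filter((key, value) -> value.contains(orderNumberStart))
    .mapValues(value -> value.substring(value.indexOf("-") + 1))
    .filter((key, value) -> Long.parseLong(value) > 1000)
    .toStream()
    .peek((key, value) -> System.out.println("Outgoing record - key " + key + " value " + value))
    .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));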
Create a KafkaStreams object and run the topic data helper utility:
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
TopicLoader.runProducer();
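The streamsProps object passed to the KafkaStreams constructor holds the Kafka Streams configuration (the exercise source sets this up for you). For orientation, a Kafka Streams configuration needs at least an application ID and the broker addresses; a minimal sketch, with placeholder values:
// Minimal configuration sketch (the exercise loads its own properties; these values are placeholders).
Properties streamsProps = new Properties();
streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "ktable-example");      // hypothetical application id
streamsProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // placeholder broker address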
Finally, start the application:
kafkaStreams.start();
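Not part of this exercise, but in your own Kafka Streams applications it's common to register a shutdown hook so the instance closes cleanly when the JVM exits; a minimal sketch:
// Optional pattern (not part of the exercise): close the streams instance cleanly on shutdown.
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));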
Now you can run the KTable example with this command:
./gradlew runStreams -Pargs=ktable
Let the application run for about 40 seconds; you should see a single output record:
Outgoing record - key order-key value 8400
There's a single output record because every sample record in this exercise has the same key, so the KTable keeps only the latest value for that key and emits just that one update.