If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.
git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams
The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.
This module’s code can be found in the source file java/io/confluent/developer/processor/ProcessorApi.java.
In this exercise, you will create an aggregation that schedules a punctuation to run every 30 seconds. You'll use a ProcessorSupplier and a Processor instance; the Processor will contain all of your stream processing logic.
Create the ProcessorSupplier implementation:
static class TotalPriceOrderProcessorSupplier implements ProcessorSupplier<String, ElectronicOrder, String, Double> {
final String storeName;
}
Add a constructor. The get() method implementation is important, since it must return a new Processor instance each time it's called. Inside the Processor, declare variables for the ProcessorContext and the KeyValueStore, and implement the init method, which Kafka Streams calls when the application is starting up. In the init method, store a reference to the processor context, then get a reference to the state store by name (using the storeName variable declared above) and keep it in the store variable. Finally, use the processor context to schedule a punctuation that fires every 30 seconds, based on stream time.
@Override
public Processor<String, ElectronicOrder, String, Double> get() {
return new Processor<>() {
private ProcessorContext<String, Double> context;
private KeyValueStore<String, Double> store;
@Override
public void init(ProcessorContext<String, Double> context) {
this.context = context;
store = context.getStateStore(storeName);
this.context.schedule(Duration.ofSeconds(30), PunctuationType.STREAM_TIME, this::forwardAll);
}
}
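Stream-time punctuation is driven by record timestamps rather than the wall clock, so nothing fires while no records arrive. The following is a minimal plain-Java model of that idea, not Kafka Streams' actual scheduler. The class StreamTimePunctuationSketch and its fields are hypothetical, and firing on the very first record is an assumption chosen to mirror the sample output of this exercise.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of a stream-time punctuation schedule. */
class StreamTimePunctuationSketch {
    private final long intervalMs;
    private long nextFireMs = 0L;    // assumption: eligible to fire as soon as data arrives
    private long streamTimeMs = -1L; // highest record timestamp observed so far
    final List<Long> firedAt = new ArrayList<>();

    StreamTimePunctuationSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    /** Observes one record timestamp and returns true if a punctuation fired. */
    boolean observe(long recordTimestampMs) {
        // Stream time never moves backwards, so late records do not advance it.
        streamTimeMs = Math.max(streamTimeMs, recordTimestampMs);
        if (streamTimeMs < nextFireMs) {
            return false;
        }
        firedAt.add(streamTimeMs);
        // Skip any intervals that were missed during a gap in the data,
        // so a long gap produces one firing instead of a burst.
        long missed = (streamTimeMs - nextFireMs) / intervalMs;
        nextFireMs += (missed + 1) * intervalMs;
        return true;
    }
}
```

With a 30-second interval, calling observe with timestamps that are 35 seconds apart fires on every call in this model, while a second record arriving only a few seconds after a firing does not.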
Implement the forwardAll method, beginning by opening an iterator over all records in the store. (It’s important to close iterators when you're done with them; it's best to use them within a try-with-resources block, so that closing is automatic.)
private void forwardAll(final long timestamp) {
try (KeyValueIterator<String, Double> iterator = store.all()) {
Iterate over all of the records, getting a KeyValue from the iterator inside your loop. Create a new Record instance from its key and value, then forward the Record to any child nodes.
while (iterator.hasNext()) {
final KeyValue<String, Double> nextKV = iterator.next();
final Record<String, Double> totalPriceRecord = new Record<>(nextKV.key, nextKV.value, timestamp);
context.forward(totalPriceRecord);
System.out.println("Punctuation forwarded record - key " + totalPriceRecord.key() + " value " + totalPriceRecord.value());
}
}
}
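To see why try-with-resources is recommended here, the sketch below uses a hypothetical TrackingIterator as a stand-in for a store iterator. It is plain Java with no Kafka dependency, and only records whether close() was called; a real store iterator would release underlying resources at that point.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

class ClosableIteratorSketch {

    /** Hypothetical stand-in for a store iterator that must be closed after use. */
    static class TrackingIterator implements Iterator<Map.Entry<String, Double>>, AutoCloseable {
        private final Iterator<Map.Entry<String, Double>> delegate;
        boolean closed = false;

        TrackingIterator(Map<String, Double> data) {
            this.delegate = data.entrySet().iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public Map.Entry<String, Double> next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true; // a real iterator would free its resources here
        }
    }

    /** Drains the iterator; close() runs automatically when the try block exits. */
    static List<String> forwardAll(TrackingIterator source) {
        List<String> forwarded = new ArrayList<>();
        try (TrackingIterator iterator = source) {
            while (iterator.hasNext()) {
                Map.Entry<String, Double> entry = iterator.next();
                forwarded.add(entry.getKey() + "=" + entry.getValue());
            }
        }
        return forwarded;
    }
}
```

The close() call also runs if the loop body throws, which is the main advantage over closing the iterator manually at the end of the method.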
Implement the process method on the Processor interface by first getting the key from the Record, then using the key to look up the current total in the state store. If the result is null, initialize the total to 0.0. Add the price from the record to the total, and put the new value in the store under the same key.
@Override
public void process(Record<String, ElectronicOrder> record) {
final String key = record.key();
Double currentTotal = store.get(key);
if (currentTotal == null) {
currentTotal = 0.0;
}
Double newTotal = record.value().getPrice() + currentTotal;
store.put(key, newTotal);
System.out.println("Processed incoming record - key " + key + " value " + record.value());
}
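The read, default, add, write pattern in process can be tried without a broker. The sketch below is a minimal plain-Java version in which a HashMap is a hypothetical stand-in for the KeyValueStore; the class name RunningTotalSketch is also an assumption for illustration.

```java
import java.util.HashMap;
import java.util.Map;

class RunningTotalSketch {
    // Hypothetical in-memory stand-in for the KeyValueStore<String, Double>.
    private final Map<String, Double> store = new HashMap<>();

    /** Adds the price to the running total for the key and returns the new total. */
    double process(String key, double price) {
        Double currentTotal = store.get(key);
        if (currentTotal == null) {
            currentTotal = 0.0; // first order seen for this key
        }
        double newTotal = price + currentTotal;
        store.put(key, newTotal);
        return newTotal;
    }

    /** Returns the stored total, or null if the key has never been processed. */
    Double total(String key) {
        return store.get(key);
    }
}
```

Feeding it the four prices from this exercise's sample data (2000.0, 1999.23, 4500.0, 1333.98) under one key yields a running total that ends at approximately 9833.21.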
We're not quite done with the ProcessorSupplier implementation, but we have some details to attend to first. Define the storeName variable and create a StoreBuilder, which you'll need for creating the state store. In the StoreBuilder, set the store type to persistent and use the storeName variable for the name of the store. Add SerDes for the key and value types in the store (Kafka Streams stores everything as byte arrays in state stores).
final static String storeName = "total-price-store";
static StoreBuilder<KeyValueStore<String, Double>> totalPriceStoreBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(storeName),
Serdes.String(),
Serdes.Double());
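Because the store holds byte arrays, every put and get passes through a serializer or deserializer. The sketch below illustrates that round trip in plain Java for a String key and a Double value. It is not the Kafka Serdes implementation; the class SerdeRoundTripSketch and its method names are hypothetical, and the UTF-8 and 8-byte big-endian encodings are assumptions chosen for the illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class SerdeRoundTripSketch {

    /** Encodes a String key as UTF-8 bytes. */
    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes UTF-8 bytes back into a String key. */
    static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    /** Encodes a double as 8 bytes (ByteBuffer defaults to big-endian order). */
    static byte[] serializeDouble(double value) {
        return ByteBuffer.allocate(Double.BYTES).putDouble(value).array();
    }

    /** Decodes 8 bytes back into a double. */
    static double deserializeDouble(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getDouble();
    }
}
```

A value such as 3999.23 survives the round trip bit for bit, which is why the totals read back from the store match what was written.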
With the StoreBuilder complete, override the stores method on the ProcessorSupplier interface, which gives the Processor access to the store:
@Override
public Set<StoreBuilder<?>> stores() {
return Collections.singleton(totalPriceStoreBuilder);
}
Now build a topology for the streaming application. This will take a few more steps since we’re using the Processor API and not the Kafka Streams DSL. Begin by creating a Topology instance and adding a source node (you need to provide the name of the source node, the key and value deserializers, and the input topic):
final Topology topology = new Topology();
topology.addSource(
"source-node",
stringSerde.deserializer(),
electronicSerde.deserializer(),
inputTopic);
Next, add a processor to the topology. Provide a name for the Processor, add the ProcessorSupplier instance you created before, and set the parent name(s) for the Processor (you can specify multiple names).
topology.addProcessor(
"aggregate-price",
new TotalPriceOrderProcessorSupplier(storeName),
"source-node");
Complete the topology by adding a sink node, specifying its name, the output topic, the key and value serializers, and the parent name(s):
topology.addSink(
"sink-node",
outputTopic,
stringSerde.serializer(),
doubleSerde.serializer(),
"aggregate-price");
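The parent names are what connect the three nodes into a pipeline: records flow from each node to the nodes that name it as a parent. The sketch below is a small plain-Java model of that wiring, not the Kafka Streams Topology class. TopologyWiringSketch, addNode, and childrenOf are hypothetical names, and the rule that a parent must be added before its children is an assumption of this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class TopologyWiringSketch {
    // Node name -> names of its parents, kept in insertion order.
    private final Map<String, List<String>> parents = new LinkedHashMap<>();

    /** Registers a node; every named parent must already exist in this model. */
    TopologyWiringSketch addNode(String name, String... parentNames) {
        if (parents.containsKey(name)) {
            throw new IllegalArgumentException("Node already added: " + name);
        }
        for (String parent : parentNames) {
            if (!parents.containsKey(parent)) {
                throw new IllegalArgumentException("Unknown parent: " + parent);
            }
        }
        parents.put(name, new ArrayList<>(Arrays.asList(parentNames)));
        return this;
    }

    /** Returns the nodes that list the given node as a parent. */
    List<String> childrenOf(String name) {
        List<String> children = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : parents.entrySet()) {
            if (entry.getValue().contains(name)) {
                children.add(entry.getKey());
            }
        }
        return children;
    }
}
```

Adding "source-node", then "aggregate-price" with parent "source-node", then "sink-node" with parent "aggregate-price" reproduces the linear source, processor, sink chain built in this exercise.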
Finally, instantiate the kafkaStreams object, add the utility method for creating topics and providing sample data, and start the application.
final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProps);
TopicLoader.runProducer();
kafkaStreams.start();
Run this example with the following command:
./gradlew runStreams -Pargs=processor
Your results should look like this:
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622156159867}
Punctuation forwarded record - key HDTV-2333 value 2000.0
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156194867}
Punctuation forwarded record - key HDTV-2333 value 3999.23
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622156229867}
Punctuation forwarded record - key HDTV-2333 value 8499.23
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622156264867}
Punctuation forwarded record - key HDTV-2333 value 9833.21
Note that the timestamps are simulated to provide more activity, and the observed behavior could be different in a production environment.