Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

Hands On: Processor API

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.

This module’s code can be found in the source file java/io/confluent/developer/processor/ProcessorApi.java.

In this exercise, you will create an aggregation that triggers a punctuation every 30 seconds. You'll use a ProcessorSupplier and a Processor instance; the Processor contains all of your stream processing logic.
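
The work is split between a supplier and a processor because each processor holds its own state, such as its context and store reference, so the supplier is expected to hand back a new instance on every get() call. The plain-Java sketch below illustrates that contract with java.util.function.Supplier as a stand-in. CountingProcessor is a hypothetical class made up for illustration, and nothing here uses the Kafka Streams API.

```java
import java.util.function.Supplier;

public class SupplierContractSketch {

    // Hypothetical stand-in for a Processor: it holds per-instance mutable state.
    static class CountingProcessor {
        private int processed = 0;

        void process(String value) {
            processed++;
        }

        int processedCount() {
            return processed;
        }
    }

    public static void main(String[] args) {
        // Expected pattern: every get() call builds a fresh instance.
        Supplier<CountingProcessor> fresh = CountingProcessor::new;

        // Problematic pattern: every get() call returns the same shared instance.
        CountingProcessor singleton = new CountingProcessor();
        Supplier<CountingProcessor> shared = () -> singleton;

        CountingProcessor a = fresh.get();
        CountingProcessor b = fresh.get();
        a.process("order-1");
        System.out.println("fresh instances distinct: " + (a != b));
        System.out.println("b unaffected by a: " + (b.processedCount() == 0));

        CountingProcessor c = shared.get();
        CountingProcessor d = shared.get();
        c.process("order-1");
        System.out.println("shared instances identical: " + (c == d));
        System.out.println("d sees c's state: " + (d.processedCount() == 1));
    }
}
```

With a shared instance, state written through one reference leaks into every other user of the supplier, which is why the get() method in this exercise returns a new Processor each time.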

  1. Create the ProcessorSupplier implementation:

    static class TotalPriceOrderProcessorSupplier implements ProcessorSupplier<String, ElectronicOrder, String, Double> {
        final String storeName;
    }
  2. Add a constructor that takes the store name. Then implement get(), which matters because it must return a new Processor instance each time it's called. Inside the Processor, declare variables for the ProcessorContext and the KeyValueStore, and implement the init method, which Kafka Streams calls when the application starts up. In init, store a reference to the processor context, then use it to get the state store by name and assign it to the store variable. Finally, use the processor context to schedule a punctuation that fires every 30 seconds, based on stream time.

    public TotalPriceOrderProcessorSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Processor<String, ElectronicOrder, String, Double> get() {
        return new Processor<>() {
            private ProcessorContext<String, Double> context;
            private KeyValueStore<String, Double> store;

            @Override
            public void init(ProcessorContext<String, Double> context) {
                // Keep the context so forwardAll can forward records later.
                this.context = context;
                // Look up the state store by the name passed to the supplier.
                store = context.getStateStore(storeName);
                // Call forwardAll every 30 seconds of stream time.
                this.context.schedule(Duration.ofSeconds(30), PunctuationType.STREAM_TIME, this::forwardAll);
            }
        }
  3. Implement the forwardAll method, beginning by opening an iterator for all records in the store. (It’s important to close iterators when you're done with them; it's best to use them within a try-with-resources block, so that closing is automatic.)

    private void forwardAll(final long timestamp) {
        try (KeyValueIterator<String, Double> iterator = store.all()) {
  4. Iterate over all of the records, and get a KeyValue from the iterator inside your loop. In addition, create a new Record instance. Then forward the Record to any child nodes.

            while (iterator.hasNext()) {
                final KeyValue<String, Double> nextKV = iterator.next();
                final Record<String, Double> totalPriceRecord = new Record<>(nextKV.key, nextKV.value, timestamp);
                context.forward(totalPriceRecord);
                System.out.println("Punctuation forwarded record - key " + totalPriceRecord.key() + " value " + totalPriceRecord.value());
            }
        }
    }
  5. Implement the process method on the Processor interface: get the key from the Record, then use the key to look up the current total in the state store. If the total is null, initialize it to 0.0. Add the price from the record to the total, and put the new value in the store under the same key.

    @Override
    public void process(Record<String, ElectronicOrder> record) {
        final String key = record.key();
        Double currentTotal = store.get(key);
        if (currentTotal == null) {
            currentTotal = 0.0;
        }
        Double newTotal = record.value().getPrice() + currentTotal;
        store.put(key, newTotal);
        System.out.println("Processed incoming record - key " + key + " value " + record.value());
    }
  6. We're not quite done with the ProcessorSupplier implementation, but we have some details to attend to first. Define the storeName variable and create a StoreBuilder, which you'll need for creating the state store. In the StoreBuilder, set the store type to persistent and use the storeName variable for the name of the store. Add SerDes for the key/value types in the store (Kafka Streams stores everything as byte arrays in state stores).

    final static String storeName = "total-price-store";
    static StoreBuilder<KeyValueStore<String, Double>> totalPriceStoreBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(storeName),
        Serdes.String(),
        Serdes.Double());
  7. With the StoreBuilder complete, override the stores method on the ProcessorSupplier interface, which gives the Processor access to the store:

    @Override
    public Set<StoreBuilder<?>> stores() {
        return Collections.singleton(totalPriceStoreBuilder);
    }
  8. Now build a topology for the streaming application. This takes a few more steps, since we’re using the Processor API and not the Kafka Streams DSL. Begin by creating a Topology instance and adding a source node (you need to provide a name for the source node, deserializers for the key and value, and the input topic):

    final Topology topology = new Topology();
    topology.addSource(
        "source-node",
        stringSerde.deserializer(),
        electronicSerde.deserializer(),
        inputTopic);
  9. Next, add a processor to the topology. Provide a name for the Processor, add the ProcessorSupplier instance you created before, and set the parent name(s) for the Processor (you can specify multiple names).

    topology.addProcessor(
        "aggregate-price",
        new TotalPriceOrderProcessorSupplier(storeName),
        "source-node");
  10. Complete the topology by adding a sink node, specifying its name, the output topic, serializers for the key and value, and the parent name(s):

    topology.addSink(
        "sink-node",
        outputTopic,
        stringSerde.serializer(),
        doubleSerde.serializer(),
        "aggregate-price");
  11. Finally, instantiate the kafkaStreams object, call the utility method that creates the topics and produces sample data, and start the application.

    final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProps);
    TopicLoader.runProducer();
    kafkaStreams.start();
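
Step 3 recommends opening the store iterator in a try-with-resources block. The plain-Java sketch below shows what that buys you: the iterator is closed even when the loop body throws. TrackedIterator is a hypothetical class made up for illustration; it stands in for a store iterator and does not use the Kafka Streams API.

```java
import java.util.Iterator;
import java.util.List;

public class IteratorCloseSketch {

    // Hypothetical iterator that records whether close() was called.
    static class TrackedIterator implements Iterator<String>, AutoCloseable {
        private final Iterator<String> delegate;
        private boolean closed = false;

        TrackedIterator(List<String> items) {
            this.delegate = items.iterator();
        }

        @Override
        public boolean hasNext() {
            return delegate.hasNext();
        }

        @Override
        public String next() {
            return delegate.next();
        }

        @Override
        public void close() {
            closed = true;
        }

        boolean isClosed() {
            return closed;
        }
    }

    // Counts keys until it meets "bad", where it throws; the iterator is closed either way.
    static int countUntilBad(TrackedIterator iterator) {
        int count = 0;
        try (TrackedIterator it = iterator) {
            while (it.hasNext()) {
                if (it.next().equals("bad")) {
                    throw new IllegalStateException("failed mid-iteration");
                }
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        TrackedIterator ok = new TrackedIterator(List.of("key-1", "key-2"));
        System.out.println("visited " + countUntilBad(ok) + ", closed: " + ok.isClosed());

        TrackedIterator failing = new TrackedIterator(List.of("key-1", "bad"));
        try {
            countUntilBad(failing);
        } catch (IllegalStateException e) {
            System.out.println("loop failed, closed: " + failing.isClosed());
        }
    }
}
```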

Run this example with the following command:

./gradlew runStreams -Pargs=processor

Your results should look like this:

Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622156159867}
Punctuation forwarded record - key HDTV-2333 value 2000.0
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156194867}
Punctuation forwarded record - key HDTV-2333 value 3999.23
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622156229867}
Punctuation forwarded record - key HDTV-2333 value 8499.23
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622156264867}
Punctuation forwarded record - key HDTV-2333 value 9833.21

Note that the timestamps are simulated to provide more activity, and the observed behavior could be different in a production environment.
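
To see why a punctuation follows every record in the results above, and why that could change with different data, it can help to model stream time outside of Kafka Streams. The sketch below is a simplified plain-Java model, not the library's implementation. A HashMap stands in for the state store, a list stands in for context.forward, and stream time is the highest record timestamp seen so far. It assumes the first punctuation is due as soon as data arrives and that the next one is due 30 seconds after each firing; the library's exact scheduling may differ. The prices and the first timestamp come from the sample records above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamTimePunctuationSketch {

    static final long INTERVAL_MS = 30_000L;

    private final Map<String, Double> store = new HashMap<>();      // stand-in for the state store
    private final List<Double> forwardedTotals = new ArrayList<>(); // stand-in for context.forward
    private long streamTime = 0L;
    private long nextPunctuation = 0L; // assumption: first punctuation is due once data arrives

    // Mirrors process(): add the price to the running total, then advance stream time.
    void process(String key, double price, long timestamp) {
        Double currentTotal = store.get(key);
        if (currentTotal == null) {
            currentTotal = 0.0;
        }
        store.put(key, currentTotal + price);

        // Stream time only moves forward, and only when a record arrives.
        streamTime = Math.max(streamTime, timestamp);
        if (streamTime >= nextPunctuation) {
            forwardAll();
            nextPunctuation = streamTime + INTERVAL_MS; // assumption: simplified rescheduling
        }
    }

    // Mirrors forwardAll(): emit the current total for every key in the store.
    private void forwardAll() {
        forwardedTotals.addAll(store.values());
    }

    List<Double> forwardedTotals() {
        return forwardedTotals;
    }

    // Feeds the four sample prices for one key, spaced spacingMs apart.
    static StreamTimePunctuationSketch run(long spacingMs) {
        double[] prices = {2000.0, 1999.23, 4500.0, 1333.98};
        long firstTimestamp = 1622156159867L;
        StreamTimePunctuationSketch sketch = new StreamTimePunctuationSketch();
        for (int i = 0; i < prices.length; i++) {
            sketch.process("HDTV-2333", prices[i], firstTimestamp + i * spacingMs);
        }
        return sketch;
    }

    public static void main(String[] args) {
        // 35 seconds apart, as in the sample data: each record crosses the 30-second boundary.
        System.out.println("35s spacing: " + run(35_000L).forwardedTotals().size() + " punctuations");
        // 10 seconds apart: stream time advances more slowly, so fewer punctuations fire.
        System.out.println("10s spacing: " + run(10_000L).forwardedTotals().size() + " punctuations");
    }
}
```

Under these assumptions, the 35-second spacing of the sample data yields a punctuation after each of the four records, while 10-second spacing yields only two. If no records arrive at all, stream time does not advance and no punctuation fires.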
