Course: Kafka Streams 101

Hands On: Processor API

4 min

Sophie Blee-Goldman

Senior Software Engineer (Presenter)


Bill Bejeck

Integration Architect (Author)

If you haven’t already, clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.

This module’s code can be found in the source file java/io/confluent/developer/processor/ProcessorApi.java.

In this exercise, you will build an aggregation that uses a punctuation to emit results every 30 seconds. You'll implement a ProcessorSupplier and a Processor instance, where the Processor contains all of your stream processing logic.
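
For orientation before the steps, here is the rough shape of the two interfaces you'll implement (a paraphrased sketch of org.apache.kafka.streams.processor.api, not the exact source):

    public interface ProcessorSupplier<KIn, VIn, KOut, VOut> {
        Processor<KIn, VIn, KOut, VOut> get();                   // must return a new Processor per call
        default Set<StoreBuilder<?>> stores() { return null; }   // state stores to attach, if any
    }

    public interface Processor<KIn, VIn, KOut, VOut> {
        default void init(ProcessorContext<KOut, VOut> context) {}  // called once at startup
        void process(Record<KIn, VIn> record);                      // called once per input record
        default void close() {}
    }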

  1. Create the ProcessorSupplier implementation:

    static class TotalPriceOrderProcessorSupplier implements ProcessorSupplier<String, ElectronicOrder, String, Double> {
        final String storeName;
    }
  2. Add a constructor. The get() method implementation is important: it returns a new Processor instance each time it's called. Inside the anonymous Processor, declare variables for the ProcessorContext and the KeyValueStore, and implement the init method, which Kafka Streams calls when the application is starting up. In the init method, store a reference to the ProcessorContext, get a reference to the state store by name and save it in the store variable declared above, then use the processor context to schedule a punctuation that fires every 30 seconds, based on stream time.

    public TotalPriceOrderProcessorSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Processor<String, ElectronicOrder, String, Double> get() {
        return new Processor<>() {
            private ProcessorContext<String, Double> context;
            private KeyValueStore<String, Double> store;

            @Override
            public void init(ProcessorContext<String, Double> context) {
                this.context = context;
                store = context.getStateStore(storeName);
                this.context.schedule(Duration.ofSeconds(30), PunctuationType.STREAM_TIME, this::forwardAll);
            }
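  As an aside, punctuations can also be scheduled on wall-clock time instead of stream time; a minimal variant of the schedule call above (not used in this exercise) would be:

    // Hypothetical alternative: fire every 30 seconds of system time,
    // even when no new records are arriving to advance stream time
    this.context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME, this::forwardAll);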
  3. Implement the forwardAll method, beginning by opening an iterator for all records in the store. (It’s important to close iterators when you're done with them; it's best to use them within a try-with-resources block, so that closing is automatic.)

    private void forwardAll(final long timestamp) {
        try (KeyValueIterator<String, Double> iterator = store.all()) {
  4. Iterate over all of the records, and get a KeyValue from the iterator inside your loop. In addition, create a new Record instance. Then forward the Record to any child nodes.

            while (iterator.hasNext()) {
                final KeyValue<String, Double> nextKV = iterator.next();
                final Record<String, Double> totalPriceRecord = new Record<>(nextKV.key, nextKV.value, timestamp);
                context.forward(totalPriceRecord);
                System.out.println("Punctuation forwarded record - key " + totalPriceRecord.key() + " value " + totalPriceRecord.value());
            }
        }
    }
  5. Implement the process method on the Processor interface by first getting the key from the Record, then using the key to look up the current value in the state store. If it's null, initialize it to 0.0. Add the price from the current record to the total, and put the new value in the store under the given key.

    @Override
    public void process(Record<String, ElectronicOrder> record) {
        final String key = record.key();
        Double currentTotal = store.get(key);
        if (currentTotal == null) {
            currentTotal = 0.0;
        }
        Double newTotal = record.value().getPrice() + currentTotal;
        store.put(key, newTotal);
        System.out.println("Processed incoming record - key " + key + " value " + record.value());
    }
  6. We're not quite done with the ProcessorSupplier implementation, but we have some details to attend to first. Define the storeName variable and create a StoreBuilder, which you'll need for creating the state store. In the StoreBuilder, set the store type to persistent and use the storeName variable for the name of the store. Add SerDes for the key/value types in the store (Kafka Streams stores everything as byte arrays in state stores).

    final static String storeName = "total-price-store";
    static StoreBuilder<KeyValueStore<String, Double>> totalPriceStoreBuilder = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(storeName),
        Serdes.String(),
        Serdes.Double());
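  If you'd rather keep state off disk (in tests, for example), a sketch of an in-memory alternative, assuming the same store name and SerDes:

    // Hypothetical variant: identical wiring, but backed by memory instead of RocksDB
    static StoreBuilder<KeyValueStore<String, Double>> inMemoryTotalPriceStoreBuilder = Stores.keyValueStoreBuilder(
        Stores.inMemoryKeyValueStore(storeName),
        Serdes.String(),
        Serdes.Double());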
  7. With the StoreBuilder complete, now override the stores method on the ProcessorSupplier interface, which gives the Processor access to the store:

    @Override
    public Set<StoreBuilder<?>> stores() {
        return Collections.singleton(totalPriceStoreBuilder);
    }
  8. Now build a topology for the streaming application. This will take a few more steps than the Kafka Streams DSL, since you're using the Processor API. Begin by creating a Topology instance and adding a source node (you provide a name for the source node, the key and value deserializers, and the input topic):

    final Topology topology = new Topology();
    topology.addSource(
        "source-node",
        stringSerde.deserializer(),
        electronicSerde.deserializer(),
        inputTopic);
  9. Next, add a processor to the topology. Provide a name for the Processor, add the ProcessorSupplier instance you created before, and set the parent name(s) for the Processor (you can specify multiple names).

    topology.addProcessor(
        "aggregate-price",
        new TotalPriceOrderProcessorSupplier(storeName),
        "source-node");
  10. Complete the topology by adding a sink node, specifying its name and then adding an output topic, SerDes, and parent name(s):

    topology.addSink(
        "sink-node",
        outputTopic,
        stringSerde.serializer(),
        doubleSerde.serializer(),
        "aggregate-price");
  11. Finally, instantiate the KafkaStreams object, add the utility method that creates topics and provides sample data, and start the application.

    final KafkaStreams kafkaStreams = new KafkaStreams(topology, streamsProps);
    TopicLoader.runProducer();
    kafkaStreams.start();
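  The course example keeps startup minimal; in longer-running applications it's common to also close Kafka Streams cleanly, for example with a shutdown hook (an addition of ours, not part of the exercise code):

    // Close the Kafka Streams client when the JVM exits
    Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));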

Run this example with the following command:

./gradlew runStreams -Pargs=processor

Your results should look like this:

Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "10261998", "price": 2000.0, "time": 1622156159867}
Punctuation forwarded record - key HDTV-2333 value 2000.0
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1033737373", "price": 1999.23, "time": 1622156194867}
Punctuation forwarded record - key HDTV-2333 value 3999.23
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1026333", "price": 4500.0, "time": 1622156229867}
Punctuation forwarded record - key HDTV-2333 value 8499.23
Processed incoming record - key HDTV-2333 value {"order_id": "instore-1", "electronic_id": "HDTV-2333", "user_id": "1038884844", "price": 1333.98, "time": 1622156264867}
Punctuation forwarded record - key HDTV-2333 value 9833.21

Note that the timestamps are simulated to provide more activity, and the observed behavior could be different in a production environment.
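
If you want to exercise the punctuation logic without running any brokers, here is a minimal test sketch using TopologyTestDriver (this assumes the kafka-streams-test-utils dependency is on the classpath, and order1/order2 are hypothetical ElectronicOrder instances you would construct yourself):

    try (TopologyTestDriver driver = new TopologyTestDriver(topology, streamsProps)) {
        final TestInputTopic<String, ElectronicOrder> testInput = driver.createInputTopic(
            inputTopic, stringSerde.serializer(), electronicSerde.serializer());
        final TestOutputTopic<String, Double> testOutput = driver.createOutputTopic(
            outputTopic, stringSerde.deserializer(), doubleSerde.deserializer());
        // Two records 31 seconds apart in stream time, so the 30-second punctuation fires in between
        testInput.pipeInput("HDTV-2333", order1, Instant.ofEpochMilli(0L));
        testInput.pipeInput("HDTV-2333", order2, Instant.ofEpochMilli(31_000L));
        System.out.println(testOutput.readKeyValuesToList());
    }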

