Sophie Blee-Goldman

Senior Software Engineer (Presenter)

Bill Bejeck

Integration Architect (Author)

Processor API

So far you've learned about the Kafka Streams DSL, which gives you maximum developer velocity when building an application, but is very opinionated. Sometimes you need to accomplish things that aren't provided to you by the DSL, and that's where the Processor API comes in. It gives you maximum flexibility, but you are responsible for all of the details.

The Processor API gives you access to state stores for custom stateful operations. In the Stateful Operations module, you learned about state and the DSL, which uses state stores under the hood that you can't access directly. With the Processor API, you create the store yourself and pass it in, so you have full access to it: you can iterate over all of its records, run range queries, and perform other operations that the DSL doesn't expose. You can also call commit programmatically, which lets you control how often it's called.

Punctuation

Punctuation gives you the ability to schedule arbitrary operations. When you schedule a punctuator, it can use either stream-time processing or wall-clock-time processing.

Stream-Time Processing

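With stream-time processing, you schedule punctuation for an arbitrary action that you want to fire periodically (every 30 seconds, for example), and the action is driven by the timestamps on the records. Stream time only moves forward when new records arrive, so if no records come in, the punctuation does not fire.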
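To make the idea concrete, here is a minimal plain-Java sketch of stream-time scheduling. It is a toy model, not Kafka Streams code: the class `StreamTimePunctuationSketch` and its methods are hypothetical names, and the rescheduling rule (fire once, then wait a full interval from the current stream time) is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of stream-time punctuation. Hypothetical class, not part of Kafka Streams.
final class StreamTimePunctuationSketch {
    private final long intervalMs;
    private boolean scheduled = false; // becomes true once the first record is seen
    private long streamTimeMs = 0;     // highest record timestamp observed so far
    private long nextFireAtMs = 0;     // stream time at which the action should fire next
    private final List<Long> firedAt = new ArrayList<>();

    StreamTimePunctuationSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Called once per record. Only record timestamps can advance stream time.
    void onRecord(long recordTimestampMs) {
        streamTimeMs = scheduled ? Math.max(streamTimeMs, recordTimestampMs) : recordTimestampMs;
        if (!scheduled) {
            scheduled = true;
            nextFireAtMs = streamTimeMs + intervalMs;
            return;
        }
        if (streamTimeMs >= nextFireAtMs) {
            firedAt.add(streamTimeMs);
            // Assumption: fire once, then wait one full interval of stream time.
            nextFireAtMs = streamTimeMs + intervalMs;
        }
    }

    List<Long> firedAt() {
        return firedAt;
    }

    public static void main(String[] args) {
        StreamTimePunctuationSketch punctuation = new StreamTimePunctuationSketch(30_000);
        long[] recordTimestamps = {0, 10_000, 29_999, 30_000, 45_000, 95_000};
        for (long ts : recordTimestamps) {
            punctuation.onRecord(ts);
        }
        System.out.println(punctuation.firedAt()); // prints [30000, 95000]
    }
}
```

Note that nothing fires between timestamps 45,000 and 95,000 even though more than 30 seconds of stream time pass: in this model the action can only run when a record arrives and moves stream time forward.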

Wall-Clock-Time Processing

You can also schedule punctuation according to wall-clock time. Under the hood, Kafka Streams uses a consumer, and consumers call poll() to get records. Wall-clock time advances with the poll() calls, so how promptly a wall-clock punctuation fires depends in part on how long each pass through the poll() loop takes.
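The effect of a slow loop iteration can be illustrated with a small plain-Java simulation. This is a sketch under stated assumptions, not the Kafka Streams scheduler: `WallClockPunctuationSketch` is a hypothetical class, the clock is simulated, and the punctuation check is assumed to happen once per loop iteration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of wall-clock punctuation in a poll loop. Hypothetical class, not part of Kafka Streams.
final class WallClockPunctuationSketch {

    // Each array entry is how long one loop iteration (poll plus processing) took, in ms.
    // Returns the simulated wall-clock times at which the punctuation fired.
    static List<Long> run(long intervalMs, long[] iterationDurationsMs) {
        List<Long> firedAt = new ArrayList<>();
        long nowMs = 0;
        long nextFireAtMs = intervalMs;
        for (long durationMs : iterationDurationsMs) {
            nowMs += durationMs; // wall-clock time passes whether or not records arrive
            // Assumption: the schedule is only checked once per loop iteration.
            if (nowMs >= nextFireAtMs) {
                firedAt.add(nowMs);
                nextFireAtMs = nowMs + intervalMs;
            }
        }
        return firedAt;
    }

    public static void main(String[] args) {
        long[] iterationDurationsMs = {10_000, 10_000, 10_000, 50_000, 5_000};
        System.out.println(run(30_000, iterationDurationsMs)); // prints [30000, 80000]
    }
}
```

The second firing was due at 60,000 ms but happens at 80,000 ms, because the fourth iteration took 50 seconds to return.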

Building Streams Applications with the Processor API

You will recall the Kafka Streams topology from the Basic Operations module. Related to that, Kafka Streams applications with the Processor API are typically built as follows:

  • Add source node(s)
  • Add any number of processors as child nodes of the source node(s); a child node can have any number of parents
  • Optionally create StoreBuilder instances and attach them to the processor nodes to give them access to state stores
  • Add sink node(s), to which one or more parent nodes forward records

When you create nodes, you need to provide a name for each one, since you use the name of one node as the parent name of another. A node can also have more than one child node, but you can be selective about the nodes to which you forward records.
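The role that names play can be sketched with a small plain-Java model of a node graph. Everything here is hypothetical and for illustration only: `NamedNodeGraphSketch`, `addNode`, `forwardToAll`, and `forwardTo` are not Kafka Streams classes or methods. In Kafka Streams itself, the comparable choice is made when a processor forwards a record, either to all of its children or to one child identified by name.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of nodes wired together by name. Hypothetical class, not part of Kafka Streams.
final class NamedNodeGraphSketch {
    private final Map<String, List<String>> childrenByNode = new LinkedHashMap<>();

    // Registers a node and links it to parents that were registered earlier.
    void addNode(String name, String... parentNames) {
        if (childrenByNode.containsKey(name)) {
            throw new IllegalArgumentException("duplicate node name: " + name);
        }
        for (String parent : parentNames) {
            childrenOf(parent); // fails fast if a parent name is unknown
        }
        childrenByNode.put(name, new ArrayList<>());
        for (String parent : parentNames) {
            childrenByNode.get(parent).add(name);
        }
    }

    // Names of the nodes that receive a record forwarded to every child.
    List<String> forwardToAll(String from) {
        return new ArrayList<>(childrenOf(from));
    }

    // Names of the nodes that receive a record forwarded to one chosen child.
    List<String> forwardTo(String from, String childName) {
        if (!childrenOf(from).contains(childName)) {
            throw new IllegalArgumentException(childName + " is not a child of " + from);
        }
        return Collections.singletonList(childName);
    }

    private List<String> childrenOf(String name) {
        List<String> children = childrenByNode.get(name);
        if (children == null) {
            throw new IllegalArgumentException("unknown node: " + name);
        }
        return children;
    }

    public static void main(String[] args) {
        NamedNodeGraphSketch graph = new NamedNodeGraphSketch();
        graph.addNode("source-node");
        graph.addNode("processor-a", "source-node");
        graph.addNode("processor-b", "source-node");
        graph.addNode("sink-node", "processor-a", "processor-b");
        System.out.println(graph.forwardToAll("source-node"));
        System.out.println(graph.forwardTo("source-node", "processor-b"));
    }
}
```

Because parents are looked up by name, a node can only be attached to nodes that already exist, and a misspelled parent name is rejected immediately rather than silently creating a disconnected node.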

Creating a Topology with the Processor API

Creating a topology involves straightforward code:

Topology topology = new Topology();

topology.addSource("source-node", "topicA", "topicB");

topology.addProcessor("custom-processor", new CustomProcessorSupplier(storeName), "source-node");

topology.addSink("sink-node", "output-topic", "custom-processor");

First you create a Topology instance and add a source node that reads from two topics. Then you add a custom processor, whose parent is the source node. Finally, you create a sink node, whose parent is the custom processor. Processor nodes can have multiple parent nodes and multiple child nodes; this is just a simple example.
