Senior Software Engineer (Presenter)
Integration Architect (Author)
So far you've learned about the Kafka Streams DSL, which gives you maximum developer velocity when building an application, but is very opinionated. Sometimes you need to accomplish things that aren't provided to you by the DSL, and that's where the Processor API comes in. It gives you maximum flexibility, but you are responsible for all of the details.
The Processor API gives you access to state stores for custom stateful operations. In the Stateful Operations module, you learned about state and the DSL, which uses under-the-hood state stores that you can't directly access. With the Processor API, you create a store that you pass in, so you have full access to the store: You can pull out all of the records, you can do a range query, you can do all sorts of things. You can also programmatically call commit, and you can control how often it's called.
Punctuation gives you the ability to schedule arbitrary operations. When you schedule a punctuator, it can use either stream-time processing or wall-clock-time processing.
With stream-time processing, you schedule punctuation for an arbitrary action that you want to fire periodically—every 30 seconds, for example—and the action is driven by the timestamps on the records.
You can also schedule punctuation according to wall-clock time. Under the hood, Kafka Streams uses a consumer, and consumers call poll()
to get records. Wall-clock time advances with the poll()
calls. So wall-clock time advancement depends in part on how long it takes you to return from a poll()
loop.
You will recall the Kafka Streams topology from the Basic Operations module. Related to that, Kafka Streams applications with the Processor API are typically built as follows:
When you create nodes, you need to provide a name for each one, since you use the name of one node as the parent name of another. As mentioned, a node can have more than one child node, but you can be selective about the nodes to which you forward records.
Creating a topology involves straightforward code:
Topology topology = new Topology();
topology.addSource(“source-node”, “topicA”, ”topicB”);
topology.addProcessor(“custom-processor”, new CustomProcessorSupplier(storeName), “source-node”);
toplogy.addSink(“sink-node”, “output-topic”, “custom-processor”);
First you create an instance and add a source node. Then you add a custom processor, whose parent is the source node. Finally, you create a sink node, whose parent is the custom processor. For any one of these nodes, you can have multiple parent nodes and multiple child nodes—this is just a simple example.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.