So far you've learned about the Kafka Streams DSL, which gives you maximum developer velocity when building an application, but is very opinionated. Sometimes you need to accomplish things that aren't provided to you by the DSL, and that's where the Processor API comes in. It gives you maximum flexibility, but you are responsible for all of the details.
The Processor API gives you access to state stores for custom stateful operations. In the Stateful Operations module, you learned about state and the DSL, which uses state stores under the hood that you can't directly access. With the Processor API, you create a store yourself and pass it in, so you have full access to it: you can pull out all of the records, you can do a range query, you can do all sorts of things. You can also programmatically call commit, and you can control how often it's called.
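As a minimal sketch of what such a processor might look like, assuming the current Processor API (org.apache.kafka.streams.processor.api); the class name, store name, and types here are illustrative, not from the course code:

import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative processor: keeps a running total per key in a key-value store
public class TotalPriceProcessor implements Processor<String, Long, String, Long> {

    private final String storeName;  // assumed to match the store attached to this node
    private ProcessorContext<String, Long> context;
    private KeyValueStore<String, Long> store;

    public TotalPriceProcessor(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        this.context = context;
        // Direct handle to the state store attached to this node
        this.store = context.getStateStore(storeName);
    }

    @Override
    public void process(final Record<String, Long> record) {
        // Read and update the store directly, which the DSL never exposes
        final Long previous = store.get(record.key());
        final long total = (previous == null ? 0L : previous) + record.value();
        store.put(record.key(), total);

        // Range queries (or store.all()) are also available when you need them
        try (final KeyValueIterator<String, Long> range = store.range("a", "z")) {
            range.forEachRemaining(kv -> { /* inspect kv.key / kv.value as needed */ });
        }

        context.forward(record.withValue(total));

        // Request a commit; Kafka Streams commits as soon as it can, not instantly
        context.commit();
    }
}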
Punctuation gives you the ability to schedule arbitrary operations. When you schedule a punctuator, it can use either stream-time processing or wall-clock-time processing.
With stream-time processing, you schedule punctuation for an arbitrary action that you want to fire periodically—every 30 seconds, for example—and the action is driven by the timestamps on the records.
You can also schedule punctuation according to wall-clock time. Under the hood, Kafka Streams uses a consumer, and consumers call poll() to get records. Wall-clock time advances with those poll() calls, so how promptly a wall-clock punctuation fires depends in part on how long it takes you to return from the poll() loop.
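As a rough sketch of how both punctuation types are scheduled, a processor can register them in init(); the 30-second interval and the actions here are illustrative, and this assumes the current org.apache.kafka.streams.processor.api interfaces:

import java.time.Duration;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

public class PunctuationExampleProcessor implements Processor<String, Long, String, Long> {

    @Override
    public void init(final ProcessorContext<String, Long> context) {
        // Stream-time punctuation: fires only once record timestamps have advanced
        // by 30 seconds; if no new records arrive, it never fires
        context.schedule(Duration.ofSeconds(30), PunctuationType.STREAM_TIME,
            timestamp -> { /* e.g. scan a store and forward periodic results */ });

        // Wall-clock punctuation: checked between poll() batches, so it fires roughly
        // every 30 seconds whether or not any records arrive
        context.schedule(Duration.ofSeconds(30), PunctuationType.WALL_CLOCK_TIME,
            timestamp -> context.commit());
    }

    @Override
    public void process(final Record<String, Long> record) {
        // Regular per-record logic would go here
    }
}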
You will recall the Kafka Streams topology from the Basic Operations module. Related to that, Kafka Streams applications built with the Processor API typically follow the same shape: you add one or more source nodes that read from Kafka topics, then your custom processors (optionally with store builders for any state stores they need) as children of those sources, and finally one or more sink nodes that write back out to Kafka topics.
When you create nodes, you need to provide a name for each one, since you use the name of one node as the parent name of another. A node can have more than one child node, but you can be selective about which children you forward records to.
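For instance, inside a processor's process() method, forwarding can target all children or a single named child (this is a fragment, and the child node name here is made up):

// Send the record to every child node of this processor
context.forward(record);

// Or send it only to one specific child, using the name that child was given in addProcessor()
context.forward(record, "alerts-processor");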
Creating a topology involves straightforward code:
Topology topology = new Topology();
topology.addSource("source-node", "topicA", "topicB");
topology.addProcessor("custom-processor", new CustomProcessorSupplier(storeName), "source-node");
topology.addSink("sink-node", "output-topic", "custom-processor");
First you create an instance and add a source node. Then you add a custom processor, whose parent is the source node. Finally, you create a sink node, whose parent is the custom processor. For any one of these nodes, you can have multiple parent nodes and multiple child nodes—this is just a simple example.
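The CustomProcessorSupplier referenced above could be shaped roughly like this; an illustrative sketch assuming the current Processor API and the TotalPriceProcessor sketched earlier. Implementing stores() is optional, but it lets Kafka Streams create and connect the state store for you:

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CustomProcessorSupplier implements ProcessorSupplier<String, Long, String, Long> {

    private final String storeName;

    public CustomProcessorSupplier(final String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Processor<String, Long, String, Long> get() {
        // Always return a fresh instance; each stream task needs its own processor
        return new TotalPriceProcessor(storeName);  // hypothetical processor class from the earlier sketch
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        // Declaring the store here means Kafka Streams creates it and connects it to this processor
        return Collections.singleton(
            Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(storeName),
                Serdes.String(),
                Serdes.Long()));
    }
}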
Hi, I'm Sophie Blee-Goldman with Confluent, and in this module we're going to talk about the Processor API. So far we've been showing you how to build applications using something called the DSL. All those operators, the KStream, the KTable, and the StreamsBuilder that we used to create them, that's all what we call the DSL, and it's really just a high-level language that you can use in Kafka Streams to specify the processor topology. So the DSL takes whatever you want your stream processing application to do and builds that processor topology for you. Now, the DSL operators cover most of what you would ever want to do with a stream processing application. But what if it doesn't? Well, never fear, because the Processor API is here. This lets you do pretty much anything you can imagine using Kafka Streams, by specifying the actual processor topology node by node. This is nice because you have full control, but of course this is a lower-level API, so you're really responsible for all the details. It's going to be a little bit more work, but it gives you much more flexibility to build the application that you want.
So one of the nice things about the Processor API is that you have direct access to the state stores. In the previous operators, such as count, aggregate, or reduce, there was a state store underlying the actual operator, but you didn't have access to it. You couldn't directly call put or get, or any of the usual operations that you might imagine a state store is useful for. With the Processor API, you have full control over these state stores. You can add one or two or any number of state stores that you might need for whatever your logic is, and use them however you want. So you can store things using the window store that is used for the windowed operations, or just a normal key-value store. And this lets you do pretty much any kind of logic that you might want, all on a single node that processes records one by one, just the way the DSL does.
Another nice thing about the Processor API is that you can call commit directly. You might be familiar with the plain consumer client in Kafka, which has an auto-commit function. Now, this auto-commit is actually disabled in Kafka Streams, but there still is a commit that Kafka Streams performs every 30 seconds by default, as we touched on earlier. And with the Processor API, you can actually call commit earlier, or on whatever schedule you would prefer, rather than waiting for that 30 seconds or whatever you have set the interval to be. Keep in mind, this is really more of a request to commit. It does not guarantee that the commit is going to happen right that second, right that minute. Rather, it just signals to Kafka Streams that you would like a commit to happen after it's done processing that record.
Now, really one of the most powerful things that the Processor API provides, which really cannot be done in the DSL, is the ability to schedule arbitrary operations using something called a punctuator. A punctuator is just a function that you define that gets run at some regular interval. And you can define this interval based on either stream-time processing, like event time, or wall-clock time. With stream-time processing, you would specify something like 100 milliseconds, and the punctuation would only be invoked after stream time had advanced by 100 milliseconds. So keep in mind, stream time is driven by the events themselves.
So if no new events come in, then no punctuation is going to be invoked. But if you have many events coming in, all with increasing timestamps, every time stream time advances by 100 milliseconds, your punctuator is going to be triggered and whatever your punctuation is will be run. Second is the wall-clock-time punctuation. With wall-clock time, unlike stream time, it's completely divorced from the events themselves. That means that regardless of whether there are events or there are no events, this punctuation is always going to be invoked. And again, you specify an interval. So if you specify a 100-millisecond interval again, then this wall-clock punctuation will be triggered as close to every 100 milliseconds as it can achieve, regardless of whether any events pass through or not. Now, if you are processing a lot of events, then sometimes this wall-clock punctuation might not be triggered exactly on the 100-millisecond mark. This is because Kafka Streams processes a batch of records at a time. The consumer hands a batch of records to Kafka Streams, and Kafka Streams will process these records one at a time through the processor topology. Only after it has finished processing those records will it check the wall-clock-time punctuation, and if enough time has elapsed, then it will be triggered. Which means that if those records take longer than expected, or you happen to get a large number of records to process, it's possible that the wall-clock-time punctuation won't be triggered exactly at the 100-millisecond mark that you specified, but instead will be triggered as soon as possible after it, as close to the time you specified as it can manage. So you can't really rely on it being exact. I think people often take wall-clock time to mean that it's exact, and it's really more of an approximation, similar to the commit, where it's not forcing Kafka Streams to commit, it is making a request to commit.
So how do you actually build a Kafka Streams application with the Processor API? As we mentioned before, it's a lower-level API in which you actually build up this processor topology that we've discussed, node by node. You follow this basic pattern: you add a source node or source nodes. This is similar to the KStream or the KTable; a source node is just something that reads from a topic in Kafka. You can then add any number of custom-logic processors as child nodes of the source nodes. These are just the downstream nodes of the source in this directed acyclic graph. And for each of these processor nodes, you can optionally create any number of store builders. A store builder is just a way to tell this node that you would like to attach a state store that you can then use when processing events through this node. Lastly, at the end of your processor topology, you're going to add a sink node or sink nodes. And these can be used to send records back to Kafka, into whatever topic you want, as they exit the processor topology. So as you add nodes to build up this processor topology, you're going to use the name of the parent to actually specify how this edge of the directed acyclic graph connects these nodes to each other. So when you add one node, you just tell it what the name of the parent node is, and Kafka Streams will do all the work under the hood to connect all these nodes together. Now, processors can have more than one parent node. So you can give them any number of parent nodes, and all of those nodes will then flow, or forward, the records into this new node that you've added.
The Processor API really gives you the flexibility to forward records to any number of child nodes, or just specific ones. So if you do have a node with multiple child nodes, then you can decide whether you want each record to be forwarded to all of them, just a single one, or any subset; with the Processor API, you have full flexibility over all of these things.
So first, let's show a quick example of how you would use the Processor API, and we'll dig into this more in the exercise. First, you create a Topology object, and this is just the foundation for the processor topology graph that you're going to build up. Then you add a source node. So in this case, we have source-node as the name of the node, and topicA and topicB as the names of the topics that will be read by this source. In all cases, you have to actually give a name to each of these nodes, because that's how their children will refer to them.
Once you have a source node, you can start to add more nodes to actually build up this processor topology, and you do this by calling addProcessor. Now again, the first argument to addProcessor is going to be the name of the processor itself. Like we said, it always needs a name, and this name should always be unique. The second argument here is the processor itself, or rather the processor supplier. Now, it's important that this always be a unique instance of whatever the processor object is, and thus you return a new instance from your processor supplier rather than creating a single instance and returning it over and over again. This is the basic supplier pattern that you'll find in Java. Also note that the last argument to the addProcessor method is source-node. This is just the parent node for this processor, and it says: hook this up to the source node that we created earlier. Any records in source-node can then be forwarded down to the custom processor that we have just attached.
So now that you have your processor or processors and have constructed the processor topology with all your custom logic, you can choose to add a sink node. A sink node is not strictly necessary; you can choose not to forward your records to any topic in Kafka, but that's generally what people use Kafka Streams for. To do so, all you need to do is call addSink and pass in the name of the node again, which is sink-node, and this time the name of the topic that the records will be sent to. So, somewhat like the converse of addSource, addSink just takes the output topic name, in our case 'output-topic'. And then lastly, it is also given custom-processor. In this case, custom-processor is just the parent node for our sink node. If you have any number of parent nodes that forward to this single sink, then you would provide more than just that one processor node name.
The key here is that you're really responsible for hooking up all of these nodes and defining all the edges; like we said, you're building up this graph, exactly how it looks and its shape, all yourself. So it's a little bit more work, and a lot more code to do something relatively simple compared to the DSL. But it's a lot more powerful, and you can really do almost anything that you would want using the Processor API. Now, an interesting thing about the Processor API is that you can actually mix it into the DSL.
Now, the DSL is usually preferred because it's simpler, you write less code, and it's easier to verify that it's correct, because most of it is already written for you. But it can't do everything, like we said. So in those cases you might want to take advantage of the DSL, but also use the Processor API for those one or two custom-logic conditions where the DSL just isn't cutting it. The way that you can do this is by using special operators in the DSL called transform, transformValues, and process. These just allow you to specify a processor supplier, like we saw in the earlier example where you define a processor, and you can insert any amount of custom logic, state stores, commits, punctuations, all of the same things that you can use in the Processor API. And they just become one step in the middle of your DSL application. So it's great, because you can use the more convenient DSL for most of the processing steps and only insert the Processor API when you need to dig down into a lower-level API to get those more complicated details. So that's the Processor API. Now we're going to go through an exercise so you can get a feel for what it's like to use it and how to do so.
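To make that mix-in concrete, here is a minimal sketch assuming a recent Kafka Streams release (3.3 or later), where KStream#process accepts the newer ProcessorSupplier and returns a stream, and reusing the hypothetical CustomProcessorSupplier and store name from the earlier sketches; on older versions, transform or transformValues plays this role:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class MixedDslAndProcessorApi {

    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();

        final KStream<String, Long> input =
            builder.stream("topicA", Consumed.with(Serdes.String(), Serdes.Long()));

        input.filter((key, value) -> value != null)                       // plain DSL step
             .process(new CustomProcessorSupplier("total-price-store"))   // one Processor API step in the middle
             .to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));  // back to the DSL

        // builder.build() produces the same kind of Topology you would otherwise assemble by hand
        return builder.build();
    }
}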