In a growing Apache Kafka-based application, consumers tend to grow in complexity. What might have started as a simple stateless transformation (e.g., masking out personally identifiable information or changing the format of a message to conform with internal schema requirements) soon evolves into complex aggregation, enrichment, and more. If you recall the consumer code we looked at previously, there isn’t much support in that API for operations like those: You’re going to have to build a lot of framework code to handle time windows, late-arriving messages, lookup tables, aggregation by key, and more. And once you’ve built all of that, recall that operations like aggregation and enrichment are typically stateful.
That “state” is going to be memory in your program’s heap, which means it’s a fault tolerance liability. If your stream processing application goes down, its state goes with it, unless you’ve devised a scheme to persist that state somewhere. That sort of thing is fiendishly complex to write and debug at scale, and it does nothing to directly make your users’ lives better. This is why Apache Kafka provides a stream processing API. This is why we have Kafka Streams.
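To make that liability concrete, here is a minimal sketch of the hand-rolled alternative: a plain consumer aggregating counts by key in an in-memory map. The topic name, types, and configuration here are illustrative, and imports are omitted as in the listing further down.
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "hand-rolled-aggregator");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
Map<String, Long> countsByKey = new HashMap<>();  // the "state": plain heap memory
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
  consumer.subscribe(List.of("events"));
  while (true) {
    for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofMillis(500))) {
      countsByKey.merge(record.key(), 1L, Long::sum);  // aggregate by key
    }
    // If this process crashes, countsByKey and every count in it is gone.
  }
}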
Kafka Streams is a Java API that gives you easy access to all of the computational primitives of stream processing: filtering, grouping, aggregating, joining, and more, keeping you from having to write framework code on top of the consumer API to do all those things. It also provides support for the potentially large amounts of state that result from stream processing computations. If you’re grouping events in a high-throughput topic by a field with many unique values, then computing a rollup over that group every hour, you might need to use a lot of memory.
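For a sense of what those primitives look like in code, here is a minimal sketch of exactly that kind of hourly rollup. The topic name and serdes are hypothetical, and the windowing API shown is the one in recent Kafka versions.
StreamsBuilder builder = new StreamsBuilder();
builder.stream("page-views", Consumed.with(Serdes.String(), Serdes.String()))
    .filter((key, value) -> value != null)                           // filtering
    .groupByKey()                                                    // grouping
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))  // hourly windows
    .count(Materialized.as("views-per-hour"));                       // aggregating into a state store
Run against a topic with many unique keys, the windowed store behind that count() is exactly the kind of state in question.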
Indeed, for high-volume topics and complex stream processing topologies, it’s not at all difficult to imagine that you’d need to deploy a cluster of machines sharing the stream processing workload like a regular consumer group would. The Streams API solves both problems by handling all of the distributed state problems for you: It persists state to local disk and to internal topics in the Kafka cluster, and it automatically reassigns state between nodes when stream processing nodes are added to or removed from the cluster.
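By way of illustration, here is what the configuration side of that story might look like; the values are hypothetical. The application ID doubles as the consumer group ID and the prefix for the internal changelog topics, state.dir is where local state lives on disk, and standby replicas keep warm copies of state on other instances to speed up failover.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rating-averager");    // also names the consumer group
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/lib/kafka-streams");  // local on-disk state stores
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);              // warm state copies on other nodes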
In a typical microservice, stream processing is a thing the application does in addition to other functions. For example, a shipment notification service might combine shipment events with events in a product information changelog and with customer records to produce shipment notification objects, which other services might turn into emails and text messages. But that shipment notification service might also be obligated to expose a REST API for synchronous key lookups by the mobile app or web front end when rendering views that show the status of a given shipment.
The service is reacting to events (in this case, joining three streams together and perhaps doing other windowed computations on the joined result), but it is also servicing HTTP requests against its REST endpoint, perhaps using the Spring Framework or Micronaut or some other Java API in common use. Because Kafka Streams is a Java library, and not a set of dedicated infrastructure components that do stream processing and only stream processing, it’s trivial to stand up services that combine sophisticated, scalable, fault-tolerant stream processing with other frameworks that accomplish other ends (like REST endpoints).
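As a sketch of how one process can do both jobs, Kafka Streams exposes its state stores for synchronous point lookups (its interactive queries feature). The store name and key type here are hypothetical, and in a multi-instance deployment the requested key may live on another instance, which the Streams metadata APIs let you locate.
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
// Inside an HTTP handler (Spring, Micronaut, etc. omitted), answer a synchronous
// key lookup directly from the local state store:
ReadOnlyKeyValueStore<Long, String> store = streams.store(
    StoreQueryParameters.fromNameAndType("shipment-status", QueryableStoreTypes.keyValueStore()));
String status = store.get(shipmentId);  // becomes the HTTP response body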
Here is a code listing that illustrates some concepts from the Streams API. We won’t take the time here to walk through it all, but you might find it helpful just to get a sense of the API’s approach. This code computes the average rating for each movie from a stream (raw-ratings), then joins that average to a table (movies) to produce a new aggregated, enriched topic (rated-movies).
StreamsBuilder builder = new StreamsBuilder();
// Re-key the raw movie feed by movie ID and write it to the "movies" topic.
builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String()))
    .mapValues(Parser::parseMovie)
    .map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie))
    .to("movies", Produced.with(Serdes.Long(), movieSerde));
// Read the re-keyed movies back in as a table, materialized in a local state store.
KTable<Long, Movie> movies = builder.table("movies",
    Materialized.<Long, Movie, KeyValueStore<Bytes, byte[]>>as("movies-store")
        .withKeySerde(Serdes.Long())
        .withValueSerde(movieSerde));
// Parse the raw ratings and re-key them by movie ID.
KStream<Long, String> rawRatings = builder.stream("raw-ratings", Consumed.with(Serdes.Long(), Serdes.String()));
KStream<Long, Rating> ratings = rawRatings.mapValues(Parser::parseRating)
    .map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating));
KStream<Long, Double> numericRatings = ratings.mapValues(Rating::getRating);
// Compute the average rating per movie as sum / count.
KGroupedStream<Long, Double> ratingsById = numericRatings.groupByKey();
KTable<Long, Long> ratingCounts = ratingsById.count();
KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2);
KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts,
    (sum, count) -> sum / count.doubleValue(), Materialized.as("average-ratings"));
ratingAverage.toStream().to("average-ratings");
// Join each average to its movie to produce the enriched output topic.
KTable<Long, String> ratedMovies = ratingAverage.join(movies, (avg, movie) -> movie.getTitle() + "=" + avg);
ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
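Note that the listing only builds the topology; nothing runs until it is wrapped in a KafkaStreams instance and started. Here is a minimal sketch, with an illustrative application ID:
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rated-movies-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));  // close cleanly on shutdown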
For a more thorough introduction to stream processing in Kafka, check out the Kafka Streams 101 course or Michael Noll’s four-part series on Streams and Tables in Apache Kafka.
Hey, Tim Berglund here with Confluent to tell you all about Kafka Streams. In a growing Kafka-based application, consumers tend to grow in complexity. Maybe producers don't so much, but consumers definitely do. What may have started as a simple stateless transformation, like masking out some personally identifying information or changing the format of a message to conform with some internal schema requirements, soon evolves into complex aggregation and enrichment and all kinds of crazy things like that. If you recall the consumer code we looked at in the section on Consumers, there isn't a lot of support in that API. Remember, I stressed that it was a very simple API; there's not a lot of support for things like that. You're gonna have to build a lot of framework code to handle time windows, late-arriving messages, out-of-order messages, lookup tables, aggregating by key, and more. And once you've gone there, recall that operations like aggregation and enrichment are typically stateful.
Now that state, and let's just be precise here, is memory allocated in your program's heap. That means it's a fault tolerance liability. If your stream processing application goes down, its state goes with it. That is, unless you've devised a scheme to persist that state somewhere. And that kind of thing is fiendishly complex. Unless you just happen to have an in-memory data grid deployed and humming along nicely, where you can put things into it performantly and get them out (most of us don't), you don't want to write and debug that kind of thing at scale. And again, this is another thing that does nothing directly to make your users' lives better and delivers no direct value to the business. This is why Apache Kafka provides a stream processing API. This is called Kafka Streams.
Now Kafka Streams is a Java API, and I'm really telling you about it in principle here. This is not a full API tutorial; that's kind of a lengthy thing on its own. I really just wanna give you the idea of what Streams is all about. What it's trying to do is give you easy access to all of the computational primitives of stream processing. The basics here are filtering, grouping, aggregating, and joining. Now there's more to it than that, but those are good primitives to start with. This keeps you from having to write framework code on top of the consumer API to do all those things. It also provides direct support for the potentially large amounts of state that result from doing stream processing computations like these.
Just about anything interesting that you're gonna do in a consumer is going to be stateful. If you're grouping events in a high-throughput topic by a field with lots of unique values, let's say, and then computing a roll-up over that group every hour, every five minutes, every whatever, you're gonna use, potentially, a lot of memory. You don't wanna be in the business of managing that any more than you have to. You don't wanna be in the business of thinking about fault tolerance in terms of that memory. You don't wanna be persisting that somewhere else. You'd much rather someone do that for you.
And let's keep going with this. For high-volume topics and complex stream processing topologies that are computationally expensive, it's really not hard to imagine that you're gonna need to deploy a cluster of machines sharing the stream processing workload like a regular consumer group would. Now, stream processing isn't necessarily a fundamentally expensive thing, but it can be.
You can do really rich things. And the Streams API helps you out here by handling all of the distributed state problems for you. It manages state off-heap (since Streams is a Java API, that state goes off the heap), persists it to local disk, and persists that same state to internal topics in the Kafka cluster. Local disk is nice, right? If you bounce, if you go down and come back up: "Oh, there's my state. I'll just swap that back into memory." You have a super high-bandwidth connection to that local disk, whatever local disk means depending on how you're deployed. But that node can go away altogether. You can lose that storage. And so having that topic in the cluster to be able to restore state from is an incredibly important backstop. And this gets used not just during failure, but also when you're adding or removing stream processing nodes from your cluster.
And when I say cluster here, talking about a Kafka Streams application, I really mean consumer group. So if you go back to the lesson on consumers, a Kafka Streams application is a consumer group. That scale-out that consumer groups do automatically, Kafka Streams is building on that, and it's solving this hairy state problem that I seem to be so worried about, solving it for you so you don't have to write that framework code.
An example: in a typical microservice, stream processing is a thing that the application does in addition to some other functions, right? That service does something. A shipment notification service might combine shipment events with events in a product information changelog, so a topic that has product records, inventory records in it. And it might also need customer records to produce shipment notification objects; it needs a customer record to get an address or something like that. Other services might turn those shipment notifications into emails and text messages, let's just say. But what I mean by a service doing other functions besides stream processing: the combining of those data sources, that's the stream processing. The rest of its life might be that it's obligated to expose a REST API for synchronous key lookups, like from the mobile app. I wanna see what's the status of my shipment right now. That's a synchronous operation; I should have a REST API for that. So that service needs to expose that kind of API in addition to the stream processing that it does.
So that service, that shipping service, is reacting to events. It's reacting to shipment events, and it's joining those three streams I named (product, user, shipment) together, and maybe doing some other windowed computations on that joined result. But it's also servicing HTTP requests against its REST endpoint, maybe using Spring Boot or Micronaut or some other Java API that's in common use. Because Kafka Streams is a Java library, and not some new set of dedicated infrastructure components that just do stream processing over there in their special cluster, it's trivial to stand up services that use those other frameworks, Micronaut, Spring Boot, whatever it is that you'd like, to accomplish those other purposes, like REST endpoints, and turn them into sophisticated, scalable, fault-tolerant stream processing applications. This is a key opinion of Kafka Streams: Kafka Streams is a library, not infrastructure. It's a thing that you add to an application that you already need.
You already know how to deploy that application as a Kafka consumer, and that consumer group already has its own scaling capabilities built in. Kafka Streams just gets on board with that, makes it work even better, and gives you these layers of stream processing functionality without you having to build all that framework yourself.
So here's a code listing that just illustrates some concepts from the Streams API. I'm not gonna take the time to walk through all of it line by line, but you might find it useful just to get a sense of its approach to things and some of the keywords. You can pause and look at it. It's not a complete tutorial, but I wanna get the code in front of your eyes so you know what it looks like. What this code is doing is computing the average value in a stream, a stream called raw-ratings, and then joining that average value to a table, a table called movies, to produce a new, aggregated, enriched topic called rated-movies. Oh, and I definitely recommend you get your hands dirty with some code at this point. This will all start to make a lot more sense when you type it out and see it run yourself.