In a growing Apache Kafka-based application, consumers tend to grow in complexity. What might have started as a simple stateless transformation (e.g., masking out personally identifiable information or changing the format of a message to conform with internal schema requirements) soon evolves into complex aggregation, enrichment, and more. If you recall the consumer code we looked at previously, there isn’t much support in that API for operations like those: You’re going to have to build a lot of framework code to handle time windows, late-arriving messages, lookup tables, aggregation by key, and more. And once you’ve got that, recall that operations like aggregation and enrichment are typically stateful.
That “state” is going to be memory in your program’s heap, which means it’s a fault tolerance liability. If your stream processing application goes down, its state goes with it, unless you’ve devised a scheme to persist that state somewhere. That sort of thing is fiendishly complex to write and debug at scale and really does nothing to directly make your users’ lives better. This is why Apache Kafka provides a stream processing API. This is why we have Kafka Streams.
Kafka Streams is a Java API that gives you easy access to all of the computational primitives of stream processing: filtering, grouping, aggregating, joining, and more, keeping you from having to write framework code on top of the consumer API to do all those things. It also provides support for the potentially large amounts of state that result from stream processing computations. If you’re grouping events in a high-throughput topic by a field with many unique values then computing a rollup over that group every hour, you might need to use a lot of memory.
Indeed, for high-volume topics and complex stream processing topologies, it’s not at all difficult to imagine that you’d need to deploy a cluster of machines sharing the stream processing workload like a regular consumer group would. The Streams API solves both problems by handling all of the distributed state problems for you: It persists state to local disk and to internal topics in the Kafka cluster, and it automatically reassigns state between nodes in a stream processing cluster when adding or removing stream processing nodes to the cluster.
In a typical microservice, stream processing is a thing the application does in addition to other functions. For example, a shipment notification service might combine shipment events with events in a product information changelog containing customer records to produce shipment notification objects, which other services might turn into emails and text messages. But that shipment notification service might also be obligated to expose a REST API for synchronous key lookups by the mobile app or web front end when rendering views that show the status of a given shipment.
The service is reacting to events—and in this case, joining three streams together, and perhaps doing other windowed computations on the joined result—but it is also servicing HTTP requests against its REST endpoint, perhaps using the Spring Framework or Micronaut or some other Java API in common use. Because Kafka Streams is a Java library and not a set of dedicated infrastructure components that do stream processing and only stream processing, it’s trivial to stand up services that use other frameworks to accomplish other ends (like REST endpoints) and sophisticated, scalable, fault-tolerant stream processing.
Here is a code listing that illustrates some concepts from the Streams API. We won’t take the time here to walk through it all, but you might find it helpful just to get a sense of the API’s approach. This code computes the average of a value in a stream (raw-ratings), then joins that average to a table (movies) to produce a new aggregated, enriched topic (rated-movies).
StreamsBuilder builder = new StreamsBuilder(); builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String())).mapValues(Parser::parseMovie).map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie)).to("movies", Produced.with(Serdes.Long(), movieSerde)); KTable<Long, Movie> movies = builder.table("movies", Materialized.<Long, Movie, KeyValueStore<Bytes, byte>>as("movies-store").withValueSerde(movieSerde).withKeySerde(Serdes.Long())); KStream<Long, String> rawRatings = builder.stream("raw-ratings", Consumed.with(Serdes.Long(), Serdes.String())); KStream<Long, Rating> ratings = rawRatings.mapValues(Parser::parseRating).map((key, rating) -> new KeyValue<>(rating.getMovieId(), rating)); KStream<Long, Double> numericRatings = ratings.mapValues(Rating::getRating); KGroupedStream<Long, Double> ratingsById = numericRatings.groupByKey(); KTable<Long, Long> ratingCounts = ratingsById.count(); KTable<Long, Double> ratingSums = ratingsById.reduce((v1, v2) -> v1 + v2); KTable<Long, Double> ratingAverage = ratingSums.join(ratingCounts, (sum, count) -> sum / count.doubleValue(),Materialized.as("average-ratings")); ratingAverage.toStream().to("average-ratings"); KTable<Long, String> ratedMovies = ratingAverage.join(movies, (avg, movie) -> movie.getTitle() + "=" + avg); ratedMovies.toStream().to("rated-movies", Produced.with(Serdes.Long(), Serdes.String()));
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.