course: Practical Event Modeling

Implementing Event-modeled Systems on the Streaming Data Platform

6 min
Bobby Calderwood



Implementing Event-modeled Systems using Apache Kafka


Our core business domain logic derived from our Event Model has been built. Now we’re ready to build our application by plugging that domain logic into the Streaming Data Platform’s infrastructure and APIs.


Implementing Event-modeled Systems on the Streaming Data Platform

In the last module, we built the core business domain logic derived from our event model. Now we're ready to build our application by plugging that domain logic into the streaming data platform's infrastructure and APIs. The transfer types, along with the Decide, Evolve, and React functions that we outlined in the previous module, make up the contract with the core domain of our system. Now we can use the Confluent streaming data platform to implement the application and infrastructure layers of our system.
We won't cover how to expose the Command and Read Model APIs as web services in this course, as there is a wealth of tutorials and frameworks online for exposing web service APIs in a variety of styles. Instead, we'll assume that inbound commands and Read Model queries arrive via some synchronous channel, and that we respond with command results and Read Models on the same channel.
The first touchpoint between the code we've already built and the streaming data platform is registering, with the Confluent Schema Registry, the transfer type schemas we defined to convey our events and Read Models. We don't need to register command transfer types in the Schema Registry, since commands are not published to Kafka. However, we will be publishing our events to the various stream topics, as well as publishing our Read Models to the compacted Read Model topics. Registering our Event and Read Model transfer type schemas ensures that valid schemas are available to downstream consumers, and that we'll be alerted to breaking schema changes if we evolve our schemas improperly.
To support the various events that make up our streams, we'll need to configure the Registry with a schema resolution strategy that supports multiple schema types per topic. We represent each stream as a single Kafka topic with multiple schema types, rather than as multiple topics with a single type each, so that we preserve the order of events in our event log.
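The transcript does not name the strategy it uses. In Confluent's serializers, the choice is usually expressed as a subject name strategy, which decides the Schema Registry subject under which each record's schema is registered and checked. The following plain-Python sketch (no Kafka client involved) only illustrates how the three standard strategies derive a subject name. The topic name `ride` and the record names are hypothetical placeholders, not taken from the course code.

```python
# Sketch: how the three standard subject name strategies derive the Schema
# Registry subject for a record value. Names below are illustrative only.

def topic_name_strategy(topic, record_name):
    # Default strategy: one subject per topic, so every value on the topic
    # is checked against the same schema lineage.
    return f"{topic}-value"

def record_name_strategy(topic, record_name):
    # One subject per record type, shared by every topic that carries it.
    return record_name

def topic_record_name_strategy(topic, record_name):
    # One subject per (topic, record type) pair, so a single topic can carry
    # several independently evolving event types.
    return f"{topic}-{record_name}"

# Hypothetical fully qualified names of two event types on the ride stream.
EVENT_TYPES = ["ride.RideRequested", "ride.RiderDroppedOff"]

def subjects(strategy, topic):
    """Return the set of subjects the given strategy would use for a topic."""
    return {strategy(topic, name) for name in EVENT_TYPES}
```

With the default strategy, both event types collapse onto the single subject `ride-value`, so they would have to be compatible versions of one schema. With the topic-and-record strategy, each event type gets its own subject, which is what allows several event schemas to share one ordered topic.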
On the other hand, Read Model topics will often require only one schema type per topic. Kafka's serialization and deserialization (SerDes) system integrates with the Schema Registry to automatically produce and consume our transfer types at the boundaries of our program. As a bonus, if our command and query web service APIs use Protobuf or Avro, we could also use those transfer type schemas to define our web service interface with gRPC or a similar framework.
Our first connection between our core domain and the streaming data platform is implementing our state change slices using a Kafka producer and the Decide function. For example, here we have a code snippet representing the handling of the requestRide command. First, we configure our producer with the appropriate serializer for the Event transfer types that we just registered in the Schema Registry. In this snippet, a correctly configured producer is provided to our web service endpoint method by our web framework. We then wire up our command web service endpoint so that, when we receive an inbound command request as a transfer type, the application looks up the current state of the ride from our Kafka Streams application (or, in this case, provides the InitialRideState) and passes both the current state and the command to the Decide function. If the Decide function returns an error, the application responds to the command web service request with that error. On success, the Decide function returns a sequence of events, which the application publishes, in order, to the proper event stream topic using our configured Kafka producer. The application then responds to the command web service request with a success message.
Our next touchpoint between the core domain and our streaming data platform application is implementing our state view slices using the Kafka Streams API and our Evolve function.
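The command-handling flow described above (take the current state, call Decide, then either return the error or publish the events in order) can be sketched in plain Python. This is not the course's snippet: the field names, the `ride` topic name, the `InMemoryProducer` stand-in, and the response dictionaries are all assumptions made for illustration. A real application would pass in a configured Kafka producer instead.

```python
from dataclasses import dataclass
from typing import List, Union

@dataclass(frozen=True)
class RequestRide:            # command transfer type (hypothetical shape)
    ride_id: str
    rider_id: str

@dataclass(frozen=True)
class RideRequested:          # event transfer type (hypothetical shape)
    ride_id: str
    rider_id: str

@dataclass(frozen=True)
class RideState:
    requested: bool = False

@dataclass(frozen=True)
class DomainError:
    message: str

INITIAL_RIDE_STATE = RideState()

def decide(state: RideState, command: RequestRide) -> Union[List[RideRequested], DomainError]:
    """Pure domain function: returns events on success, an error otherwise."""
    if state.requested:
        return DomainError("ride already requested")
    return [RideRequested(command.ride_id, command.rider_id)]

class InMemoryProducer:
    """Stand-in for a configured Kafka producer; records what would be sent."""
    def __init__(self):
        self.sent = []

    def send(self, topic, key, value):
        self.sent.append((topic, key, value))

def handle_request_ride(producer, command, state=INITIAL_RIDE_STATE):
    """Application layer: run Decide, then publish events or report the error."""
    result = decide(state, command)
    if isinstance(result, DomainError):
        return {"status": "error", "message": result.message}
    for event in result:
        # Keying by ride ID keeps all of a ride's events on one partition,
        # which is what preserves their relative order.
        producer.send("ride", command.ride_id, event)
    return {"status": "ok", "events": len(result)}
```

Because `decide` never touches the producer, the only infrastructure-aware code is the small handler, which keeps the domain logic easy to test on its own.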
For example, here we have a snippet representing evolving our ride Read Model based on events arriving on the ride stream. First, we configure Kafka Streams to read and deserialize from our ride event stream input topic. We then build a simple Kafka Streams topology, where we use the Kafka Streams groupByKey method (keyed on our ride ID) and the aggregate method to adapt our Evolve function. The aggregate method has essentially the same signature as the Evolve function, so this adaptation is a natural match. The Kafka Streams runtime does most of the heavy lifting for us here: we simply handle the values it provides to the aggregate method to generate the next value of the ride's Read Model. The Kafka Streams runtime then produces the properly serialized Read Model value to the compacted Read Model output topic for rides, and also stores the new Read Model value in local storage. The Read Model API can use Kafka Streams' Interactive Queries capability to look up the latest value of the Read Model.
Next, we can use the same Kafka Streams topology to wire up our Saga reactions in order to translate certain ride events to the vehicle's event stream. Starting with the rideEventStream, we pass each event to our React function, and then flatMap the resulting list of commands. For example, when we observe the rider-dropped-off event, React would return a single-element list containing the mark-vehicle-unoccupied command. Then we join that stream of commands with our vehicle's KTable in order to fetch the current state of the affected vehicle for use in our Decide function. Just as with the user-initiated command decision, if the Decide function returns a successful result, we flatMap those events onto our vehicle's event topic. Otherwise, we record an error event to the vehicle's event topic for follow-up and analysis.
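To make the two adaptations above concrete without requiring a Kafka cluster, here is a plain-Python sketch of the same shapes: a per-key fold that stands in for groupByKey followed by aggregate, and a flat-map over React's output. The event, command, and Read Model shapes are hypothetical, and the helper functions only mimic the semantics of the Kafka Streams operators. In Kafka Streams itself, the aggregator also receives the record key.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class RideRequested:            # hypothetical event shape
    ride_id: str

@dataclass(frozen=True)
class RiderDroppedOff:          # hypothetical event shape
    ride_id: str
    vehicle_id: str

@dataclass(frozen=True)
class MarkVehicleUnoccupied:    # hypothetical command emitted by the Saga
    vehicle_id: str

@dataclass(frozen=True)
class RideReadModel:
    status: str = "none"

def evolve(state, event):
    """Pure domain function: (current Read Model, event) -> next Read Model."""
    if isinstance(event, RideRequested):
        return replace(state, status="requested")
    if isinstance(event, RiderDroppedOff):
        return replace(state, status="completed")
    return state

def react(event):
    """Pure domain function: event -> list of follow-up commands."""
    if isinstance(event, RiderDroppedOff):
        return [MarkVehicleUnoccupied(event.vehicle_id)]
    return []

def aggregate_by_key(records, initial, aggregator):
    """Mimics groupByKey + aggregate: fold each key's events into one value."""
    table = {}
    for key, event in records:
        table[key] = aggregator(table.get(key, initial), event)
    return table

def flat_map(records, fn):
    """Mimics flatMap over values: concatenate fn's output for every record."""
    return [out for _, event in records for out in fn(event)]
```

For a stream holding a request and a drop-off for ride `r1`, `aggregate_by_key(stream, RideReadModel(), evolve)` yields a table whose `r1` entry has status `completed`, and `flat_map(stream, react)` yields one `MarkVehicleUnoccupied` command for the affected vehicle.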
Kafka Streams has high-level API methods that map naturally both to evolving state and to translating or importing events between streams. Kafka Streams also supports Interactive Queries of its data stores, which is exactly what we'll need in order to service query requests against our models. We've already configured the Kafka Streams state store in our topology, so here we can access that store by name and fetch a ride by its ID.
One great benefit of structuring our implementation this way is testability. We can test our purely functional domain in total isolation from any complex infrastructure concerns using a familiar Given-When-Then testing format. We test the Decide function as follows: given an initial Read Model value, when a command arrives, then a sequence of events is returned. We test the Evolve function similarly: given an initial model value, when an event arrives, then a new model value is returned with certain checkable properties. We can test the infrastructure separately using the kafka-streams-test-utils library to validate the transfer types produced by the Decide state change, and then feed the resulting events into a simulated input topic of a TopologyTestDriver and check its output Read Models. Finally, we can wire the whole application together using an in-process single-node Kafka cluster or a containerized Kafka cluster. Doing so allows us to test the Command and Read Model APIs from the outside and read the event stream output to validate correct system behavior. We'll show an example of this in our hands-on exercise, Module 11.
We began this module with a core business domain and transfer type schemas derived from our event model, and we saw how to adapt them to the streaming data platform using the Schema Registry and various Kafka APIs in order to implement our application. Our next module is a hands-on exercise to put these ideas into practice within a real code base.
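The Given-When-Then style mentioned above needs nothing beyond the pure functions themselves. The sketch below uses plain assertions and hypothetical command, event, and state shapes. It shows only the domain-level tests, not the TopologyTestDriver or cluster-based tests, which depend on the Kafka Streams libraries.

```python
from dataclasses import dataclass, replace

@dataclass(frozen=True)
class RequestRide:        # hypothetical command shape
    ride_id: str

@dataclass(frozen=True)
class RideRequested:      # hypothetical event shape
    ride_id: str

@dataclass(frozen=True)
class RideState:
    status: str = "none"

INITIAL_RIDE_STATE = RideState()

def decide(state, command):
    """Returns the events a command produces, or an empty list if rejected."""
    if state.status != "none":
        return []
    return [RideRequested(command.ride_id)]

def evolve(state, event):
    """Returns the next state after applying one event."""
    if isinstance(event, RideRequested):
        return replace(state, status="requested")
    return state

def test_decide_request_ride():
    # Given an initial state
    state = INITIAL_RIDE_STATE
    # When a command arrives
    events = decide(state, RequestRide("r1"))
    # Then a sequence of events is returned
    assert events == [RideRequested("r1")]

def test_evolve_ride_requested():
    # Given an initial state
    state = INITIAL_RIDE_STATE
    # When an event arrives
    next_state = evolve(state, RideRequested("r1"))
    # Then the new state has a checkable property
    assert next_state.status == "requested"
```

Both tests run in milliseconds with no broker, serializer, or topology involved, which is the isolation the functional core is meant to provide.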
If you aren't already on Confluent Developer, head there now using the link in the video description to access other courses, hands-on exercises, and many other resources for continuing your learning journey.