Course: Practical Event Modeling

Implementing an Event Model on the Streaming Data Platform (Exercise)

Bobby Calderwood

Founder, EvidentStack.com

Overview

In this exercise, we'll turn our model (png | json) into code to run our system on the Streaming Data Platform. The Event Model will guide our creation of Data Transfer Objects, Domain Types, and our Decide and Evolve functions. Finally, we'll wire these code components into the Kafka Producer and Kafka Streams APIs.

Event Model Diagram

Completing this exercise will require you to clone the GitHub repository and run each step using your favorite Java/Kotlin IDE or Gradle from the command line. More details are available in each step's README.

Step 1

In this step, we'll define our transfer types using Protocol Buffers. The build system is configured to compile `src/main/proto/*.proto` files into Java classes and Kotlin DSL functions. Implement the Vehicle transfer types in `src/main/proto/vehicles.proto`.

To see what needs to be done, run the (initially-failing) JUnit 5 tests:

```bash
# Via the command-line
../gradlew test
```

or via your IDE's test runner. Then implement until the tests turn green!
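Once those messages compile, the generated Kotlin DSL gives you type-safe builders for constructing transfer objects. As a rough sketch of what using them might look like (the `Vehicle` message comes from the step, but the field names and generated package below are assumptions; match them to what the failing tests expect):

```kotlin
// Hypothetical usage of the Kotlin DSL that protoc generates from
// vehicles.proto; the field names here are illustrative only.
import io.confluent.examples.autonomo.transfer.vehicle // assumed generated package

val newVehicle = vehicle {
    vin = "1FTYE1Y61GKA12345"
    owner = "rider-42"
}
```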

Step 2

In this step, we'll define our domain types. Our transfer types can be a useful guide here, but beware of status fields and enums: those are fine for transfer types, but we'll want a separate domain type for each case. This lets the compiler do much of our work for us, significantly reducing conditionals and testable cases; check out the excellent book Domain Modeling Made Functional for much more on this subject. We'll also implement the conversion logic to turn transfer types into domain types, and vice versa.

Implement the missing domain types in `domain/rides.kt`, as well as the missing conversion logic in `transfer/conversion.kt`, in the spirit of the sketch below.
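To illustrate the "separate domain type for each case" idea, here's a minimal sketch (the state names and fields are assumptions, not the course's actual model in `domain/rides.kt`):

```kotlin
import java.time.Instant
import java.util.UUID

// A sealed hierarchy instead of one Ride class with a status enum. Every
// `when` over Ride must handle each case, so the compiler rules out
// impossible states and eliminates whole classes of conditionals.
sealed interface Ride { val id: UUID }

data class RequestedRide(
    override val id: UUID,
    val rider: UUID,
    val requestedAt: Instant,
) : Ride

data class ScheduledRide(
    override val id: UUID,
    val rider: UUID,
    val vin: String,          // only a scheduled ride has a vehicle assigned
    val pickupTime: Instant,
) : Ride

data class CancelledRide(
    override val id: UUID,
    val cancelledAt: Instant,
) : Ride
```

Notice that fields like `vin` exist only on the cases where they're meaningful, which is exactly what a single status enum can't express.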

To see what needs to be done, run the (initially-failing) JUnit 5 tests:

```bash
# Via the command-line
../gradlew test
```

or via your IDE's test runner. Then implement until the tests turn green!

Step 3

In this step, we'll define our domain functions. We'll start by implementing interface methods for decide and evolve: `Command.decide(state: ReadModel)` and `ReadModel.evolve(event: Event)`. Then we'll encapsulate these methods inside pure functions implemented in terms of our transfer types, which allows the adapters to invoke the domain functions without having to dig down into the domain types.

Implement the missing decide and evolve interface methods in `domain/rides.kt` and `domain/vehicles.kt`. Then complete the domain function implementations in `domain_functions.kt`.

To see what needs to be done, run the (initially-failing) JUnit 5 tests:

```bash
# Via the command-line
../gradlew test
```

or via your IDE's test runner. Then implement until the tests turn green!
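To make the shapes above concrete, here's a minimal sketch of the two interface methods and a transfer-level wrapper (the `Result` return type, the `Transfer*` names, and the `toDomain()`/`toTransfer()` converters from Step 2 are all assumptions standing in for the course's actual definitions):

```kotlin
// Assumed domain-side interfaces, matching the step's description.
interface Event
interface ReadModel {
    // Evolve: apply one event to produce the next state. Pure -- no I/O.
    fun evolve(event: Event): ReadModel
}
interface Command {
    // Decide: validate a command against current state, yielding new events.
    fun decide(state: ReadModel): Result<List<Event>>
}

// A pure domain function expressed over transfer types, so adapters never
// touch domain types directly. TransferCommand, TransferState, and
// TransferEvent are hypothetical names.
fun decide(command: TransferCommand, state: TransferState): Result<List<TransferEvent>> =
    command.toDomain()
        .decide(state.toDomain())
        .map { events -> events.map { it.toTransfer() } }
```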

Step 4

In this final step, we'll adapt our domain functions to the Streaming Data Platform.

We'll begin by implementing a helper function in the HTTP adapter (avoiding the actual HTTP stuff) whose job it is to apply the Decide function and then record its results to the proper Event topic.
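A hedged sketch of that helper (the topic name, key choice, and `Ride*` type names are assumptions; the real signature lives in `adapters/http.kt`):

```kotlin
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord

// Assumes the transfer-level types and the pure decide(...) function
// sketched in Step 3; "ride-events" and the rideId key are made up
// for illustration.
fun handleCommand(
    producer: Producer<String, RideEvent>,
    state: RideState,
    command: RideCommand,
) {
    // Apply the Decide function to get the resulting events...
    val events = decide(command, state).getOrThrow()
    // ...then record each one to the proper Event topic.
    events.forEach { event ->
        producer.send(ProducerRecord("ride-events", event.rideId, event))
    }
}
```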

We'll then adapt the Evolve function to a Kafka Streams topology using `KGroupedStream.aggregate`.
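As a sketch only (topic names, serde configuration, and the `RideEvent`/`RideState` types are assumptions; the real topology belongs in `adapters/kafka.kt`), the adaptation might look like this:

```kotlin
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized

// Fold each ride's event stream into its current read model with
// KGroupedStream.aggregate: the Initializer supplies an empty state, and
// the Aggregator delegates each event to our pure Evolve function.
fun buildRidesTopology(builder: StreamsBuilder) {
    builder.stream<String, RideEvent>("ride-events")    // assumed event topic
        .groupByKey()                                   // events keyed by ride id
        .aggregate(
            { initialRideState() },                     // assumed empty-state factory
            { _, event, state -> state.evolve(event) }, // Evolve, one event at a time
            Materialized.`as`("rides-store")            // assumed state store name
        )
        .toStream()
        .to("rides")                                    // assumed read-model topic
}
```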

In this step, implement the missing adapters in [`adapters/http.kt`](./src/main/kotlin/io/confluent/examples/autonomo/adapters/http.kt) and [`adapters/kafka.kt`](./src/main/kotlin/io/confluent/examples/autonomo/adapters/kafka.kt).

To see what needs to be done, run the (initially-failing) JUnit 5 tests:

```bash
# Via the command-line
../gradlew test
```

or via your IDE's test runner. Then implement until the tests turn green!

Running the Final App

The final app code (no peeking!) is in final/. The app must be configured to run with a Kafka cluster, and you can do this in one of two ways.

Running with Confluent Cloud

To run with a Confluent Cloud Kafka cluster, first sign up for a Confluent Cloud account. Then create a cluster and download the configuration file. Export the following environment variables before running the app:

```bash
export KAFKA_BOOTSTRAP_SERVERS=[from config file]
export KAFKA_SECURITY_PROTOCOL=SASL_SSL
export KAFKA_SASL_JAAS_CONFIG='org.apache.kafka.common.security.plain.PlainLoginModule required username="[from config file]" password="[from config file]";'
export KAFKA_SASL_MECHANISM=PLAIN
export KAFKA_SCHEMA_REGISTRY_URL=[confluent cloud schema registry]

./gradlew :final:run
```

Running with local Kafka via Docker Stack

Run locally using:

```bash
docker-compose up -d
./gradlew :final:run
```

To clean up the docker-compose stack that runs our Kafka and Schema Registry:

```bash
docker-compose down
```

Use the promo code EVNTMODELING101 to get $25 of free Confluent Cloud usage
