course: Apache Kafka® for .NET Developers

How to Consume Messages from a Kafka Topic (Hands-On)

Wade Waldron

Staff Software Practice Lead

Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.

In the previous exercise, we updated our REST endpoint to publish data to Kafka using a specific schema. This schema provides us with a guarantee that the data in Kafka will match a specific format.

In this exercise, we will create a consumer for the data. We will pull the messages from Kafka, log some details, and move on. In later exercises, we'll add more complex logic.

We will be working on a new microservice known as the Heart Rate Zone Service. Its eventual job is to determine whether a user's heart rate falls into one of five zones. If it does, we'll emit an event to record the zone. For now, however, we are going to start with a skeleton implementation.

Stage the Exercises

Stage the code for this exercise by executing the script for your platform (exercise.sh on macOS/Linux, exercise.bat on Windows):

./exercise.sh stage 11
exercise.bat stage 11

Modify the Configuration

We will start by modifying the configuration for our HeartRateZoneService.

  1. Open the HeartRateZoneService/appsettings.json file.

  2. Add the following configuration at the root level:

    "Consumer": {
      "BootstrapServers": "",
      "ClientId": "HeartRateZoneService",
      "GroupId": "HeartRateZoneService",
      "SecurityProtocol": "SaslSsl",
      "SaslMechanism": "PLAIN",
      "SaslUsername": "",
      "SaslPassword": "",
      "AutoOffsetReset": "Earliest",
      "EnableAutoCommit": "true"
    },
    • Hint: Populate the blank fields by copying their values from the ClientGateway configuration.

Note: We named the configuration section Consumer because later we'll add a secondary Kafka configuration for a Producer.
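
For orientation, the keys in this section correspond to properties on the ConsumerConfig class from the Confluent.Kafka package. The sketch below builds the equivalent object directly in code. It is illustrative only: the exercise loads these values from appsettings.json, and the empty strings are placeholders for your own cluster details.

```csharp
using Confluent.Kafka;

// Illustrative only: a typed equivalent of the "Consumer" section.
// The exercise binds these values from appsettings.json instead.
var config = new ConsumerConfig
{
    BootstrapServers = "",                        // placeholder: copy from the ClientGateway
    ClientId = "HeartRateZoneService",
    GroupId = "HeartRateZoneService",
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslMechanism = SaslMechanism.Plain,
    SaslUsername = "",                            // placeholder
    SaslPassword = "",                            // placeholder
    AutoOffsetReset = AutoOffsetReset.Earliest,   // no committed offset: start at the beginning
    EnableAutoCommit = true                       // offsets are committed periodically in the background
};
```

With AutoOffsetReset set to Earliest, a consumer group that has no committed offsets starts reading from the beginning of the topic, which is why the service can pick up messages that were published before it existed.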

Copy the Biometrics

Next, we need to define the classes that represent the messages we will consume. These are nearly identical to the Biometrics we defined in the previous exercise.

  1. Create a namespace in the HeartRateZoneService called Domain.
  2. Copy the HeartRate and Biometrics classes into the Domain namespace in the HeartRateZoneService.
  3. Remove the StepCounts property from the Biometrics. We won't be using it.
  4. Double-check that the namespace is HeartRateZoneService.Domain.
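
As a rough sketch, the copied types might end up looking like the following. Only DeviceId and the removed StepCounts property are confirmed by this exercise; the remaining property names (Value, DateTime, HeartRates, MaxHeartRate) and the use of records are assumptions, so match them to whatever you defined in the ClientGateway.

```csharp
using System;
using System.Collections.Generic;

namespace HeartRateZoneService.Domain;

// Assumed shape: property names other than DeviceId are hypothetical.
// Copy the real definitions from the ClientGateway and adjust.
public record HeartRate(int Value, DateTime DateTime);

// StepCounts has been removed; this service does not use it.
public record Biometrics(Guid DeviceId, List<HeartRate> HeartRates, int MaxHeartRate);
```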

Note: There are different schools of thought on whether to share code using libraries or to copy code as we have done here. Both options have advantages and disadvantages. For our use, keeping the classes separate allows us to simplify the model in our HeartRateZoneService by eliminating the StepCounts. This reduces the amount of data coupling between our services. However, for production projects, you will want to evaluate whether it is better to share or copy.

Create the Consumer

Next, we are going to create the consumer.

  1. Open the HeartRateZoneService/Program.cs file.
  2. Add a using directive for the HeartRateZoneService.Domain namespace.
  3. Locate the lambda function passed to the ConfigureServices method. Make your changes inside this function (above the services.AddHostedService<HeartRateZoneWorker>(); call).
  4. Load a ConsumerConfig.
    • Note: This is done similarly to loading a ProducerConfig in the ClientGateway except:
      • Use services where you would have used builder.Services.
      • Use hostContext.Configuration where you would have used builder.Configuration.
    • Hint: What is the name of the section we are loading?
  5. Register an instance of an IConsumer<String, Biometrics> (using services.AddSingleton).
    • Retrieve the ConsumerConfig.
    • Create a new ConsumerBuilder<String, Biometrics>.
    • Call SetValueDeserializer and pass it a new JsonDeserializer<Biometrics>().AsSyncOverAsync()
    • Call Build to build the consumer and return it.
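
The steps above can be sketched roughly as follows. This is one possible shape for Program.cs, not the reference solution: it assumes the file uses Host.CreateDefaultBuilder, that the worker lives in a HeartRateZoneService.Workers namespace, and that the ConsumerConfig is loaded with the options pattern (services.Configure) in the same way as the ProducerConfig in the ClientGateway.

```csharp
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;      // provides AsSyncOverAsync()
using Confluent.SchemaRegistry.Serdes;    // provides JsonDeserializer<T>
using HeartRateZoneService.Domain;
using HeartRateZoneService.Workers;       // assumed namespace of the worker
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        // Bind the "Consumer" section of appsettings.json to a ConsumerConfig.
        services.Configure<ConsumerConfig>(
            hostContext.Configuration.GetSection("Consumer"));

        // Register a single shared consumer for the lifetime of the service.
        services.AddSingleton<IConsumer<String, Biometrics>>(sp =>
        {
            var config = sp.GetRequiredService<IOptions<ConsumerConfig>>();

            return new ConsumerBuilder<String, Biometrics>(config.Value)
                // The JSON deserializer is async; the consumer API expects a sync one.
                .SetValueDeserializer(new JsonDeserializer<Biometrics>().AsSyncOverAsync())
                .Build();
        });

        services.AddHostedService<HeartRateZoneWorker>();
    })
    .Build();

host.Run();
```

Registering the consumer as a singleton means the worker receives the same instance every time it is resolved, which suits a service with one long-running consume loop.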

Update the Worker

Our consumer will be used inside the HeartRateZoneWorker.

  1. Open the HeartRateZoneService/Workers/HeartRateZoneWorker.cs file.

  2. Add a using directive for the HeartRateZoneService.Domain namespace.

  3. Add a private constant field of type String named BiometricsImportedTopicName and assign it a value of "BiometricsImported".

  4. Add a private field of type IConsumer<String, Biometrics> named _consumer.

  5. Update the constructor to accept an IConsumer<String, Biometrics> as the first parameter and use it to initialize the _consumer field.

  6. Define a new protected virtual async method named HandleMessage.

    • It should return a Task
    • It should accept two parameters of types Biometrics and CancellationToken.
  7. Implement HandleMessage as follows:

    • Log a message at the Information level: "Message Received: " + biometrics.DeviceId.
    • Await Task.CompletedTask.
    • Note: This is a skeleton implementation that doesn't do much. Don't worry, we'll expand on it in a future exercise.
    • Note: We are marking this as virtual because it will be overridden in test classes.
  8. Implement the ExecuteAsync method as follows:

    • Use the _consumer to Subscribe to the topic.
    • While the stoppingToken hasn't been canceled (IsCancellationRequested) repeat the following:
      • Call Consume on the _consumer with the stoppingToken and store the result in a variable (named message in the next step).
      • Call HandleMessage, pass it the Biometrics from the message (message.Message.Value), and await the result.
    • Once the loop has completed, Close the _consumer.
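
Put together, the worker might look something like the sketch below. It follows the steps above literally; the ILogger constructor parameter, the field name _logger, and the HeartRateZoneService.Workers namespace are assumptions about the staged code, so keep whatever the existing file already declares.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using HeartRateZoneService.Domain;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace HeartRateZoneService.Workers;   // assumed namespace

public class HeartRateZoneWorker : BackgroundService
{
    private const String BiometricsImportedTopicName = "BiometricsImported";

    private readonly IConsumer<String, Biometrics> _consumer;
    private readonly ILogger<HeartRateZoneWorker> _logger;   // assumed to exist already

    // The consumer comes first; the tests depend on parameter order.
    public HeartRateZoneWorker(
        IConsumer<String, Biometrics> consumer,
        ILogger<HeartRateZoneWorker> logger)
    {
        _consumer = consumer;
        _logger = logger;
    }

    // Virtual so that test classes can override it.
    protected virtual async Task HandleMessage(Biometrics biometrics, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Message Received: " + biometrics.DeviceId);
        await Task.CompletedTask;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe(BiometricsImportedTopicName);

        while (!stoppingToken.IsCancellationRequested)
        {
            // Blocks until a message arrives or the token is cancelled.
            var message = _consumer.Consume(stoppingToken);
            await HandleMessage(message.Message.Value, stoppingToken);
        }

        // Leave the consumer group cleanly.
        _consumer.Close();
    }
}
```

One thing to be aware of: Consume throws an OperationCanceledException if the token is cancelled while it is waiting, so in that case the call to Close after the loop is skipped. Wrapping the loop in try/finally would be one way to guarantee the Close call, although the exercise steps do not ask for it.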

Run the Tests

We should now be ready to run our tests.

dotnet test Fitness.sln

Note: The tests make certain assumptions about parameter order and naming. Adjust your code accordingly.

Run the Application

Now that we have two microservices, we need to run both. You'll need to open two separate terminals.

In the first terminal execute the following:

cd ClientGateway
dotnet run

In the second terminal execute the following:

cd HeartRateZoneService
dotnet run

Try It Out

Now we can try interacting with the application.

  1. Check that the HeartRateZoneService received all of the messages that had previously been placed in the topic. Look for the log messages.
  2. In a browser window, visit the Swagger page at http://localhost:5000/swagger/index.html.
  3. Use the Biometrics endpoint to send a message to the ClientGateway.
  4. Wait a few moments and then check that your new message was received by the HeartRateZoneService.

At this point, we have two microservices that can communicate asynchronously through Kafka. Our next step is to do something more complex with the consumer.

Finish

This brings us to the end of this exercise.
