Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In the previous exercise, we updated our REST endpoint to publish data to Kafka using a specific schema. This schema provides us with a guarantee that the data in Kafka will match a specific format.
In this exercise, we will create a consumer for the data. We will pull the messages from Kafka, log some details, and move on. In later exercises, we'll add more complex logic.
We will be working on a new microservice known as the Heart Rate Zone Service. Its eventual job is to determine whether a user's heart rate was in one of 5 zones. If it was, then we'll emit an event to record the zone. However, for now, we are going to start with a skeleton implementation.
Stage the code for this exercise by executing:
./exercise.sh stage 11
exercise.bat stage 11
We will start by modifying the configuration for our HeartRateZoneService.
Open the HeartRateZoneService/appsettings.json file.
Add the following configuration at the root level:
"Consumer": {
"BootstrapServers": "",
"ClientId": "HeartRateZoneService",
"GroupId": "HeartRateZoneService",
"SecurityProtocol": "SaslSsl",
"SaslMechanism": "PLAIN",
"SaslUsername": "",
"SaslPassword": "",
"AutoOffsetReset": "Earliest",
"EnableAutoCommit": "true"
},
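Fill in the empty BootstrapServers, SaslUsername, and SaslPassword values with the same values you used in the ClientGateway.
Note: We named the configuration section Consumer because later we'll add a secondary Kafka configuration for a Producer.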
Next, we need to define the classes that represent the messages we will consume. These are nearly identical to the Biometrics classes we defined in the previous exercise.
Create a new folder in the HeartRateZoneService called Domain.
Copy the HeartRate and Biometrics classes into the Domain namespace in the HeartRateZoneService.
Remove the StepCounts property from the Biometrics class. We won't be using it.
Make sure the copied classes are in the HeartRateZoneService.Domain namespace.
Note: There are different schools of thought on whether to share code using libraries or to copy code as we have done here. Both options have advantages and disadvantages. For our use, keeping them separate allows us to simplify the model in our HeartRateZoneService by eliminating the StepCounts. This reduces the amount of data coupling between our services. However, for production projects, you will want to evaluate whether it is better to share or copy.
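To make the result of the copy concrete, here is a rough sketch of what the Domain classes might look like once StepCounts has been removed. The classes you copy from your own ClientGateway are the source of truth. The type of DeviceId, the HeartRates property, and the members of HeartRate shown here are assumptions made for illustration, and any other properties your Biometrics class already has should stay as they are.

```csharp
using System;
using System.Collections.Generic;

namespace HeartRateZoneService.Domain;

// Hypothetical shape of a single heart rate reading.
// The property names and types are assumptions, not taken from the course.
public class HeartRate
{
    public int Value { get; set; }
    public DateTime DateTime { get; set; }
}

// Biometrics as consumed by the HeartRateZoneService.
// StepCounts is intentionally absent because this service does not use it.
public class Biometrics
{
    // Assumed to be a Guid; the exercise only tells us that DeviceId exists.
    public Guid DeviceId { get; set; }

    // Assumed collection of readings; initialized so it is never null.
    public List<HeartRate> HeartRates { get; set; } = new();
}
```

A JSON deserializer generally ignores fields it has no property for, which is what lets this service drop StepCounts while the ClientGateway keeps publishing it.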
Next, we are going to create the consumer.
Open the HeartRateZoneService/Program.cs file.
Add a using directive for the HeartRateZoneService.Domain namespace.
Locate the ConfigureServices method. Make your changes inside this function (above the services.AddHostedService<HeartRateZoneWorker>(); call).
Load and register the ConsumerConfig. This works the same way as the ProducerConfig in the ClientGateway, except:
  Use services where you would have used builder.Services.
  Use hostContext.Configuration where you would have used builder.Configuration.
Register an instance of IConsumer<String, Biometrics> (using services.AddSingleton). To create it:
  Load the ConsumerConfig.
  Create a new ConsumerBuilder<String, Biometrics>.
  Call SetValueDeserializer and pass it a new JsonDeserializer<Biometrics>().AsSyncOverAsync().
  Call Build to build the consumer and return it.
Our consumer will be used inside the HeartRateZoneWorker.
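Before turning to the worker, the Program.cs steps above can be sketched roughly as follows. This is a minimal sketch rather than the course's reference solution. The surrounding Host.CreateDefaultBuilder scaffolding is assumed to match the standard .NET worker template, the HeartRateZoneService.Workers namespace is inferred from the worker's file path, and the project is assumed to reference the Confluent.Kafka and Confluent.SchemaRegistry.Serdes.Json packages.

```csharp
using System;
using Confluent.Kafka;
using Confluent.Kafka.SyncOverAsync;
using Confluent.SchemaRegistry.Serdes;
using HeartRateZoneService.Domain;
using HeartRateZoneService.Workers; // assumed namespace of HeartRateZoneWorker
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;

IHost host = Host.CreateDefaultBuilder(args)
    .ConfigureServices((hostContext, services) =>
    {
        // Bind the "Consumer" section of appsettings.json to a ConsumerConfig.
        services.Configure<ConsumerConfig>(
            hostContext.Configuration.GetSection("Consumer"));

        // Register a single shared consumer for Biometrics messages.
        services.AddSingleton<IConsumer<String, Biometrics>>(sp =>
        {
            // Load the ConsumerConfig that was bound above.
            var config = sp.GetRequiredService<IOptions<ConsumerConfig>>();

            // The JSON deserializer is asynchronous; AsSyncOverAsync adapts it
            // to the synchronous interface that ConsumerBuilder expects.
            return new ConsumerBuilder<String, Biometrics>(config.Value)
                .SetValueDeserializer(
                    new JsonDeserializer<Biometrics>().AsSyncOverAsync())
                .Build();
        });

        services.AddHostedService<HeartRateZoneWorker>();
    })
    .Build();

host.Run();
```

Registering the consumer as a singleton means the worker receives the same instance for the lifetime of the process, which fits a long-running consume loop.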
Open the HeartRateZoneService/Workers/HeartRateZoneWorker.cs file.
Add a using directive for the HeartRateZoneService.Domain namespace.
Add a private constant field of type String named BiometricsImportedTopicName and assign it a value of BiometricsImported.
Add a private field of type IConsumer<String, Biometrics> named _consumer.
Update the constructor to accept an IConsumer as the first parameter and initialize the _consumer field.
Define a new protected virtual async method named HandleMessage.
  It returns a Task.
  It takes a Biometrics and a CancellationToken as parameters.
Implement HandleMessage as follows:
  Log the message "Message Received: " + biometrics.DeviceId.
  Await Task.CompletedTask.
Implement the ExecuteAsync method as follows:
  Use the _consumer to Subscribe to the topic.
  While the stoppingToken hasn't been canceled (IsCancellationRequested), repeat the following:
    Call Consume on the _consumer with the CancellationToken and store the result.
    Call HandleMessage, pass it the Biometrics from the message (message.Message.Value), and await the result.
  Close the _consumer.
We should now be ready to run our tests.
dotnet test Fitness.sln
Note: The tests make certain assumptions about parameter order and naming. Adjust your code accordingly.
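As a reference point when adjusting your code, the HeartRateZoneWorker steps described earlier might be sketched like this. It is an illustration under assumptions, not the official solution. The _logger field, the ILogger constructor parameter, the Information log level, and the HeartRateZoneService.Workers namespace are assumed from the standard worker template and the file path. The names stated in the exercise (BiometricsImportedTopicName, _consumer, HandleMessage) are used as given.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using HeartRateZoneService.Domain;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace HeartRateZoneService.Workers; // assumed from the file path

public class HeartRateZoneWorker : BackgroundService
{
    private const String BiometricsImportedTopicName = "BiometricsImported";

    private readonly IConsumer<String, Biometrics> _consumer;
    private readonly ILogger<HeartRateZoneWorker> _logger; // assumed template field

    // The consumer is the first parameter, as the exercise requires.
    public HeartRateZoneWorker(
        IConsumer<String, Biometrics> consumer,
        ILogger<HeartRateZoneWorker> logger)
    {
        _consumer = consumer;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe(BiometricsImportedTopicName);

        while (!stoppingToken.IsCancellationRequested)
        {
            // Blocks until a message arrives. If the token is canceled while
            // waiting, Consume throws OperationCanceledException.
            var result = _consumer.Consume(stoppingToken);

            await HandleMessage(result.Message.Value, stoppingToken);
        }

        // Leave the consumer group cleanly when the loop ends.
        _consumer.Close();
    }

    protected virtual async Task HandleMessage(
        Biometrics biometrics,
        CancellationToken cancellationToken)
    {
        _logger.LogInformation("Message Received: " + biometrics.DeviceId);
        await Task.CompletedTask;
    }
}
```

Keeping HandleMessage protected and virtual leaves room for a test to subclass the worker and observe or replace the handling logic without a running Kafka cluster.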
Now that we have two microservices we need to run both. You'll need to open two separate terminals.
In the first terminal execute the following:
cd ClientGateway
dotnet run
In the second terminal execute the following:
cd HeartRateZoneService
dotnet run
Now we can try interacting with the application.
Verify that the HeartRateZoneService received all of the messages that had previously been placed in the topic. Look for the log messages.
Use the Biometrics endpoint to send a message to the ClientGateway.
Verify that the new message is received by the HeartRateZoneService.
At this point, we have two microservices that can communicate asynchronously through Kafka. Our next step is to do something more complex with the consumer.
This brings us to the end of this exercise.