Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In the previous exercise, we implemented a basic consumer for our topic. One key aspect of that consumer is that it was set up to do automatic commits. As a result, if a message failed to process, its offset may have been committed anyway, and the message may be lost.
In this exercise, whenever we process a message, we will extract the heart rates and use them to determine what zone the user was in. These zones are based on a percentage of the user's maximum heart rate (e.g., >50% = Zone 1, >60% = Zone 2, etc.). If we determine that the user was in a specific zone, then we will emit a new message. Because each incoming message contains multiple heart rates, we could end up producing multiple new messages.
We are going to switch from automatic commits to transactional commits. As we process the messages, we will produce multiple new messages to a separate topic. We want this to happen in a transactional fashion. Either all of the messages get published, or none of them do. We will see how this can be achieved using Kafka transactions.
Stage the code for this exercise by executing:
./exercise.sh stage 13
exercise.bat stage 13
We will emit events to a separate topic, so we'll need to define that topic. Its HeartRateZoneReached messages use the following schema:
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "additionalProperties": false,
  "definitions": {
    "HeartRateZone": {
      "description": "",
      "enum": [
        0,
        1,
        2,
        3,
        4,
        5
      ],
      "type": "integer",
      "x-enumNames": [
        "None",
        "Zone1",
        "Zone2",
        "Zone3",
        "Zone4",
        "Zone5"
      ]
    }
  },
  "properties": {
    "DateTime": {
      "format": "date-time",
      "type": "string"
    },
    "DeviceId": {
      "format": "guid",
      "type": "string"
    },
    "HeartRate": {
      "format": "int32",
      "type": "integer"
    },
    "MaxHeartRate": {
      "format": "int32",
      "type": "integer"
    },
    "Zone": {
      "$ref": "#/definitions/HeartRateZone"
    }
  },
  "title": "HeartRateZoneReached",
  "type": "object"
}
We will need a new configuration for our producer in the HeartRateZoneService.
Open the HeartRateZoneService/appsettings.json file.
Add the following configuration:
"Producer": {
  "BootstrapServers": "",
  "ClientId": "HeartRateZoneService",
  "TransactionalId": "HeartRateZoneService",
  "SecurityProtocol": "SaslSsl",
  "SaslMechanism": "PLAIN",
  "SaslUsername": "",
  "SaslPassword": "",
  "EnableIdempotence": ""
},
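Populate the empty values using the values from the Consumer config. EnableIdempotence has no counterpart there; a transactional producer requires idempotence, so set it to true.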
Add a SchemaRegistry section and populate it.
[Optional] For completeness, you could go back to the producer configuration in the ClientGateway and set the value for EnableIdempotence.
Heart Rate Zones are determined using a percentage of the maximum heart rate. To determine whether a HeartRate falls into one of the zones, we are going to implement some helpers.
In HeartRateZoneService/Domain, create a new file named HeartRateExtensions.cs.
Add a namespace declaration for HeartRateZoneService.Domain.
Create a new public enum named HeartRateZone with the following values (matching the x-enumNames in the schema above): None, Zone1, Zone2, Zone3, Zone4, Zone5.
Create a new public static class named HeartRateExtensions.
In the HeartRateExtensions class, define a static extension method named GetHeartRateZone that returns a HeartRateZone.
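As a rough illustration of the steps above, the enum and the extension method might look like the sketch below. Several details are assumptions, not taken from the exercise: the method extends a plain int and takes the maximum heart rate as its only argument, the thresholds continue the ">50% = Zone 1, >60% = Zone 2" pattern up to >90% = Zone 5, and a non-positive maximum is rejected. The course tests may expect a different receiver type or parameter names, so treat this as a starting point.

```csharp
using System;

namespace HeartRateZoneService.Domain
{
    // Values mirror the "enum" / "x-enumNames" entries in the schema.
    public enum HeartRateZone
    {
        None = 0,
        Zone1 = 1,
        Zone2 = 2,
        Zone3 = 3,
        Zone4 = 4,
        Zone5 = 5
    }

    public static class HeartRateExtensions
    {
        // ASSUMPTION: receiver is an int and thresholds are 50/60/70/80/90 percent.
        public static HeartRateZone GetHeartRateZone(this int heartRate, int maxHeartRate)
        {
            if (maxHeartRate <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(maxHeartRate));
            }

            // Integer arithmetic avoids floating-point surprises at the boundaries.
            long scaled = (long)heartRate * 100;
            long max = maxHeartRate;

            if (scaled > max * 90) return HeartRateZone.Zone5;
            if (scaled > max * 80) return HeartRateZone.Zone4;
            if (scaled > max * 70) return HeartRateZone.Zone3;
            if (scaled > max * 60) return HeartRateZone.Zone2;
            if (scaled > max * 50) return HeartRateZone.Zone1;
            return HeartRateZone.None;
        }
    }
}
```

Under these assumed thresholds, a reading of 110 with a maximum of 200 is 55% and lands in Zone1, while a reading of exactly 100 (50%) stays in None because the comparison is strict.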
Next, let's define the event we will emit when our Heart Rate Zone is reached.
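In the HeartRateZoneService.Domain namespace, define a new class named HeartRateZoneReached with the following properties (get only, no set), matching the schema above: DeviceId (Guid), Zone (HeartRateZone), DateTime (DateTime), HeartRate (int), MaxHeartRate (int).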
Define a constructor for the HeartRateZoneReached class to initialize each of the properties.
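A minimal sketch of such an event class is shown here. The property names and types are read off the schema; the constructor parameter order is an assumption, and the course tests may expect a different one. The enum is repeated only so the snippet compiles on its own; omit it if HeartRateZone is already defined in the project.

```csharp
using System;

namespace HeartRateZoneService.Domain
{
    // Repeated so this sketch is self-contained; drop it if already defined.
    public enum HeartRateZone { None, Zone1, Zone2, Zone3, Zone4, Zone5 }

    public class HeartRateZoneReached
    {
        // Get-only properties: values are fixed once the event is created.
        public Guid DeviceId { get; }
        public HeartRateZone Zone { get; }
        public DateTime DateTime { get; }
        public int HeartRate { get; }
        public int MaxHeartRate { get; }

        // ASSUMPTION: this parameter order is illustrative only.
        public HeartRateZoneReached(
            Guid deviceId,
            HeartRateZone zone,
            DateTime dateTime,
            int heartRate,
            int maxHeartRate)
        {
            DeviceId = deviceId;
            Zone = zone;
            DateTime = dateTime;
            HeartRate = heartRate;
            MaxHeartRate = maxHeartRate;
        }
    }
}
```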
We will also need a producer to emit the events.
Hint: You can copy what we did in the ClientGateway with only a few changes.
Next, we need to implement the logic to handle the events and emit new ones. We will do this using Kafka transactions so that either all events are emitted, or none of them are.
The final step is to re-implement the HandleMessage method to include the logic necessary to emit our new events.
Hint: Offsets can be obtained using:
_consumer.Assignment.Select(topicPartition =>
    new TopicPartitionOffset(
        topicPartition,
        _consumer.Position(topicPartition)
    )
);
If the heart rate is in one of the zones, construct a new Message:
Use the DeviceId for the Key.
Create a new HeartRateZoneReached for the Value.
Hint: The Select and Where methods may be of benefit here:
var newCollection = collection
    .Where(predicate)
    .Select(transformationFunction);
Send each message to the HeartRateZoneReached topic using ProduceAsync(topic, message, cancellationToken).
When all messages have been produced, call CommitTransaction on the producer.
If any messages fail, call AbortTransaction on the producer and throw a new Exception.
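One detail worth keeping in mind with the Where and Select hint is that LINQ queries are evaluated lazily: the transformation runs each time the result is enumerated. The sketch below makes that visible with a counter. The class, method, and counter names are hypothetical, and the 50% cut-off simply mirrors the Zone 1 threshold mentioned earlier.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LinqPipelineSketch
{
    // Counts how often the transformation runs, to expose deferred execution.
    public static int TransformCalls = 0;

    // Keeps heart rates above 50% of the maximum and pairs each with the device id.
    public static IEnumerable<KeyValuePair<string, int>> ToPairs(
        string deviceId, IEnumerable<int> heartRates, int maxHeartRate)
    {
        return heartRates
            .Where(hr => hr * 100 > maxHeartRate * 50)
            .Select(hr =>
            {
                TransformCalls++;
                return new KeyValuePair<string, int>(deviceId, hr);
            });
    }
}
```

If the transformation has a side effect, such as starting a produce call, enumerating the query twice would trigger it twice. Calling ToList() once and reusing the resulting list avoids that.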
Hint: Be careful when producing multiple asynchronous tasks. Have a look at Task.WhenAll as a way of waiting for them all to complete.
await Task.WhenAll(collectionOfTasks);
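Putting the steps and hints together, the transactional part of the handler could be sketched roughly as follows. This is one plausible arrangement, not necessarily the course solution: the helper name ProduceInTransactionAsync, the 30-second timeout, and the ordering of the calls are assumptions. It also assumes the project references the Confluent.Kafka package, that InitTransactions was called once on the producer at startup, and that automatic commits are disabled on the consumer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;

public static class TransactionSketch
{
    // Hypothetical helper: publishes all messages and the consumer offsets
    // in a single Kafka transaction, or aborts if anything fails.
    public static async Task ProduceInTransactionAsync<TKey, TValue, TInKey, TInValue>(
        IProducer<TKey, TValue> producer,
        IConsumer<TInKey, TInValue> consumer,
        string topic,
        IEnumerable<Message<TKey, TValue>> messages,
        CancellationToken cancellationToken)
    {
        var timeout = TimeSpan.FromSeconds(30); // illustrative value

        producer.BeginTransaction();
        try
        {
            // Current positions of the partitions assigned to this consumer.
            var offsets = consumer.Assignment.Select(topicPartition =>
                new TopicPartitionOffset(topicPartition, consumer.Position(topicPartition)));

            // Commit the consumed offsets as part of the same transaction.
            producer.SendOffsetsToTransaction(offsets, consumer.ConsumerGroupMetadata, timeout);

            // ToList() starts every produce call exactly once.
            var tasks = messages
                .Select(message => producer.ProduceAsync(topic, message, cancellationToken))
                .ToList();

            // Wait for all deliveries; any failure surfaces here as an exception.
            await Task.WhenAll(tasks);

            producer.CommitTransaction();
        }
        catch (Exception ex)
        {
            // Nothing from this batch becomes visible to read-committed consumers.
            producer.AbortTransaction();
            throw new Exception("Transaction failed", ex);
        }
    }
}
```

Awaiting Task.WhenAll before CommitTransaction matters here: committing while some produce calls are still in flight would mean a later failure could no longer be handled by aborting.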
We should now be ready to run our tests.
dotnet test Fitness.sln
Note: The tests make certain assumptions about parameter order and naming. Adjust your code accordingly.
Run the application in two terminals.
In the first terminal execute the following:
cd ClientGateway
dotnet run
In the second terminal execute the following:
cd HeartRateZoneService
dotnet run
Now we can try interacting with the application.
{
  "deviceId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "heartRates": [
    {
      "dateTime": "2022-12-02T13:50:00.000Z",
      "value": 90
    },
    {
      "dateTime": "2022-12-02T13:51:00.000Z",
      "value": 110
    },
    {
      "dateTime": "2022-12-02T13:52:00.000Z",
      "value": 130
    },
    {
      "dateTime": "2022-12-02T13:53:00.000Z",
      "value": 150
    },
    {
      "dateTime": "2022-12-02T13:54:00.000Z",
      "value": 170
    },
    {
      "dateTime": "2022-12-02T13:55:00.000Z",
      "value": 190
    }
  ],
  "stepCounts": [
    {
      "dateTime": "2022-12-02T13:50:00.000Z",
      "value": 200
    }
  ],
  "maxHeartRate": 200
}
Our Heart Rate Zone Service is now producing messages in a transactional manner.
This brings us to the end of this exercise.