Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In the previous exercise, we implemented a basic consumer for our topic. One key aspect of that consumer is that it was set up to do automatic commits. As a result, if a message failed to process, its offset could be committed anyway, and the message could be lost.
In this exercise, whenever we process a message, we will extract the heart rates and use them to determine what zone the user was in. These zones are based on a percentage of the user's maximum heart rate (e.g., >50% = Zone 1, >60% = Zone 2, etc.). If we determine that the user was in a specific zone, then we will emit a new message. Because each incoming message contains multiple heart rates, we could end up producing multiple new messages.
We are going to switch from automatic commits to transactional commits. As we process the messages, we will produce multiple new messages to a separate topic. We want this to happen in a transactional fashion. Either all of the messages get published, or none of them do. We will see how this can be achieved using Kafka transactions.
Stage the code for this exercise by executing:
./exercise.sh stage 13
exercise.bat stage 13
We will emit events to a separate topic, so we'll need to define that topic.
Create a new topic named HeartRateZoneReached and give it the following JSON schema:
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "additionalProperties": false,
  "definitions": {
    "HeartRateZone": {
      "description": "",
      "enum": [
        0,
        1,
        2,
        3,
        4,
        5
      ],
      "type": "integer",
      "x-enumNames": [
        "None",
        "Zone1",
        "Zone2",
        "Zone3",
        "Zone4",
        "Zone5"
      ]
    }
  },
  "properties": {
    "DateTime": {
      "format": "date-time",
      "type": "string"
    },
    "DeviceId": {
      "format": "guid",
      "type": "string"
    },
    "HeartRate": {
      "format": "int32",
      "type": "integer"
    },
    "MaxHeartRate": {
      "format": "int32",
      "type": "integer"
    },
    "Zone": {
      "$ref": "#/definitions/HeartRateZone"
    }
  },
  "title": "HeartRateZoneReached",
  "type": "object"
}
We will need a new configuration for our producer in the HeartRateZoneService.
Open the HeartRateZoneService/appsettings.json file.
Add the following configuration:
"Producer": {
  "BootstrapServers": "",
  "ClientId": "HeartRateZoneService",
  "TransactionalId": "HeartRateZoneService",
  "SecurityProtocol": "SaslSsl",
  "SaslMechanism": "PLAIN",
  "SaslUsername": "",
  "SaslPassword": "",
  "EnableIdempotence": ""
},
Populate the empty values using the values from the Consumer config.
What value should EnableIdempotence have?
We don't need to set a MaxInFlight, Retries, or Acks value because the default values are sufficient.
Add a SchemaRegistry section and populate it.
Hint: You can copy it from the ClientGateway.
[Optional] For completeness, you could go back to the producer configuration in the ClientGateway and set the value for EnableIdempotence.
Heart Rate Zones are determined using a percentage of the maximum heart rate. To determine whether a HeartRate falls into one of the zones, we are going to implement some helpers.
In HeartRateZoneService/Domain, create a new file named HeartRateExtensions.cs.
Add a namespace declaration for HeartRateZoneService.Domain.
Create a new public enum named HeartRateZone with the following values (matching the x-enumNames in the schema above): None, Zone1, Zone2, Zone3, Zone4, Zone5.
Create a new public static class named HeartRateExtensions.
In the HeartRateExtensions class, define a static extension method named GetHeartRateZone that returns a HeartRateZone.
The first parameter is the HeartRate. We are creating an extension method, so it should be defined with the keyword this:
this HeartRate hr
The second parameter is an int representing the maximum heart rate.
Depending on the heart rate as a percentage of the maximum heart rate, the method returns one of:
HeartRateZone.None
HeartRateZone.Zone1
HeartRateZone.Zone2
HeartRateZone.Zone3
HeartRateZone.Zone4
HeartRateZone.Zone5
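The helper described above could be sketched roughly as follows. This is only an illustration under stated assumptions: the text gives ">50% = Zone 1, >60% = Zone 2, etc.", so the 70/80/90 cut-offs are extrapolated from that pattern, and treating a boundary value (for example exactly 50%) as belonging to the higher zone is a guess. The HeartRate record is a stand-in for the type that already exists in the project; only a Value and a DateTime are assumed.

```csharp
using System;

namespace HeartRateZoneService.Domain
{
    // Stand-in for the project's existing HeartRate type (assumed shape).
    public record HeartRate(int Value, DateTime DateTime);

    // Values mirror the "enum" / "x-enumNames" entries in the topic schema.
    public enum HeartRateZone { None = 0, Zone1 = 1, Zone2 = 2, Zone3 = 3, Zone4 = 4, Zone5 = 5 }

    public static class HeartRateExtensions
    {
        public static HeartRateZone GetHeartRateZone(this HeartRate hr, int maxHeartRate)
        {
            // Use floating point so the percentage is not truncated by integer division.
            var percentage = hr.Value * 100.0 / maxHeartRate;

            // Assumed cut-offs (50/60/70/80/90) with inclusive lower bounds.
            if (percentage >= 90) return HeartRateZone.Zone5;
            if (percentage >= 80) return HeartRateZone.Zone4;
            if (percentage >= 70) return HeartRateZone.Zone3;
            if (percentage >= 60) return HeartRateZone.Zone2;
            if (percentage >= 50) return HeartRateZone.Zone1;
            return HeartRateZone.None;
        }
    }
}
```

Checking from the highest zone downward keeps each condition to a single comparison. If the course tests expect different boundary handling, only the comparison operators need to change.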
Next, let's define the event we will emit when our Heart Rate Zone is reached.
In the HeartRateZoneService.Domain namespace, define a new class named HeartRateZoneReached with the following properties (get only, no set):
DeviceId of type Guid.
Zone of type HeartRateZone.
DateTime of type DateTime.
HeartRate of type int.
MaxHeartRate of type int.
Define a constructor for the HeartRateZoneReached class to initialize each of the properties.
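As a rough sketch, the event class could look like the following. The constructor parameter order simply follows the order in which the properties are listed above, which is an assumption; the course tests may expect a different order or naming. The enum is repeated here only so the snippet compiles on its own.

```csharp
using System;

namespace HeartRateZoneService.Domain
{
    // Repeated only to keep this sketch self-contained; in the exercise the
    // enum is declared in HeartRateExtensions.cs.
    public enum HeartRateZone { None, Zone1, Zone2, Zone3, Zone4, Zone5 }

    public class HeartRateZoneReached
    {
        // Get-only properties: values can only be set through the constructor.
        public Guid DeviceId { get; }
        public HeartRateZone Zone { get; }
        public DateTime DateTime { get; }
        public int HeartRate { get; }
        public int MaxHeartRate { get; }

        // Parameter order is assumed to match the property list in the exercise.
        public HeartRateZoneReached(Guid deviceId, HeartRateZone zone, DateTime dateTime, int heartRate, int maxHeartRate)
        {
            DeviceId = deviceId;
            Zone = zone;
            DateTime = dateTime;
            HeartRate = heartRate;
            MaxHeartRate = maxHeartRate;
        }
    }
}
```

The property names match the property names in the topic schema (DateTime, DeviceId, HeartRate, MaxHeartRate, Zone).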
We will also need a producer to emit the events.
Hint: You can copy what we did in the ClientGateway with only a few changes.
Open HeartRateZoneService/Program.cs.
Register an ISchemaRegistryClient and its associated config.
Register an IProducer<String, HeartRateZoneReached> and its associated config.
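One way the registrations might look is sketched below, written as a hypothetical extension method (AddHeartRateZoneProducer is not part of the exercise) so it does not depend on how Program.cs builds its host. It assumes the "Producer" and "SchemaRegistry" sections from appsettings.json, the HeartRateZoneReached class from the previous step, and JSON Schema serialization via the Confluent.SchemaRegistry.Serdes.Json package. Treat it as a starting point rather than the course's own solution.

```csharp
using System;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using HeartRateZoneService.Domain;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Options;

// Hypothetical helper class; the exercise places equivalent code in Program.cs.
public static class ProducerRegistration
{
    public static IServiceCollection AddHeartRateZoneProducer(
        this IServiceCollection services, IConfiguration configuration)
    {
        // Bind the config sections added to appsettings.json.
        services.Configure<SchemaRegistryConfig>(configuration.GetSection("SchemaRegistry"));
        services.Configure<ProducerConfig>(configuration.GetSection("Producer"));

        services.AddSingleton<ISchemaRegistryClient>(sp =>
        {
            var config = sp.GetRequiredService<IOptions<SchemaRegistryConfig>>();
            return new CachedSchemaRegistryClient(config.Value);
        });

        services.AddSingleton<IProducer<String, HeartRateZoneReached>>(sp =>
        {
            var config = sp.GetRequiredService<IOptions<ProducerConfig>>();
            var schemaRegistry = sp.GetRequiredService<ISchemaRegistryClient>();

            // The value serializer looks up or registers the JSON schema for the event.
            return new ProducerBuilder<String, HeartRateZoneReached>(config.Value)
                .SetValueSerializer(new JsonSerializer<HeartRateZoneReached>(schemaRegistry))
                .Build();
        });

        return services;
    }
}
```

Registering the producer as a singleton means the worker reuses one producer, and therefore one TransactionalId, for its whole lifetime.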
Next, we need to implement the logic to handle the events and emit new ones. We will do this using Kafka transactions so that either all events are emitted, or none of them are.
Open the HeartRateZoneWorker class.
Define HeartRateZoneReachedTopicName, of type String, and assign it a value of HeartRateZoneReached.
Define DefaultTimeout with a type of TimeSpan. Give it a value of 30 seconds (see TimeSpan.FromSeconds).
Add an IProducer<String, HeartRateZoneReached> as the second parameter of the constructor and assign it to a private field.
Where the worker subscribes to the BiometricsImported topic, call InitTransactions on the producer and pass it the DefaultTimeout.
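The worker setup described above might be sketched like this. Several details are assumptions: that the worker derives from BackgroundService, that the consumer is an IConsumer<String, Biometrics> passed as the first constructor parameter, that the members are a const and a static readonly field, and that InitTransactions is called just before Subscribe. What matters for Kafka is that InitTransactions runs once before the first BeginTransaction.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using HeartRateZoneService.Domain;
using Microsoft.Extensions.Hosting;

public class HeartRateZoneWorker : BackgroundService
{
    private const String BiometricsImportedTopicName = "BiometricsImported";
    private const String HeartRateZoneReachedTopicName = "HeartRateZoneReached";
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);

    private readonly IConsumer<String, Biometrics> _consumer;
    private readonly IProducer<String, HeartRateZoneReached> _producer;

    // The producer is the second constructor parameter, as the exercise requires.
    public HeartRateZoneWorker(
        IConsumer<String, Biometrics> consumer,
        IProducer<String, HeartRateZoneReached> producer)
    {
        _consumer = consumer;
        _producer = producer;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Registers the TransactionalId with the cluster; call once per producer.
        _producer.InitTransactions(DefaultTimeout);
        _consumer.Subscribe(BiometricsImportedTopicName);

        // The consume loop from the previous exercise would follow here.
        return Task.CompletedTask;
    }
}
```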
The final step is to re-implement the HandleMessage method to include the logic necessary to emit our new events.
Call BeginTransaction on the producer.
Get the current offsets from the consumer.
Hint: Offsets can be obtained using:
_consumer.Assignment.Select(topicPartition =>
    new TopicPartitionOffset(
        topicPartition,
        _consumer.Position(topicPartition)
    )
);
Call SendOffsetsToTransaction on the producer, passing it:
the offsets
_consumer.ConsumerGroupMetadata
the DefaultTimeout
For each heart rate in the message, if the heart rate is in one of the zones, construct a new Message:
Use the DeviceId for the Key.
Create a new HeartRateZoneReached for the Value.
Hint: The Select and Where methods may be of benefit here:
var newCollection = collection
    .Where(predicate)
    .Select(transformationFunction);
Send each message to the HeartRateZoneReached topic using ProduceAsync(topic, message, cancellationToken).
When all messages have been produced, call CommitTransaction on the producer.
If any messages fail, call AbortTransaction on the producer and throw a new Exception.
Hint: Be careful when producing multiple asynchronous tasks. Have a look at Task.WhenAll as a way of waiting for them all to complete.
await Task.WhenAll(collectionOfTasks);
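Putting the steps together, the transactional handler might look roughly like the sketch below. It is not the course's reference solution. The class name TransactionalHandlerSketch is hypothetical, and the Biometrics (DeviceId, HeartRates, MaxHeartRate) and HeartRate (Value, DateTime) members are assumed from the sample payload later in this exercise. The method's visibility and exact signature are also guesses, so adjust them to what the tests expect.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using HeartRateZoneService.Domain;

// Hypothetical wrapper; in the exercise this logic lives in HeartRateZoneWorker.
public class TransactionalHandlerSketch
{
    private const String HeartRateZoneReachedTopicName = "HeartRateZoneReached";
    private static readonly TimeSpan DefaultTimeout = TimeSpan.FromSeconds(30);

    private readonly IConsumer<String, Biometrics> _consumer;
    private readonly IProducer<String, HeartRateZoneReached> _producer;

    public TransactionalHandlerSketch(
        IConsumer<String, Biometrics> consumer,
        IProducer<String, HeartRateZoneReached> producer)
    {
        _consumer = consumer;
        _producer = producer;
    }

    public async Task HandleMessage(Biometrics biometrics, CancellationToken cancellationToken)
    {
        _producer.BeginTransaction();
        try
        {
            // Current consumer positions become part of the transaction, so the
            // offsets are committed only if the produced events are committed too.
            var offsets = _consumer.Assignment
                .Select(tp => new TopicPartitionOffset(tp, _consumer.Position(tp)))
                .ToList();
            _producer.SendOffsetsToTransaction(offsets, _consumer.ConsumerGroupMetadata, DefaultTimeout);

            // ToList() starts every send exactly once before we await them all.
            var sends = biometrics.HeartRates
                .Where(hr => hr.GetHeartRateZone(biometrics.MaxHeartRate) != HeartRateZone.None)
                .Select(hr => new Message<String, HeartRateZoneReached>
                {
                    Key = biometrics.DeviceId.ToString(),
                    Value = new HeartRateZoneReached(
                        biometrics.DeviceId,
                        hr.GetHeartRateZone(biometrics.MaxHeartRate),
                        hr.DateTime,
                        hr.Value,
                        biometrics.MaxHeartRate)
                })
                .Select(message => _producer.ProduceAsync(HeartRateZoneReachedTopicName, message, cancellationToken))
                .ToList();

            await Task.WhenAll(sends);
            _producer.CommitTransaction();
        }
        catch (Exception ex)
        {
            // Any failure rolls back both the produced events and the offsets.
            _producer.AbortTransaction();
            throw new Exception("Transaction failed", ex);
        }
    }
}
```

Keeping everything after BeginTransaction inside the try block means a failure in any step, not only in ProduceAsync, leads to AbortTransaction.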
We should now be ready to run our tests.
dotnet test Fitness.sln
Note: The tests make certain assumptions about parameter order and naming. Adjust your code accordingly.
Run the application in two terminals.
In the first terminal execute the following:
cd ClientGateway
dotnet run
In the second terminal execute the following:
cd HeartRateZoneService
dotnet run
Now we can try interacting with the application.
Navigate to the HeartRateZoneReached topic and open the Messages tab.
Use the Biometrics endpoint to send the following message to the ClientGateway.
{
  "deviceId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
  "heartRates": [
    {
      "dateTime": "2022-12-02T13:50:00.000Z",
      "value": 90
    },
    {
      "dateTime": "2022-12-02T13:51:00.000Z",
      "value": 110
    },
    {
      "dateTime": "2022-12-02T13:52:00.000Z",
      "value": 130
    },
    {
      "dateTime": "2022-12-02T13:53:00.000Z",
      "value": 150
    },
    {
      "dateTime": "2022-12-02T13:54:00.000Z",
      "value": 170
    },
    {
      "dateTime": "2022-12-02T13:55:00.000Z",
      "value": 190
    }
  ],
  "stepCounts": [
    {
      "dateTime": "2022-12-02T13:50:00.000Z",
      "value": 200
    }
  ],
  "maxHeartRate": 200
}
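Check the messages in the HeartRateZoneReached topic. Using the data above, you should see 5 messages: the first reading, 90, is only 45% of the maximum heart rate of 200, so it falls below Zone 1 and produces no event.
Our Heart Rate Zone Service is now producing messages in a transactional manner.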
This brings us to the end of this exercise.