Get Started Free
Wade Waldron

Wade Waldron

Staff Software Practice Lead

Transactional Commits (Hands-On)

Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.

In the previous exercise, we implemented a basic consumer for our topic. One key aspect of that consumer is that it was set up to do automatic commits. As a result, if something failed to process, it may be committed anyway, and the message may be lost.

In this exercise, whenever we process a message, we will extract the heart rates and use them to determine what zone the user was in. These zones are based on a percentage of the user's maximum heart rate (i.e >50% = Zone 1, >60% = Zone 2, etc). If we determine that the user was in a specific zone, then we will emit a new message. Because each incoming message contains multiple heart rates, we could end up producing multiple new messages.

We are going to switch from automatic commits to transactional commits. As we process the messages, we will produce multiple new messages to a separate topic. We want this to happen in a transactional fashion. Either all of the messages get published, or none of them do. We will see how this can be achieved using Kafka transactions.

Stage the Exercises

Stage the code for this exercise by executing:

./exercise.sh stage 13
exercise.bat stage 13

Create a New Topic

We will emit events to a separate topic, so we'll need to define that topic.

  1. In Confluent Cloud, create a new topic named HeartRateZoneReached.
  2. Use the following JSON schema:
    {
      "$schema": "http://json-schema.org/draft-04/schema#",
      "additionalProperties": false,
      "definitions": {
        "HeartRateZone": {
          "description": "",
          "enum": [
            0,
            1,
            2,
            3,
            4,
            5
          ],
          "type": "integer",
          "x-enumNames": [
            "None",
            "Zone1",
            "Zone2",
            "Zone3",
            "Zone4",
            "Zone5"
          ]
        }
      },
      "properties": {
        "DateTime": {
          "format": "date-time",
          "type": "string"
        },
        "DeviceId": {
          "format": "guid",
          "type": "string"
        },
        "HeartRate": {
          "format": "int32",
          "type": "integer"
        },
        "MaxHeartRate": {
          "format": "int32",
          "type": "integer"
        },
        "Zone": {
          "$ref": "#/definitions/HeartRateZone"
        }
      },
      "title": "HeartRateZoneReached",
      "type": "object"
    }

Modify the Configuration

We will need a new configuration for our producer in the HeartRateZoneService.

  1. Open the HeartRateZoneService/appsettings.json file.

  2. Add the following configuration:

    "Producer": {
    	"BootstrapServers": "",
    	"ClientId": "HeartRateZoneService",
    	"TransactionalId": "HeartRateZoneService",
    	"SecurityProtocol": "SaslSsl",
    	"SaslMechanism": "PLAIN",
    	"SaslUsername": "",
    	"SaslPassword": "",
    	"EnableIdempotence": ""
    },
  3. Populate the empty values using the values from the Consumer config.

    • Question: What value should you use for EnableIdempotence?
    • Note: We won't bother changing the MaxInFlight, Retries, or Acks value because the default values are sufficient.
  4. Add a SchemaRegistry section and populate it.

    • Hint: Use the values from the ClientGateway.
  5. [Optional] For completeness, you could go back to the producer configuration in the ClientGateway and set the value for EnableIdempotence.

Implement the Heart Rate Zones

Heart Rate Zones are determined using a percentage of the maximum heart rate. To determine whether a HeartRate falls into one of the zones, we are going to implement some helpers.

  1. In HeartRateZoneService/Domain create a new file named HeartRateExtensions.cs

  2. Add a namespace declaration for HeartRateZoneService.Domain.

  3. Create a new public enum named HeartRateZone with the following values:

    • None
    • Zone1
    • Zone2
    • Zone3
    • Zone4
    • Zone5
  4. Create a new public static class named HeartRateExtensions.

  5. In the HeartRateExtensions class, define a static extension method named GetHeartRateZone that returns a HeartRateZone.

    • The method should take two parameters:
      • The first parameter will be of type HeartRate. We are creating an extension method so it should be defined with the keyword this.
        • I.E. this HeartRate hr
      • The second parameter will be an int representing the maximum heart rate.
    • Implement the method as follows:
      • If HeartRate is less than 50% of max, return HeartRateZone.None.
      • If HeartRate is between 50-59% of max, return HeartRateZone.Zone1.
      • If HeartRate is between 60-69% of max, return HeartRateZone.Zone2.
      • If HeartRate is between 70-79% of max, return HeartRateZone.Zone3.
      • If HeartRate is between 80-89% of max, return HeartRateZone.Zone4.
      • If HeartRate is greater than or equal to 90% of max, return HeartRateZone.Zone5.
    • Note: We could have defined the method directly in the Biometrics class. However, if we eventually decided to share the code from that class between both microservices, or began using code generation tools, this logic would not be part of the biometrics class. Using an extension method lets us keep it separate.

Implement the HeartRateZoneReached Event

Next, let's define the event we will emit when our Heart Rate Zone is reached.

  1. In the HeartRateZoneService/Domain namespace, define a new class named HeartRateZoneReached with the following properties (get only, no set):

    • DeviceId of type Guid.
    • Zone of type HeartRateZone.
    • DateTime of type DateTime.
    • HeartRate of type int.
    • MaxHeartRate of type int.
  2. Define a constructor for the HeartRateZone class to initialize each of the properties.

Create the Producer

We will also need a producer to emit the events.

Hint: You can copy what we did in the ClientGateway with only a few changes.

  1. Open HeartRateZoneService/Program.cs.
  2. Define and register a new ISchemaRegistryClient and its associated config.
    • Hint: We created the config earlier in the exercise.
  3. Define and register a new IProducer<String, HeartRateZoneReached> and its associated config.
    • Hint: We created the config earlier in the exercise.

Update the HeartRateZoneWorker

Next, we need to implement the logic to handle the events and emit new ones. We will do this using Kafka transactions so that either all events are emitted, or none of them are.

  1. Open the HeartRateZoneWorker class.
  2. Define a private field named HeartRateZoneReachedTopicName of type String and assign it a value of HeartRateZoneReached.
  3. Define a private field named DefaultTimeout with a type of TimeSpan. Give it a value of 30 seconds (See TimeSpan.FromSeconds).
  4. Inject an instance of the IProducer<String, HeartZoneReached> as the second parameter of the constructor and assign it to a private field.
  5. Before subscribing to the BiometricsImported topic, call InitTransactions on the producer and pass it the DefaultTimeout.

Re-implement the HandleMessage method

The final step is to re-implement the HandleMessage method to include the logic necessary to emit our new events.

  1. Keep the log message.
  2. Call BeginTransaction on the producer.
  3. Obtain the offsets for the transaction and assign them to a variable:
    • Hint: Offsets can be obtained using:

      	_consumer.Assignment.Select(topicPartition => 
      		new TopicPartitionOffset(
      			topicPartition,
      			_consumer.Position(topicPartition)
      		)
      	);
  4. Call SendOffsetsToTransaction on the producer.
    • Pass it the following:
      • The offsets (see above).
      • The _consumer.ConsumerGroupMetadata.
      • The DefaultTimeout.
  5. Process the heart rates in a loop.
    • If the heart rate is in one of the zones, construct a new Message

      • Use the DeviceId for the Key

      • Create a new HeartRateZoneReached for the Value.

      • Hint: The Select and Where methods may be of benefit here:

        var newCollection = collection
        	.Where(predicate)
        	.Select(transformationFunction)
    • Send each message to the HeartRateZoneReached topic using ProduceAsync(topic, message, cancellationToken).

    • When all messages have been produced, call CommitTransaction on the producer.

    • If any messages fail, call AbortTransaction on the producer and throw a new Exception.

    • Hint: Be careful when producing multiple asynchronous tasks. Have a look at Task.WhenAll as a way of waiting for them all to complete.

      await Task.WhenAll(collectionOfTasks);

Run the Tests

We should now be ready to run our tests.

dotnet test Fitness.sln

Note: The tests make certain assumptions about parameter order and naming. Adjust your code accordingly.

Run the Application

Run the application in two terminals.

In the first terminal execute the following:

cd ClientGateway
dotnet run

In the second terminal execute the following:

cd HeartRateZoneService
dotnet run

Try It Out

Now we can try interacting with the application.

  1. In a browser, navigate to the HeartRateZoneReached topic and open the Messages tab.
  2. In another browser tab, visit the Swagger page at http://localhost:5000/swagger/index.html.
  3. Use the Biometrics endpoint to send the following message to the ClientGateway.
    {
      "deviceId": "3fa85f64-5717-4562-b3fc-2c963f66afa6",
      "heartRates": [
        {
          "dateTime": "2022-12-02T13:50:00.000Z",
          "value": 90
        },
       {
          "dateTime": "2022-12-02T13:51:00.000Z",
          "value": 110
        },
       {
          "dateTime": "2022-12-02T13:52:00.000Z",
          "value": 130
        },
       {
          "dateTime": "2022-12-02T13:53:00.000Z",
          "value": 150
        },
       {
          "dateTime": "2022-12-02T13:54:00.000Z",
          "value": 170
        },
       {
          "dateTime": "2022-12-02T13:55:00.000Z",
          "value": 190
        }
      ],
      "stepCounts": [
        {
          "dateTime": "2022-12-02T13:50:00.000Z",
          "value": 200
        }
      ],
      "maxHeartRate": 200
    }
  4. Wait a few moments and then check that your messages were received by the HeartRateZoneReached topic. Using the data above, you should see 5 messages.

Our Heart Rate Zone Service is now producing messages in a transactional manner.

Finish

This brings us to the end of this exercise.

Use the promo code DOTNETKAFKA101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.