Wade Waldron

Staff Software Practice Lead

Schemas & Serialization (Hands-On)

Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.

In the previous exercise, we created a REST endpoint that accepted data from a fitness tracker. We pushed that data directly to Kafka with no validation.

Without validation, malformed or incomplete data can end up in the topic, and every downstream consumer has to cope with it. In this exercise, we will introduce basic validation by converting the incoming strings into a known object format. We will serialize that object and validate it against a schema, ensuring that all data pushed into Kafka matches the expected schema.

Stage the Exercises

Stage the code for this exercise by executing:

On macOS or Linux:

./exercise.sh stage 09

On Windows:

exercise.bat stage 09

Create a New Topic

Previously, we pushed data to a topic named RawBiometricsImported. Because the data wasn't validated, we can't guarantee its integrity.

Let's create a new topic where we can guarantee the validity of the data.

  1. In the Confluent Cloud Console, create a new topic named BiometricsImported (use the default number of partitions).

  2. Define a data contract by choosing to Create a schema for message values.

  3. Add a JSON Schema using the following definition:

    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Biometrics",
      "description": "Biometrics collected from the fitness tracker.",
      "type": "object",
      "additionalProperties": false,
      "definitions": {
        "HeartRate": {
          "additionalProperties": false,
          "properties": {
            "DateTime": {
              "format": "date-time",
              "type": "string"
            },
            "Value": {
              "format": "int32",
              "type": "integer"
            }
          },
          "type": "object"
        },
        "StepCount": {
          "additionalProperties": false,
          "properties": {
            "DateTime": {
              "format": "date-time",
              "type": "string"
            },
            "Value": {
              "format": "int32",
              "type": "integer"
            }
          },
          "type": "object"
        }
      },
      "properties": {
        "DeviceId": {
          "format": "guid",
          "type": "string"
        },
        "HeartRates": {
          "items": {
            "$ref": "#/definitions/HeartRate"
          },
          "type": "array"
        },
        "StepCounts": {
          "items": {
            "$ref": "#/definitions/StepCount"
          },
          "type": "array"
        },
        "MaxHeartRate": {
          "format": "int32",
          "type": "integer"
        }
      }
    }

Define a new Biometrics Class

This schema will be mapped to classes in C#. Let's define them.

  1. In the ClientGateway project, create a folder named Domain.
  2. In the Domain folder, create a new file named Biometrics.cs.
  3. In this file, define the namespace ClientGateway.Domain.
  4. Add three public classes named HeartRate, StepCount, and Biometrics.
  5. Define the HeartRate class as follows:
    • Include the following properties (with get, but no set):
      • Value of type int.
      • DateTime of type DateTime.
    • Create a constructor to populate these properties.
  6. Define the StepCount class as follows:
    • Include the same properties and constructor parameters as the HeartRate class.
  7. Define the Biometrics class as follows:
    • Include the following properties (with get, but no set):
      • DeviceId of type Guid.
      • HeartRates of type List<HeartRate>.
      • StepCounts of type List<StepCount>.
      • MaxHeartRate of type int.
    • Create a constructor to populate these properties.
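One possible Biometrics.cs that follows the steps above is sketched below. The constructor parameters follow the order in which the properties are listed (Value before DateTime, DeviceId first for Biometrics); that order is an assumption, so adjust it if the tests expect something different.

```csharp
using System;
using System.Collections.Generic;

namespace ClientGateway.Domain
{
    // A single heart-rate reading. Properties have getters only,
    // so the constructor is the one place they are set.
    public class HeartRate
    {
        public HeartRate(int value, DateTime dateTime)
        {
            Value = value;
            DateTime = dateTime;
        }

        public int Value { get; }
        public DateTime DateTime { get; }
    }

    // A single step-count reading; same shape as HeartRate.
    public class StepCount
    {
        public StepCount(int value, DateTime dateTime)
        {
            Value = value;
            DateTime = dateTime;
        }

        public int Value { get; }
        public DateTime DateTime { get; }
    }

    // The full payload sent by a fitness tracker.
    public class Biometrics
    {
        public Biometrics(Guid deviceId, List<HeartRate> heartRates, List<StepCount> stepCounts, int maxHeartRate)
        {
            DeviceId = deviceId;
            HeartRates = heartRates;
            StepCounts = stepCounts;
            MaxHeartRate = maxHeartRate;
        }

        public Guid DeviceId { get; }
        public List<HeartRate> HeartRates { get; }
        public List<StepCount> StepCounts { get; }
        public int MaxHeartRate { get; }
    }
}
```

Because each constructor parameter name matches its property name (ignoring case), System.Text.Json on .NET 5 or later can bind an incoming request body through the constructor even though the properties have no setters.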

Note: We are glossing over complexities in this model. For example, what happens when the MaxHeartRate changes? This simplified model is enough to teach the basic concepts we need, but would likely be insufficient for a production application.

Connect the Application to the Schema Registry

Next, we need to add a Schema Registry client to our application. We already added the configuration in a previous exercise.

  1. Open Program.cs.
  2. Load the SchemaRegistry section of the config into a SchemaRegistryConfig object.
    • Hint: This is similar to loading the ProducerConfig.
  3. Register an instance of ISchemaRegistryClient using a new CachedSchemaRegistryClient.
    • Hint: This is similar to creating the IProducer but you won't need a builder.
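A minimal sketch of these two steps follows. It is a fragment for Program.cs, not a standalone program, and it assumes the .NET 6 minimal hosting model (a `builder` variable), a ProducerConfig already bound with `builder.Services.Configure<ProducerConfig>(...)`, and a project reference to the Confluent.SchemaRegistry.Serdes.Json NuGet package (which brings in Confluent.SchemaRegistry).

```csharp
// Fragment for Program.cs.
// Requires: using Confluent.SchemaRegistry; using Microsoft.Extensions.Options;

// Bind the "SchemaRegistry" section of the configuration to a
// SchemaRegistryConfig, the same way the ProducerConfig is bound.
builder.Services.Configure<SchemaRegistryConfig>(
    builder.Configuration.GetSection("SchemaRegistry"));

// Register one shared client. CachedSchemaRegistryClient caches schemas and
// schema IDs locally, so the registry isn't contacted for every message.
builder.Services.AddSingleton<ISchemaRegistryClient>(sp =>
{
    var config = sp.GetRequiredService<IOptions<SchemaRegistryConfig>>();
    return new CachedSchemaRegistryClient(config.Value);
});
```

Registering the client as a singleton lets the producer (and anything else that needs it) share the same cache.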

Provide a Serializer to the Producer

Next, our producer will need access to a serializer for the Biometrics class.

  1. In Program.cs add a using directive for the ClientGateway.Domain namespace.
  2. Update the type of your IProducer and ProducerBuilder to <String, Biometrics>.
  3. Prior to calling .Build(), register a new JsonSerializer<Biometrics> using the SetValueSerializer method on the builder.
    • Hint: Retrieving an ISchemaRegistryClient is similar to retrieving an IOptions<ProducerConfig>. Pass it to the constructor for the serializer.
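Putting these steps together, the producer registration might look like the fragment below. It assumes the same Program.cs setup as the Schema Registry registration and replaces the existing IProducer<String, String> registration.

```csharp
// Fragment for Program.cs; replaces the old IProducer<String, String> registration.
// Requires: using ClientGateway.Domain; using Confluent.Kafka;
//           using Confluent.SchemaRegistry; using Confluent.SchemaRegistry.Serdes;
//           using Microsoft.Extensions.Options;
builder.Services.AddSingleton<IProducer<String, Biometrics>>(sp =>
{
    var config = sp.GetRequiredService<IOptions<ProducerConfig>>();
    var schemaRegistry = sp.GetRequiredService<ISchemaRegistryClient>();

    return new ProducerBuilder<String, Biometrics>(config.Value)
        // Turns each Biometrics value into JSON, validates it against the
        // JSON schema and prefixes the payload with the schema ID.
        .SetValueSerializer(new JsonSerializer<Biometrics>(schemaRegistry))
        .Build();
});
```

Only the value gets a custom serializer; the String key still uses the built-in UTF-8 serializer that Confluent.Kafka provides by default.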

Update the REST Endpoint

Next, update the REST endpoint to accept a Biometrics class, rather than a basic String.

  1. Open ClientGatewayController.cs.
  2. Add a using directive for the ClientGateway.Domain namespace.
  3. Change the BiometricsImportedTopicName from RawBiometricsImported to BiometricsImported.
  4. Change the type of _producer to IProducer<String, Biometrics> and propagate the change (for example, to the constructor parameter that supplies it).
  5. Modify the RecordMeasurements method to accept Biometrics instead of a String.
  6. Update the ProducesResponseType attribute so that it declares Biometrics as the response type.
  7. Modify the message as follows:
    • Change the type to <string, Biometrics>.
    • Assign a Key by extracting the DeviceId from the Biometrics and converting it to a String.
  8. Return the Biometrics inside of the Accepted response.
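The relevant parts of the controller might end up looking like the fragment below. It is a sketch, not the course solution: the HttpPost route, the method's return type, and the surrounding class members (constructor, logger, and so on) are assumptions, so keep whatever your existing controller already uses.

```csharp
// Fragment inside ClientGatewayController.
// Requires: using ClientGateway.Domain; using Confluent.Kafka;
//           using Microsoft.AspNetCore.Mvc;
private readonly String BiometricsImportedTopicName = "BiometricsImported";
private readonly IProducer<String, Biometrics> _producer;

[HttpPost("Biometrics")] // route is an assumption
[ProducesResponseType(typeof(Biometrics), StatusCodes.Status202Accepted)]
public async Task<AcceptedResult> RecordMeasurements(Biometrics metrics)
{
    var message = new Message<String, Biometrics>
    {
        // Keying by device sends every reading from one device to the same
        // partition, which preserves their relative order.
        Key = metrics.DeviceId.ToString(),
        Value = metrics
    };

    // The value serializer validates the message here; invalid data
    // makes ProduceAsync throw instead of reaching the topic.
    await _producer.ProduceAsync(BiometricsImportedTopicName, message);

    return Accepted("", metrics);
}
```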

Run the Tests

Run the tests:

dotnet test Fitness.sln

Note: The tests make certain assumptions about parameter order and naming. Adjust your code accordingly.

Run the Application

Run the ClientGateway:

cd ClientGateway
dotnet run

Try It Out

Now we can try interacting with the application.

  1. In the Confluent Cloud Console, navigate to the BiometricsImported topic and open the Messages tab.
  2. In another browser window, visit the Swagger page at http://localhost:5000/swagger/index.html.
  3. Try sending bad data to the Biometrics endpoint.
    • Modify the input to something incompatible with the schema.
      • For example, change "MaxHeartRate" to a string.
    • What happens when you click execute?
  4. Now try it again with valid input.
  5. Check the Messages tab in the Confluent Cloud Console to verify your message(s) arrived.

We've introduced a minimal amount of validation to ensure that our messages are always well-formed. This puts us in a good position to start consuming those messages downstream because we know exactly what they will look like, based on the schema.

Finish

This brings us to the end of this exercise.

Use the promo code DOTNETKAFKA101 to get $25 of free Confluent Cloud usage