Staff Software Practice Lead
Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.
In the previous exercise, we created a REST endpoint that accepted data from a fitness tracker. We pushed that data directly to Kafka with no validation.
The lack of validation creates potential issues, so in this exercise, we will introduce basic validation by converting the strings to a known object format. We will serialize the object and validate it against a schema. This will ensure that all data being pushed into Kafka matches the expected schema.
Stage the code for this exercise by executing:
./exercise.sh stage 09
exercise.bat stage 09
Previously, we pushed data to a topic named RawBiometricsImported
. Because the data wasn't validated, we can't guarantee its integrity.
Let's create a new topic where we can guarantee the validity of the data.
In the Confluent Cloud Console create a new topic named BiometricsImported
(use the default partitions).
Define a data contract by choosing to Create a schema for message values.
Add a JSON Schema using the following definition:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Biometrics",
"description": "Biometrics collected from the fitness tracker.",
"type": "object",
"additionalProperties": false,
"definitions": {
"HeartRate": {
"additionalProperties": false,
"properties": {
"DateTime": {
"format": "date-time",
"type": "string"
},
"Value": {
"format": "int32",
"type": "integer"
}
},
"type": "object"
},
"StepCount": {
"additionalProperties": false,
"properties": {
"DateTime": {
"format": "date-time",
"type": "string"
},
"Value": {
"format": "int32",
"type": "integer"
}
},
"type": "object"
}
},
"properties": {
"DeviceId": {
"format": "guid",
"type": "string"
},
"HeartRates": {
"items": {
"$ref": "#/definitions/HeartRate"
},
"type": "array"
},
"StepCounts": {
"items": {
"$ref": "#/definitions/StepCount"
},
"type": "array"
},
"MaxHeartRate": {
"format": "int32",
"type": "integer"
}
}
}
This schema will be mapped to classes in C#. Let's define them.
Domain
.Domain
folder create a new file named Biometrics.cs
.ClientGateway.Domain
.HeartRate
, StepCount
, Biometrics
.HeartRate
class as follows:
get
, but no set
):
Value
of type int
.DateTime
of type DateTime
.StepCount
class as follows:
HeartRate
class.Biometrics
class as follows:
get
, but no set
):
DeviceId
of type Guid
.HeartRates
of type List<HeartRate>
.StepCounts
of type List<StepCount>
.MaxHeartRate
of type int
.Note: We are glossing over complexities in this model. For example, what happens when the MaxHeartRate
changes? This simplified model is enough to teach the basic concepts we need, but would likely be insufficient for a production application.
Next, we need to add a Schema Registry client to our application. We already added the configuration in a previous exercise.
Program.cs
.SchemaRegistry
section of the config into a SchemaRegistryConfig
object.
ProducerConfig
.ISchemaRegistryClient
using a new CachedSchemaRegistryClient
.
IProducer
but you won't need a builder.Next, our producer will need access to a serializer for the Biometrics
class.
Program.cs
add a using
directive for the ClientGateway.Domain
namespace.IProducer
and ProducerBuilder
to <String, Biometrics>
..Build()
, register a new JsonSerializer<Biometrics>
using the SetValueSerializer
method on the builder.
ISchemaRegistryClient
is similar to retrieving an IOptions<ProducerConfig>
. Pass it to the constructor for the serializer.Next, update the REST endpoint to accept a Biometrics
class, rather than a basic String
.
ClientGatewayController.cs
.using
directive for the ClientGateway.Domain
namespace.BiometricsImportedTopicName
from RawBiometricsImported
to BiometricsImported
._producer
to IProducer<String, Biometrics>
and propagate those changes.RecordMeasurements
method to accept Biometrics
instead of a String
.ProducesResponseType
to produce Biometrics
.message
as follows:
<string, Biometrics>
.Key
by extracting the DeviceId
from the Biometrics
and converting it to a String.Biometrics
inside of the Accepted
response.Run the tests:
dotnet test Fitness.sln
Note: The tests make certain assumptions about parameter order and naming. Adjust your code accordingly.
Run the ClientGateway:
cd ClientGateway
dotnet run
Now we can try interacting with the application.
BiometricsImported
topic and visit the Messages tab.Biometrics
endpoint.
We've introduced a minimal amount of validation to ensure that our messages are always well-formed. This puts us in a good position to start consuming those messages downstream because we know exactly what they will look like, based on the schema.
This brings us to the end of this exercise.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.