Wade Waldron

Staff Software Practice Lead

Hands-On: Use the Stream Lineage

Note: This exercise is part of a larger course. You are expected to have completed the previous exercises.

As we have worked through the exercises and gradually built up a data pipeline, Confluent Cloud has been building a map of that pipeline. You can view this map through the Stream Lineage. It shows the topics that have been created, the producers and consumers of those topics, and additional details such as throughput.

In this exercise, we'll be exploring the Stream Lineage in greater depth.

Stage the Exercise

As in the previous exercise, you will continue to work in the exercises folder. First, make sure you are in the correct initial state for this exercise by running the following command:

./exercise.sh stage 13

View the Current Lineage

Let's start by viewing the current lineage.

  1. Compile and run the OrderService and PaymentService.

    mvn clean compile package
    mvn exec:java -Dexec.mainClass="io.confluent.OrderService"
    mvn exec:java -Dexec.mainClass="io.confluent.PaymentService"
  2. In your Confluent Cloud cluster, select Stream lineage from the left-hand navigation menu.

  3. This should take you to a lineage that looks something like the following:

    [Image: Initial Stream Lineage]

    This lineage is useful because it shows each of the steps we have implemented so far. However, it is limited: our pipeline is linear and contains no branch points. Next, we will make the lineage a little more complex by adding a branch.

Create the PaymentFailed Topic

To add a branch to our logic, we will modify the PaymentService to emit failure events in addition to success events. To do that, we need a new topic to hold the failure events.

  1. Start by adding a new topic named PaymentFailed.

  2. Next, add a schema for the topic. This schema is identical to the PaymentSucceeded schema, except that it adds a reason field describing why the payment failed.

    {
       "type":"record",
       "name":"PaymentFailed",
       "doc":"An event that is produced each time a payment fails.",
       "namespace":"io.confluent",
       "fields":[
          {
             "name":"paymentId",
             "doc":"A unique identifier for this payment.",
             "type":"string"
          },
          {
             "name":"orderId",
             "doc":"The unique identifier for the order that this payment is attached to.",
             "type":"string"
          },
          {
             "name":"amount",
             "doc":"The amount of the payment (in US Dollars).",
             "type":"double"
          },
          {
             "name":"reason",
             "doc":"A text description for why the payment failed.",
             "type":"string"
          }
       ]
    }
  3. Tag this schema with the appropriate tags and add a description.

Modify the Payment Service

Next, we need to modify the PaymentService so that it emits either a PaymentSucceeded or a PaymentFailed event for each payment.

  1. Start by downloading the schema.

    mvn schema-registry:download
  2. Next, open PaymentService.java

    Note: We recommend taking the time to modify the code on your own. However, if you are unfamiliar with Java, you can copy the relevant solution file by running the command:

    ./exercise.sh solve 13 PaymentService.java

    If you choose to copy the solution, you can skip ahead to step 7.

  3. Create a new private static final field named PAYMENT_FAILED with the value "PaymentFailed" (the name of the new topic).

  4. Inside the execute method, you'll find a series of statements that create a new PaymentSucceeded event and then send that record to the producer. Copy these lines and modify the copy to produce a PaymentFailed event instead. For the reason, you can use "Insufficient Funds".

    Note: Be careful to change all of the references to PaymentSucceeded; otherwise, you could end up producing the wrong data or writing it to the wrong topic.

  5. Wrap the two versions of the message production in a conditional statement with the following logic:

    • Generate a random integer between 0 and 99 (100 possible values).
      • You can use the rnd random number generator defined above.
    • If the number is greater than or equal to 10 (90% chance) emit a PaymentSucceeded.
    • If the number is less than 10 (10% chance) emit a PaymentFailed.
  6. Remember to flush the paymentFailedProducer.

  7. Compile and execute the OrderService and the PaymentService.

    mvn clean compile package
    mvn exec:java -Dexec.mainClass="io.confluent.OrderService"
    mvn exec:java -Dexec.mainClass="io.confluent.PaymentService"
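The branch described in steps 3 to 6 can be sketched as follows. This is a hypothetical, self-contained sketch, not the solution file: RecordingProducer and the comma-separated String payloads are stand-ins (assumptions) for the real Kafka producers and generated Avro classes, so the example runs with only the JDK. It shows the structure that matters: one roll per payment, exactly one branch taken, and each branch sending to and flushing its own producer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical sketch of the branch in PaymentService.execute.
// RecordingProducer and the String payloads are stand-ins for the real
// Kafka producers and Avro records used in the exercise.
class PaymentBranchSketch {
    // Topic names used in this exercise.
    static final String PAYMENT_SUCCEEDED = "PaymentSucceeded";
    static final String PAYMENT_FAILED = "PaymentFailed";

    // Stand-in producer: remembers every "topic:value" it is asked to send.
    static class RecordingProducer {
        final List<String> sent = new ArrayList<>();
        int flushCount = 0;

        void send(String topic, String value) {
            sent.add(topic + ":" + value);
        }

        void flush() {
            flushCount++;
        }
    }

    private final Random rnd;
    final RecordingProducer paymentSucceededProducer = new RecordingProducer();
    final RecordingProducer paymentFailedProducer = new RecordingProducer();

    PaymentBranchSketch(Random rnd) {
        this.rnd = rnd;
    }

    // Rolls 0-9 (10 of 100 values) fail; rolls 10-99 (90 of 100 values) succeed.
    static boolean isFailure(int roll) {
        return roll < 10;
    }

    void execute(String paymentId, String orderId, double amount) {
        int roll = rnd.nextInt(100); // uniform integer from 0 to 99
        if (isFailure(roll)) {
            String failed = paymentId + "," + orderId + "," + amount + ",Insufficient Funds";
            paymentFailedProducer.send(PAYMENT_FAILED, failed);
            paymentFailedProducer.flush();
        } else {
            String succeeded = paymentId + "," + orderId + "," + amount;
            paymentSucceededProducer.send(PAYMENT_SUCCEEDED, succeeded);
            paymentSucceededProducer.flush();
        }
    }
}
```

Because each payment takes exactly one branch, every payment ends up in exactly one of the two topics. In the real service, the send and flush calls would go through the producers already defined in PaymentService.java.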

View the Stream Lineage

Now that we have introduced a branch point into our pipeline, let's view the lineage that has been created.

  1. Open the Stream Lineage. You should see something like the following:

    [Image: Lineage including PaymentFailed events]

    Note: It takes time for the lineage to update. If you don't see a change immediately, try refreshing the page after a few minutes.

    Notice how the PaymentFailed topic is now visible in our lineage.

Also notice that the line from the PaymentService to the PaymentSucceeded topic is much wider than the line to the PaymentFailed topic. The lineage reflects how much data flows along each path, and only about 10% of our payments fail.
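With a 10% failure chance, the expected split between the two topics is about 9:1, but any real run only approximates it. The toy simulation below (not anything Confluent Cloud runs) applies the same 0-99 roll to a number of hypothetical payments and counts how many take each branch. The class name, seed, and event count are arbitrary illustrative choices.

```java
import java.util.Random;

// Toy simulation: how many hypothetical payments land in each topic
// when every payment fails with a 10% chance.
class FlowShareSimulation {
    // Returns {succeededCount, failedCount} for the given number of payments.
    static int[] simulate(int events, long seed) {
        Random rnd = new Random(seed);
        int succeeded = 0;
        int failed = 0;
        for (int i = 0; i < events; i++) {
            if (rnd.nextInt(100) < 10) { // same rule as the PaymentService branch
                failed++;
            } else {
                succeeded++;
            }
        }
        return new int[] {succeeded, failed};
    }

    public static void main(String[] args) {
        int events = 10_000;
        int[] counts = simulate(events, 7L);
        double failedPercent = 100.0 * counts[1] / events;
        System.out.printf("PaymentSucceeded: %d, PaymentFailed: %d (%.1f%% failed)%n",
                counts[0], counts[1], failedPercent);
    }
}
```

Different seeds give slightly different counts, which is why the observed ratio in the lineage should be close to, but not exactly, 90/10.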

Run the ShippingService

Next, let's enrich our lineage even more. Once a payment has been processed successfully, the next step is to ship the product. We are going to run a ShippingService that will consume the PaymentSucceeded events, but not the PaymentFailed events. Then we will see how that impacts our lineage.

  1. Run the ShippingService.

    mvn exec:java -Dexec.mainClass="io.confluent.ShippingService"
  2. Inspect the Stream Lineage. You should see something like this:

    [Image: Final lineage with all events]

    Notice how our ShippingService is now visible on the lineage.

  3. Take some time to explore the lineage. Click on the topics and services to see what kind of information is available. You should find useful information such as throughput, along with links to the relevant producers, consumers, and schemas.
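One way to read the finished lineage is as a graph of services and topics. The toy model below is a hypothetical, partial sketch; it is not how Confluent Cloud computes Stream Lineage. It records only the edges this exercise names (PaymentService producing to both payment topics, ShippingService consuming PaymentSucceeded) and reports topics that have a producer but no consumer. Within this exercise, that makes PaymentFailed a dead end in the lineage.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, partial model of the lineage built in this exercise.
// It only tracks which services produce to and consume from each topic.
class ToyLineage {
    final Map<String, List<String>> producers = new LinkedHashMap<>(); // topic -> producing services
    final Map<String, List<String>> consumers = new LinkedHashMap<>(); // topic -> consuming services

    void produces(String service, String topic) {
        producers.computeIfAbsent(topic, t -> new ArrayList<>()).add(service);
        consumers.computeIfAbsent(topic, t -> new ArrayList<>());
    }

    void consumes(String service, String topic) {
        consumers.computeIfAbsent(topic, t -> new ArrayList<>()).add(service);
        producers.computeIfAbsent(topic, t -> new ArrayList<>());
    }

    // Topics that something writes to but nothing reads from.
    List<String> unconsumedTopics() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : consumers.entrySet()) {
            if (e.getValue().isEmpty() && !producers.get(e.getKey()).isEmpty()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ToyLineage lineage = new ToyLineage();
        lineage.produces("PaymentService", "PaymentSucceeded");
        lineage.produces("PaymentService", "PaymentFailed");
        lineage.consumes("ShippingService", "PaymentSucceeded");
        // prints "Unconsumed topics: [PaymentFailed]"
        System.out.println("Unconsumed topics: " + lineage.unconsumedTopics());
    }
}
```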

Finish

This brings us to the end of this exercise.

