course: Kafka Streams 101

Hands On: Basic Operations

3 min
Screen Shot 2021-07-23 at 4.13.50 PM

Sophie Blee-Goldman

Senior Software Engineer (Presenter)

bill-bejeck

Bill Bejeck

Integration Architect (Author)

Hands On: Basic Operations

The Basic Operations exercise demonstrates how to use Kafka Streams stateless operations such as filter and mapValues. Note that the next few steps, including setting up Confluent Cloud, creating the properties, adding the application ID, and creating the StreamsBuilder, apply to each exercise but will only be shown in this one.

  1. Clone the course's GitHub repository and load it into your favorite IDE or editor.
        git clone https://github.com/confluentinc/learn-kafka-courses.git
  2. Change to the directory.
        cd learn-kafka-courses/kafka-streams
  3. Compile the source with ./gradlew build. The source code in this course is compatible with Java 11, and this module's code can be found in the source file java/io/confluent/developer/basic/BasicStreams.java.

4. [Go to Confluent Cloud](https://www.confluent.io/confluent-cloud/tryfree/) and use the promo code `STREAMS101` for $101 of free usage.
  1. Create a new cluster in Confluent Cloud. For the purposes of all the exercise modules you can use the Basic type. Name the cluster kafka_streams_course. Note the associated costs, then click on the Launch Cluster button on the bottom right. (Make a mental note at this point to remember to shut down the cluster when you no longer need it; there are shutdown instructions at the end of the course).

    ks-cc-cluster-naming

  2. You’ll also need to set up a Schema Registry. Click on the environment link in the upper left corner (probably DEFAULT). Then click the Schema Registry link and follow the prompts to set up a schema registry on the provider of your choice. Once that is complete, go back to your cluster.

Schema Registry

  1. Next click on Data Integration in the menu on the left, then select Clients, then the Java tile.

    cc-select-java-client

  2. You'll be dropped down to a window where you can create credentials for your cluster and Confluent Schema Registry. create-cc-properties

    1. Click on Create Kafka cluster API key.
    2. Copy your key and secret, name the file, then click Download and continue. (Your credentials will populate into the configurations boilerplate.)
    3. Click on Create Schema Registry API key.
    4. Copy your key and secret, name the file, then click Download and continue. (Your credentials will populate into the configurations boilerplate.)
    5. Make sure Show API keys is selected, then Copy the configurations in the window.

    show-api-keys-ui 6. Create a file named ccloud.properties in the src/main/resources directory of the repo you downloaded. Then paste the configurations into the ccloud.properties file. Note that this file is ignored and should never get checked into GitHub.

  3. Now set up properties for the exercises (you are taking a couple of minor extra steps to make sure that any sensitive information doesn't get accidentally checked into GitHub). First make sure you are in src/main/resources. Then run the following commands:

    • cat streams.properties.orig > streams.properties
    • cat ccloud.properties >> streams.properties
  4. Next, create a properties object in your BasicStreams.java file:

    package io.confluent.developer.basic;
    
    import java.io.IOException;
    import java.util.Properties;	
    
    public class BasicStreams {
        public static void main(String[] args) throws IOException {
        Properties streamsProps = new Properties();
        }
    }
  5. Use a FileInputStream to load properties from the file that includes your Confluent Cloud properties; in addition, add the application configuration ID to the properties:

    try (FileInputStream fis = new FileInputStream("src/main/resources/streams.properties")) {
        streamsProps.load(fis);
    }
    streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "basic-streams");
  6. Create a StreamsBuilder instance, and retrieve the name of the inputTopic and outputTopic from the Properties:

    StreamsBuilder builder = new StreamsBuilder()	
    final String inputTopic = streamsProps.getProperty("basic.input.topic");
    final String outputTopic = streamsProps.getProperty("basic.output.topic");
  7. Create an order number variable (you'll see where it comes into play soon), and then create the KStream instance (note the use of the inputTopic variable):

    final String orderNumberStart = "orderNumber-";
    KStream<String, String> firstStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); 
  8. Add a peek operator (it's expected that you don't modify the keys and values). Here, it's printing records as they come into the topology:

    firstStream.peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value))
  9. Add a filter to drop records where the value doesn't contain an order number string:

    .filter((key, value) -> value.contains(orderNumberStart))	
  10. Add a mapValues operation to extract the number after the dash:

    .mapValues(value -> value.substring(value.indexOf("-") + 1))
  11. Add another filter to drop records where the value is not greater than 1000:

    .filter((key, value) -> Long.parseLong(value) > 1000)
  12. Add an additional peek method to display the transformed records:

    .peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
  13. Add the to operator, the processor that writes records to a topic:

    .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
  14. Create the Kafka Streams instance:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
  15. Use the utility method TopicLoader.runProducer() to create the required topics on the cluster and produce some sample records (we’ll see this pattern throughout the exercises, but keep in mind that it's not part of a standard Kafka Streams application):

    TopicLoader.runProducer();
  16. Start the application:

    kafkaStreams.start();

Now you can run the basic operations example with this command:

./gradlew runStreams -Pargs=basic

and your output on the console should resemble this:

Incoming record - key order-key value orderNumber-1001
Outgoing record - key order-key value 1001
Incoming record - key order-key value orderNumber-5000
Outgoing record - key order-key value 5000

Take note that it's expected to not have a corresponding output record for each input record due to the filters applied by the Kafka Steams application.

Use the promo code STREAMS101 to get $101 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.