Course: Kafka Streams 101

Hands On: Basic Operations

3 min
Sophie Blee-GoldmanSoftware Engineer II (Course Presenter)
Bill BejeckIntegration Architect (Course Author)

Hands On: Basic Operations

Clone the course GitHub repository and load it into your favorite IDE or editor.

git clone https://github.com/confluentinc/learn-kafka-courses.git
cd learn-kafka-courses/kafka-streams

The source code in this course is compatible with Java 11. Compile the source with ./gradlew build and follow along in the code.

This module’s code can be found in the source file java/io/confluent/developer/basic/BasicStreams.java.

The Basic Operations exercise demonstrates how to use Kafka Streams stateless operations such as filter and mapValues. The next few steps, including creating the properties, adding the application ID, and creating the StreamsBuilder, are found in each exercise, so they will only be discussed in this one.

  1. Begin by creating a properties object:

    package io.confluent.developer.basic;
    
    import java.io.IOException;
    import java.util.Properties;	
    
    public class BasicStreams {
        public static void main(String[] args) throws IOException {
        Properties streamsProps = new Properties();
        }
    }
  2. Use a FileInputStream to load properties from a file that includes the Confluent Cloud properties that you created for working with the cluster; in addition, add the application configuration ID to the properties:

    try (FileInputStream fis = new FileInputStream("src/main/resources/streams.properties")) {
        streamsProps.load(fis);
    }
    streamsProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "basic-streams");
  3. Create a StreamsBuilder instance, and retrieve the name of the inputTopic and outputTopic from the Properties:

    StreamsBuilder builder = new StreamsBuilder()	
    final String inputTopic = streamsProps.getProperty("basic.input.topic");
    final String outputTopic = streamsProps.getProperty("basic.output.topic");
  4. Create the order number variable (you'll see where it comes into play soon), and then create the KStream instance (note the use of the inputTopic variable):

    final String orderNumberStart = "orderNumber-";
    KStream<String, String> firstStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String())); 
  5. Add a peek operator (it's expected that you don't modify the keys and values). Here, it's printing records as they come into the topology:

    firstStream.peek((key, value) -> System.out.println("Incoming record - key " +key +" value " + value))
  6. Add a filter to drop records where the value doesn't contain an order number string:

    .filter((key, value) -> value.contains(orderNumberStart))	
  7. Add a mapValues operation to extract the number after the dash:

    .mapValues(value -> value.substring(value.indexOf("-") + 1))
  8. Add another filter to drop records where the value is not greater than 1000:

    .filter((key, value) -> Long.parseLong(value) > 1000)
  9. Add an additional peek method to display the transformed records:

    .peek((key, value) -> System.out.println("Outgoing record - key " +key +" value " + value))
  10. Add the to operator, the processor that writes records to a topic:

    .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
  11. Create the Kafka Streams instance:

    KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), streamsProps);
  12. Use the utility method TopicLoader.runProducer() to create the required topics on the cluster and produce some sample records (we’ll see this pattern throughout the exercises, but keep in mind that it's not part of a standard Kafka Streams application):

    TopicLoader.runProducer();
  13. Start the application:

    kafkaStreams.start();

Use the promo code STREAMS101 to get $101 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.