
Consume Apache Kafka Messages using Apache Flink and Java

Getting started with Apache Flink® can be challenging. In this short video, Wade Waldron will walk you through a complete example of how to consume Apache Kafka® messages using Apache Flink and Java.

9 min

Wade Waldron

Principal Software Practice Lead

Take the Building Apache Flink Applications in Java course now.

Resources

Consume Apache Kafka Messages using Apache Flink and Java

Intro

Hi. I'm Wade from Confluent. I've been working with streaming data for a long time, but recently I had a chance to learn something new. I started to investigate Apache Flink and discovered that it can be challenging to learn on your own. Thankfully, I had access to some of the best experts in the field to help me through the tricky parts. But where does that leave everyone else? We can't all have a direct line to the Flink experts, although it would be awesome if we could. But maybe I can share that expertise to help you get started with your first Flink application.

I'm going to show you how to build a Flink job that consumes data from a Kafka topic. The job we're building will be responsible for importing flight data from a fictional source known as SkyOneAirlines. We'll pull the data from a Kafka topic named SkyOne, and for now, simply print it to the standard output. So let's write some code.

The Flink Job

Our Flink job is going to live inside of a package named flightimporter. It will have a series of imports, but you don't really want to watch me type a bunch of imports. So let's just move along. You can pause the video if you want to see them in more detail. Next, we need to define the class that will contain our job. Let's call it FlightImporterJob. It will contain the entry point that we use when we start the application.

The Entry Point

Flink uses a standard Java main method for its entry point. This will include an array of arguments passed to the application when it is started. For today, we're not worried about error handling, so we'll just throw all exceptions. Note to self: this may not be the best approach in production.
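As a sketch, the package, class, and entry point described so far might look like this (plain Java, with no Flink-specific code yet):

```java
package flightimporter;

public class FlightImporterJob {
    // Standard Java entry point. We're not worried about error handling
    // today, so we simply declare that all exceptions are thrown --
    // probably not the best approach in production.
    public static void main(String[] args) throws Exception {
        // The Flink job will be assembled here in the following sections.
    }
}
```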

The Stream Execution Environment

Our Flink job is executed inside something known as a StreamExecutionEnvironment. Let's create that and name it env. To obtain the environment, we can call StreamExecutionEnvironment.getExecutionEnvironment().
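In code, obtaining the environment is a one-liner. This is a sketch that assumes the Flink DataStream API is on the classpath:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlightImporterJob {
    public static void main(String[] args) throws Exception {
        // The execution environment is the context the job runs in.
        // It adapts automatically to local execution or a cluster.
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    }
}
```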

Kafka Configuration

Now, our application is going to be consuming a Kafka topic. That means we will need access to some Kafka configuration, such as the bootstrap servers, access keys, etc. Let's use a java.util.Properties object to get access to this config. We can load our configuration by creating an input stream using the ClassLoader. I'm putting my configuration into a file named consumer.properties, so we'll load it from there. Finally, we just need to load the stream into our config. That gets the boilerplate out of the way.
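Here's one way that loading step might look. The helper class name KafkaConfigLoader is my own invention for illustration; in the video, the loading happens inline in the job's main method:

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class KafkaConfigLoader {
    // Reads Kafka settings (bootstrap servers, credentials, etc.)
    // from any properties-format input stream.
    public static Properties load(InputStream stream) throws IOException {
        Properties props = new Properties();
        props.load(stream);
        return props;
    }

    public static void main(String[] args) throws IOException {
        // In the job itself, the stream would come from the ClassLoader:
        // InputStream stream = FlightImporterJob.class.getClassLoader()
        //         .getResourceAsStream("consumer.properties");
        Properties props = load(new ByteArrayInputStream(
                "bootstrap.servers=localhost:9092".getBytes()));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```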

The Kafka Source

Now let's do some actual Flink. We'll start by building a KafkaSource. This source will read SkyOneAirlinesFlightData objects from the Kafka topic. The details of the object don't really matter. It's just a plain old Java object with a bunch of getters and setters. We create the source using a typed builder from the KafkaSource object. The builder provides a variety of methods to construct our source. To configure our Kafka connection, we can use the setProperties method and pass it the properties object we just created. We also need to tell the source what topic to consume from. This can be done using the setTopics method and supplying one or more topic names. In this case, I'm using a single topic named skyone.

When we consume a topic for the first time, we have a decision to make. We can consume from the beginning of the topic using the earliest offset, or we can consume from the end using the latest offset. For the purpose of this demo, we don't care about old messages, so we'll use the latest offset. I also cluttered that topic with garbage while building this, and I'm too lazy to delete the old data. However, if you wanted to ensure all messages were consumed, you might choose earliest instead. That is quite common for production applications.

The Kafka topic contains objects that have been serialized to a JSON format. JSON is nice for demos like this because it's human-readable and common. On the other hand, JSON serializers are often slow, and the data size can be problematic. You might want a more compact and efficient format such as Avro or Protobuf in a production system. The JSON objects are stored in the value portion of the Kafka message. To extract these objects, I'm going to use a value-only deserializer along with a JsonDeserializationSchema. We just need to pass the schema a reference to the class that we want to deserialize to. Finally, we just need to build our source. All right. That takes care of our data source.
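Putting those pieces together, the source construction might look roughly like this. It's a sketch that assumes the Flink Kafka connector and JSON format libraries are available, along with the consumerConfig properties loaded earlier and the SkyOneAirlinesFlightData POJO from the video:

```java
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.JsonDeserializationSchema;

// ...inside main, after loading consumerConfig:
KafkaSource<SkyOneAirlinesFlightData> skyOneSource =
    KafkaSource.<SkyOneAirlinesFlightData>builder()
        // Bootstrap servers, credentials, etc. from consumer.properties.
        .setProperties(consumerConfig)
        // The topic (or topics) to consume from.
        .setTopics("skyone")
        // Start from the end of the topic; for production you might use
        // OffsetsInitializer.earliest() to consume everything instead.
        .setStartingOffsets(OffsetsInitializer.latest())
        // The message values are JSON-serialized POJOs.
        .setValueOnlyDeserializer(
            new JsonDeserializationSchema<>(SkyOneAirlinesFlightData.class))
        .build();
```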

The Data Stream

Next, we need to use the source to construct a stream that we can process. Let's create a DataStream of SkyOneAirlinesFlightData objects. We'll name it skyOneStream, following the naming pattern we used for the skyOneSource. To create the stream, we can use the fromSource method on our environment. It requires a source to pull from, so we pass it the skyOneSource. It also requires a watermark strategy. Watermarks are a complex topic. I've been told that 90% of the time, if your stream isn't producing any data, it's probably due to watermarks. Since this is such a critical and complicated topic, let's just ignore it and go with no watermarks. Actually, if you want more details, check out our Flink courses on Confluent Developer or the Flink documentation. The final thing we need for our stream is a name for the source. Let's call it skyone_source. This gives us a stream that we can consume, but a stream isn't really complete unless it has an endpoint. It would be a bit like building a plumbing system with no drains. Sure, you can turn on the water, but it's going to make a pretty big mess.
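In code, that step might look like this (a sketch that assumes the env and skyOneSource variables from the previous sections):

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

// ...inside main:
DataStream<SkyOneAirlinesFlightData> skyOneStream = env.fromSource(
        skyOneSource,
        // Ignoring watermarks for now -- see the Flink documentation
        // before doing this in a real event-time application.
        WatermarkStrategy.noWatermarks(),
        // The name that identifies this source in the Flink UI.
        "skyone_source");
```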

The Print Sink

For now, let's do the simplest thing possible and print the stream to the standard output. Now that our stream has both a beginning and an end, we can go ahead and run it. Let's call the execute function on the environment and pass a name for the job.

Executing the Stream

In this case, FlightImporter. Okay. That takes care of the code. Now let's see what it looks like in action.
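The final two steps are short (a sketch continuing from the skyOneStream defined above):

```java
// Print each record to standard output -- our stand-in for a real sink.
skyOneStream.print();

// Submit the assembled job for execution, giving it a name.
env.execute("FlightImporter");
```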

Compiling and Running

We need to build the code into a jar file. Using Maven, we do this by executing mvn clean package. This will print a bunch of output as Maven downloads the internet. Feel free to go grab a drink while we wait or maybe I'll just skip ahead. Eventually, the build should succeed. Next, we start the job using the Flink run command. We'll use the -c argument to specify the entry point. However, we could also register the entry point in the jar manifest file. In that case, the -c argument could be skipped.

Verifying it Works

Once the job is running, we can check the Flink logs to ensure that it's working. Finding the logs can be tricky. It seems like this should be easy, but the thing to remember is that Flink is a distributed system, and it might be running on many different machines. In fact, each operation in the job may be running somewhere else. That makes logging a little trickier than I might like. To find the logs, we will select the running job from the list, pick the task we are looking for, and navigate to the task manager. This is where we find the logging. We are specifically interested in the standard out, so let's open that. And now we can see there is a series of flight messages that have been printed. If you were really curious about what the SkyOneAirlinesFlightData object looked like, you can see the structure here. This shows that the job is working the way we would expect, which is unsurprising, because why would I record a video where all my code is broken?

Next Steps

Now, normally we wouldn't stop here. Printing to the standard output is useful for debugging, but basically pointless otherwise. In a real application, we'd want to export the data to a database, another Kafka topic, or something like that. But I think I'll leave that for another video. In the meantime, if you want more information on Flink or want to try this yourself, check out the course Building Apache Flink Applications in Java on Confluent Developer. And feel free to drop a comment below to let us know what topic you'd like us to explore next. Don't forget to like, share, and subscribe for more awesome content. And thanks for watching.