Wade Waldron

Staff Software Practice Lead

Flink Data Sources

Overview

Every Flink datastream starts with a Source (or possibly more than one). This is the origin of the data. The data may be created programmatically, read from a file or a database, or consumed from a streaming platform such as Apache Kafka. Depending on the nature of the source, the datastream might be finite or it might be infinite. Understanding the difference can be important when implementing certain types of operations in the stream. This video introduces a few different types of sources and discusses the difference between finite and infinite data streams.

Topics:

  • Sources
  • Finite Sources
  • Infinite Sources
  • Creating Programmatic Sources
  • File Sources
  • Kafka Sources

Code

StreamExecutionEnvironment.fromElements

DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);
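
Since fromElements returns a ready-to-use DataStream, only a few more lines are needed to make a complete, runnable job. Here is a minimal sketch; the class name and job name are illustrative choices, not part of the course code.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FromElementsExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.getExecutionEnvironment();

		// A finite source: the stream ends after the last element.
		DataStream<Integer> stream = env.fromElements(1, 2, 3, 4, 5);

		stream.print();

		// Nothing runs until the job is executed.
		env.execute("from-elements-example");
	}
}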

DataGeneratorSource

DataGeneratorSource<String> source =
	new DataGeneratorSource<>(
			index -> "String" + index,         // generator: maps an index to a record
			numRecords,                        // total records to emit (finite)
			RateLimiterStrategy.perSecond(1),  // throttle to one record per second
			Types.STRING                       // type information for the records
	);
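
The snippet above assumes numRecords is already in scope. As a hedged sketch of the supporting pieces, the imports below reflect the flink-connector-datagen module, and the count of 100 is an arbitrary choice:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;

// The generator stops after this many records, making the source finite.
long numRecords = 100;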

FileSource

FileSource<String> source =
	FileSource.forRecordStreamFormat(
		new TextLineInputFormat(),
		new Path("<PATH_TO_FILE>")
	).build();
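
As the video notes, a FileSource is finite by default, but it can be configured to keep monitoring for new files, which turns it into an infinite source. A sketch, with a one-second discovery interval chosen arbitrarily:

import java.time.Duration;

import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;

FileSource<String> source =
	FileSource.forRecordStreamFormat(
		new TextLineInputFormat(),
		new Path("<PATH_TO_FILE>")
	)
	.monitorContinuously(Duration.ofSeconds(1))
	.build();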

KafkaSource

KafkaSource<String> source = KafkaSource.<String>builder()
	.setProperties(config)
	.setTopics("topic1", "topic2")
	.setValueOnlyDeserializer(new SimpleStringSchema())
	.build();
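
The config referenced above holds standard Kafka client properties such as bootstrap.servers. As mentioned in the video, it's often easiest to load them from a properties file; here is one way to do that, with the file name kafka.properties as an illustrative choice:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

Properties config = new Properties();
try (InputStream in = Files.newInputStream(Paths.get("kafka.properties"))) {
	config.load(in);
}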

StreamExecutionEnvironment.fromSource

DataStream<String> stream = env
	.fromSource(
		source,
		WatermarkStrategy.noWatermarks(),
		"my_source"
	);

DataStream.print

stream.print();
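
Putting the pieces together, a minimal end-to-end job sketch might look like the following. The bootstrap server address, group id, topic, source name, and job name are all illustrative; a real job would supply its own values, or load them from a properties file as shown earlier.

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaPrintJob {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env =
			StreamExecutionEnvironment.getExecutionEnvironment();

		KafkaSource<String> source = KafkaSource.<String>builder()
			.setBootstrapServers("localhost:9092") // illustrative address
			.setGroupId("my-group")                // illustrative group id
			.setTopics("topic1")
			.setValueOnlyDeserializer(new SimpleStringSchema())
			.build();

		// An infinite source: the job runs until it is cancelled.
		DataStream<String> stream = env
			.fromSource(source, WatermarkStrategy.noWatermarks(), "my_source");

		stream.print();
		env.execute("kafka-print-job");
	}
}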

Resources

Use the promo codes FLINKJAVA101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.


Transcript

Hi, I'm Wade from Confluent. In this video, we're going to look at data sources in Flink. We'll get a better sense of what they are, and we'll see how we can implement some of them.

We can think of our data source as roughly equivalent to the water source in a plumbing system. The water source often takes the form of a pump, but it needs to draw water from somewhere. This would usually be from a reservoir or perhaps a river. When we build a data source, we have a similar need to draw data from somewhere. This could be something simple, such as a list of integers or strings, but more often it will be from a file, a database, or a streaming system such as Kafka.

Let's go back to our plumbing analogy for a moment. Imagine our water pump is drawing water from a reservoir. Reservoirs typically have a limited supply of water. Once we've drawn all the water, our stream will stop flowing. Data streams might have similar limitations. If we draw data from a file, when we reach the end of the file the stream will terminate. We refer to this as a finite data source.

However, what if our water source was drawing from a river? As long as the river doesn't dry up, we can keep pumping water forever. Similarly, a data source that comes from a Kafka topic can continue to consume records as long as the topic exists. This is what we refer to as an infinite data source.

This decision to consume from a finite or infinite data source has consequences. It can impact how we design our stream. For example, in a finite stream of integers, we can take the sum of all of the integers. But if the stream is infinite, then it would never produce a sum. Similarly, we could collect all of the integers into an array if the stream is finite, but in an infinite stream we'd run out of memory and the system would crash. When we design our streams, we need to consider what kind of data source we are using or we could end up building a system that crashes or fails to produce a result.

So how do we go about building a data source? One of the simplest data sources comes from a programmatically created collection of elements, such as a set of integers. We can use the fromElements method in our StreamExecutionEnvironment to create a finite data source from the collection. The fromElements method goes a step further and wraps our source in a DataStream so that it's ready to use. This technique is useful when writing test code, but tends to be too limited for a production application.

Another useful technique for testing is to use a DataGeneratorSource. This creates a source using a function that goes from an index value to another type. In this case, a string. Each time data is requested from the source, it will call the function. We can set limits on this so that it only produces a certain number of records. We can also limit the frequency. Much like with the fromElements method, this technique has limited value in a production application.

A more common technique for a production application is to stream data from a file. We can use the forRecordStreamFormat method to stream records one at a time from our file. We just need to supply a format for each record. In this case, we are using lines of text. And we need to supply a path to the file. Finally, we build the source. Because the file is finite, the stream will terminate once it reads to the end. However, it's possible to create sources that monitor files for changes and continue to stream the data.

One of the most common ways to create a stream is from a Kafka topic. To create a KafkaSource, we can leverage the KafkaSource.builder. This will require us to provide a variety of Kafka properties, such as the bootstrap servers. We can do this using individual setters on the builder, but it's often easier to simply load them from a properties file. We also need to provide the topic or topics that we'll be subscribing to, and we need a deserializer. In this case, we are using a SimpleStringSchema for deserialization so that each record will be read as a string.

Once we've created our source, the final step is to turn it into a stream. We can use the fromSource method on our StreamExecutionEnvironment to create a DataStream from our source. This requires us to supply the source, a WatermarkStrategy such as noWatermarks, and a name for the source. As a side note, watermarks are a critical topic in Flink. It's too much for us to cover here, but we will have other videos that discuss it in more detail. Make sure to check those out along with the Flink documentation.

Once we've created our source, we're ready to start using it. A simple test is to print the stream. This will print each record to the standard output of the job. We now have the tools we need to start building data sources. Later, we'll see how to do something more interesting than just printing to the standard output. In the meantime, let's put some of these skills to use in some practical exercises. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.