Get Started Free
course: Building Apache Flink® Applications in Java

Creating Branching Data Streams in Flink

7 min
Wade Waldron

Wade Waldron

Staff Software Practice Lead

Creating Branching Data Flows in Flink

Overview

When building datastreams you start with a source, apply a series of operations and eventually send the data to a sink. This creates a linear pipeline, but what if you want to introduce branches? Flink streams can include both fan-in, and fan-out style branch points. This includes unions, connectors, side-outputs, and more. In this video, we'll introduce the different types of branches and show how to implement them in Java.

Topics:

  • Fan-in Branches
  • Fan-out Branches
  • Union
  • CoProcess, CoMap, CoFlatMap
  • Multiple sinks
  • Side-outputs

Code

Union

DataStream<MyClass> stream1 = ...
DataStream<MyClass> stream2 = ...
DataStream<MyClass> union = stream1.union(stream2);

Connect

DataStream<Input1> stream1 = ...;
DataStream<Input2> stream2 = ...;
ConnectedStreams<Input1, Input2> connected = 
	stream1.connect(stream2);

CoProcessFunction

connected.process(
	new CoProcessFunction<Input1, Input2, Output>() {
	    @Override
	    public void processElement1(
	    	Input1 input1, 
	    	CoProcessFunction<Input1, Input2, Output>.Context context,
	    	Collector<Output> collector) throws Exception {
	    		...
	    }

	    @Override
	    public void processElement2(
	    	Input2 input2, 
	    	CoProcessFunction<Input1, Input2, Output>.Context context,
	    	Collector<Output> collector) throws Exception {
				...
		}
	}
)

CoMapFunction and CoFlatMapFunction

connected.map(
	new CoMapFunction<Input1, Input2, Output>() {
		... 
	}
)

connected.flatMap(
	new CoFlatMapFunction<Input1, Input2, Output>() {
		...
	}
)

Splitting Streams

DataStream<MyClass> filter1 = stream.filter(...);
DataStream<MyClass> filter2 = stream.filter(...);
DataStream<MyClass> map = filter1.map(...);
DataStream<MyClass> union = map.union(filter2);

Side Outputs

final OutputTag<Output2> outputTag = new OutputTag<Output2>("output2-id"){};

public void processElement(
	Input value,
	Context context,
	Collector<Output1> collector) {
		collector.collect(new Output1(value));
		context.output(
			outputTag, 
			new Output2(value)
		);
	}
SingleOutputStreamOperator<Output1> mainDataStream = ...;

DataStream<Output2> sideOutputStream = mainDataStream.getSideOutput(outputTag);

Resources

Use the promo code FLINKJAVA101 to get $25 of free Confluent Cloud usage

Be the first to get updates and new content

We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. ­čÖé By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.

Creating Branching Data Streams in Flink

Hi, I'm Wade from Confluent. One of the powerful features of Flink is its ability to create branch points in the datastream. In this video, we'll explore the branching functionality provided by Flink, and situations where it might be useful. We've been comparing our datastreams to plumbing systems to better understand how they work. Fanin vs Fanout Branch If you break down a modern plumbing system, you'll see that there are often many branches in the pipes. The incoming pipes might have branches for a variety of needs including: sinks washing machines sprinklers and more Here we are picturing a fan-out branch. Essentially, a single pipe fans out to multiple pipes. However, fan-in branches where multiple pipes join into a single one, are less obvious because they tend to exist upstream where the average person won't see them. For example, a local water supply might have a treatment plant that draws water from the river. After treatment, some of it will be sent downstream for immediate use. But some of the water might be sent to a reservoir to be available during peak usage or other scenarios. This means we now have two possible water sources. The treatment plant, and the reservoir. In order to draw water from both sources, we need to introduce a fan-in style branch, where two pipelines are merged into one. Use Cases Datastreams have similar needs for both fan-in and fan-out branches. For example, you may want to take data from both a Kafka topic, and a database, and pipe it through the same stream. Or you might want to take data from multiple topics or multiple databases. You also might want to send that data to multiple locations. It's fairly common to send some data to a database for long-term storage, but also to a Kafka topic for immediate downstream consumption. Each of these use cases requires us to create a branch in our stream. Unions So how do we create branches? Let's start by considering fan-in branches. There are a variety of ways that you can create these, but perhaps the simplest is a union. A union requires two streams that each contain the same datatype. The union function will take the first stream, and merge it with the second. The result is a new stream that takes records from the original two, interleaving them where necessary. This type of operation can be useful when you are drawing similar data from multiple locations. As an example, pulling telemetry data from Internet of Things devices might warrant a union. Each device could emit the same type of data, and we need to collect that data and unify it into a single stream. However, what if we have two streams with different data types that we need to merge into one? One option is to leverage the connect operator to create ConnectedStreams. It will merge our two streams into one while maintaining their separate data types. It will then convert those two data types into a single unified type. This is done with the process operation. When working with a normal stream, process operates on a ProcessFunction. With connected streams, it becomes a CoProcessFunction. The difference is that a CoProcessFunction has two processElement methods, one for each of the connected streams. These methods will take each of the inputs, and convert them to a single output. Recall that the process operator can be simplified with functions such as map and flatMap. The same is true of the CoProcessFunction. It can be simplified using a CoMapFunction or a CoFlatMapFunction. Each of these has two methods for handling the input types. Input Types On the surface, it might seem like union and connect can both be used to solve the same problem. The primary difference is whether you map the data to a new type before the operator, or after. Technically, that's true. However, their intended use cases are actually quite different. Stateless Operations For simple stateless operations, it's better to perform conversions to a unified type using techniques like map and then applying a union. Techniques such as CoProcess are typically used in more complex operations where state must be maintained between the two streams while doing the conversion. For example, implementing a join from two data streams into a single object would require something like CoProcess. We'll cover stateful operations in more detail a little later in the course. In the meantime, what about fan-out branches? How can they be implemented? Implementation Often when we write streaming code in Flink, we chain together our operations like this. However, that's not strictly necessary. We can also write each step as an independent statement. When we do that, it opens up possibilities, because now we can take earlier steps and apply different transformations to them creating new branch points along the way. We can eventually funnel those branches into multiple sinks, or even merge them back into a single stream if necessary. This is powerful and allows us to create complex data flows. Side Output An alternative approach to splitting a stream is to use a side output. This requires access to the Context object found inside of a ProcessFunction or similar. To use a side output, we need a way to uniquely identify it. We do this using an OutputTag. OutputTags consist of a type that represents the data contained in the side output, and an id which uniquely identifies it. These tags must be defined as anonymous inner classes so that Flink can use them to derive important type information. We use the output method defined in the context to create a side output. It takes the OutputTag that we defined previously, along with the message that we want to send. Get Side Output We call getSideOutput on a stream operator to retrieve any side outputs it produces. It takes the OutputTag to identify which output we are interested in, in case there is more than one. This function is not available on all stream types. The base DataStream does not include it. You'll need something like a DataStreamSource or a SingleOutputStreamOperator to access this function. The branches we've covered here aren't the only ones. Others become available when you introduce windowing in your stream. This is a topic we haven't covered yet, so we'll have to wait a bit before introducing those other branch types. In the meantime, we can practice using some of these branches in an exercise. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.