Wade Waldron

Staff Software Practice Lead

Creating Branching Data Flows in Flink


When building datastreams, you start with a source, apply a series of operations, and eventually send the data to a sink. This creates a linear pipeline, but what if you want to introduce branches? Flink streams can include both fan-in and fan-out style branch points. This includes unions, connected streams, side outputs, and more. In this video, we'll introduce the different types of branches and show how to implement them in Java.


  • Fan-in Branches
  • Fan-out Branches
  • Union
  • CoProcess, CoMap, CoFlatMap
  • Multiple sinks
  • Side-outputs



Union

DataStream<MyClass> stream1 = ...;
DataStream<MyClass> stream2 = ...;
DataStream<MyClass> union = stream1.union(stream2);
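To make the union concrete, here is a minimal runnable sketch. It assumes a Flink 1.x release with `flink-streaming-java` and `flink-clients` on the classpath, where `fromElements` and `executeAndCollect` are available. The class name `UnionExample` and the sample values are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example class, not part of the course material.
public class UnionExample {

    // Merges two streams of the same type and collects the result locally.
    public static List<Integer> run() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> stream1 = env.fromElements(1, 2);
        DataStream<Integer> stream2 = env.fromElements(3, 4);

        // union requires both streams to carry the same element type.
        DataStream<Integer> union = stream1.union(stream2);

        // Runs the job and returns at most 10 records.
        List<Integer> results = new ArrayList<>(union.executeAndCollect(10));

        // Ordering across the two inputs is not guaranteed, so sort
        // before inspecting the result.
        Collections.sort(results);
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

The sort is there only because a union interleaves its inputs in no particular order. In a real pipeline you would usually attach a sink instead of collecting locally.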


Connected Streams

DataStream<Input1> stream1 = ...;
DataStream<Input2> stream2 = ...;
ConnectedStreams<Input1, Input2> connected = stream1.connect(stream2);


CoProcessFunction

	new CoProcessFunction<Input1, Input2, Output>() {
		@Override
		public void processElement1(
			Input1 input1,
			CoProcessFunction<Input1, Input2, Output>.Context context,
			Collector<Output> collector) throws Exception {
			// Handle elements from the first stream.
			...
		}

		@Override
		public void processElement2(
			Input2 input2,
			CoProcessFunction<Input1, Input2, Output>.Context context,
			Collector<Output> collector) throws Exception {
			// Handle elements from the second stream.
			...
		}
	}
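As an illustration of how connect and a CoProcessFunction fit together, the sketch below joins an Integer stream and a String stream into a single String output. It assumes a Flink 1.x release with `flink-streaming-java` and `flink-clients` on the classpath. The names `ConnectExample` and `ToText`, and the output prefixes, are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical example class, not part of the course material.
public class ConnectExample {

    // Converts both input types into a common String output.
    public static class ToText extends CoProcessFunction<Integer, String, String> {
        @Override
        public void processElement1(Integer value, Context ctx, Collector<String> out) {
            // Called for each element of the first (Integer) stream.
            out.collect("number:" + value);
        }

        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) {
            // Called for each element of the second (String) stream.
            out.collect("text:" + value);
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> numbers = env.fromElements(1, 2);
        DataStream<String> words = env.fromElements("a", "b");

        // Unlike union, connect accepts streams of different types.
        ConnectedStreams<Integer, String> connected = numbers.connect(words);
        DataStream<String> output = connected.process(new ToText());

        List<String> results = new ArrayList<>(output.executeAndCollect(10));
        Collections.sort(results); // arrival order across inputs is not guaranteed
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

A named static class is used here instead of an anonymous one purely for readability. Either form should work with `process`.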

CoMapFunction and CoFlatMapFunction
	new CoMapFunction<Input1, Input2, Output>() { ... }

	new CoFlatMapFunction<Input1, Input2, Output>() { ... }
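The difference between the two interfaces is easier to see side by side. In the sketch below, the CoMapFunction emits exactly one output per input, while the CoFlatMapFunction may emit zero or more. It assumes a Flink 1.x release with `flink-streaming-java` and `flink-clients` on the classpath. The names `CoFunctionsExample`, `Lengths`, and `Words` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.util.Collector;

// Hypothetical example class, not part of the course material.
public class CoFunctionsExample {

    // CoMap: exactly one output per input, from either stream.
    public static class Lengths implements CoMapFunction<Integer, String, Integer> {
        @Override
        public Integer map1(Integer value) {
            return value;
        }

        @Override
        public Integer map2(String value) {
            return value.length();
        }
    }

    // CoFlatMap: zero or more outputs per input.
    public static class Words implements CoFlatMapFunction<Integer, String, String> {
        @Override
        public void flatMap1(Integer value, Collector<String> out) {
            // Drop non-positive numbers, emit the rest as text.
            if (value > 0) {
                out.collect(String.valueOf(value));
            }
        }

        @Override
        public void flatMap2(String value, Collector<String> out) {
            // Emit one record per space-separated word.
            for (String word : value.split(" ")) {
                out.collect(word);
            }
        }
    }

    // Builds the same pair of connected inputs for both examples.
    private static ConnectedStreams<Integer, String> inputs(StreamExecutionEnvironment env) {
        return env.fromElements(5, -1).connect(env.fromElements("hello world"));
    }

    public static List<Integer> runCoMap() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        List<Integer> results =
            new ArrayList<>(inputs(env).map(new Lengths()).executeAndCollect(10));
        Collections.sort(results);
        return results;
    }

    public static List<String> runCoFlatMap() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        List<String> results =
            new ArrayList<>(inputs(env).flatMap(new Words()).executeAndCollect(10));
        Collections.sort(results);
        return results;
    }
}
```

Both functions are stateless here. When you need timers or keyed state, the CoProcessFunction is generally the better fit.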

Splitting Streams

DataStream<MyClass> filter1 = stream.filter(...);
DataStream<MyClass> filter2 = stream.filter(...);
DataStream<MyClass> map = filter1.map(...);
DataStream<MyClass> union = map.union(filter2);
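The split-then-merge pattern above can be sketched end to end as follows. Two filters read from the same upstream (fan-out), one branch is transformed, and the branches are merged again (fan-in). It assumes a Flink 1.x release with `flink-streaming-java` and `flink-clients` on the classpath. The class name `SplitExample` and the even/odd predicates are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical example class, not part of the course material.
public class SplitExample {

    public static List<Integer> run() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Integer> stream = env.fromElements(1, 2, 3, 4);

        // Fan-out: both filters consume the same upstream, so every
        // record is evaluated by each predicate.
        DataStream<Integer> evens = stream.filter(n -> n % 2 == 0);
        DataStream<Integer> odds = stream.filter(n -> n % 2 != 0);

        // Only the even branch gets the extra transformation.
        DataStream<Integer> scaledEvens = evens.map(n -> n * 10);

        // Fan-in: merge the transformed branch with the untouched one.
        DataStream<Integer> union = scaledEvens.union(odds);

        List<Integer> results = new ArrayList<>(union.executeAndCollect(10));
        Collections.sort(results); // ordering across branches is not guaranteed
        return results;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Each branch could just as well end in its own sink instead of being merged, which gives you a pipeline with multiple sinks.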

Side Outputs

final OutputTag<Output2> outputTag = new OutputTag<Output2>("output2-id"){};

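public void processElement(
	Input value,
	Context context,
	Collector<Output1> collector) {
	// Emit to the main output.
	collector.collect(new Output1(value));
	// Emit to the side output identified by outputTag.
	context.output(
		outputTag,
		new Output2(value)
	);
}

SingleOutputStreamOperator<Output1> mainDataStream = ...;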

DataStream<Output2> sideOutputStream = mainDataStream.getSideOutput(outputTag);
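Putting the side-output pieces together, the sketch below routes negative numbers to a side output and collects only that side stream. It assumes a Flink 1.x release with `flink-streaming-java` and `flink-clients` on the classpath. The names `SideOutputExample`, `Router`, and `REJECTED`, along with the routing rule itself, are hypothetical. Unlike the snippet above, this version sends each record to one output rather than both.

```java
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Hypothetical example class, not part of the course material.
public class SideOutputExample {

    // The trailing {} creates an anonymous subclass so Flink can
    // capture the generic type of the tag.
    static final OutputTag<String> REJECTED = new OutputTag<String>("rejected") {};

    // Sends non-negative numbers to the main output and a message
    // about each negative number to the side output.
    public static class Router extends ProcessFunction<Integer, Integer> {
        @Override
        public void processElement(Integer value, Context ctx, Collector<Integer> out) {
            if (value >= 0) {
                out.collect(value);
            } else {
                ctx.output(REJECTED, "negative: " + value);
            }
        }
    }

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> mainDataStream =
            env.fromElements(1, -2, 3).process(new Router());

        // The side output may have a different type than the main output.
        DataStream<String> sideOutputStream = mainDataStream.getSideOutput(REJECTED);

        return sideOutputStream.executeAndCollect(10);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Note that `getSideOutput` is only available on the `SingleOutputStreamOperator` returned by `process`, which is why the main stream is not declared as a plain `DataStream`.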


