Wade Waldron

Staff Software Practice Lead

Creating Branching Data Flows in Flink

Overview

When building a datastream, you start with a source, apply a series of operations, and eventually send the data to a sink. This creates a linear pipeline, but what if you want to introduce branches? Flink streams can include both fan-in and fan-out branch points. These include unions, connected streams, side outputs, and more. In this video, we'll introduce the different types of branches and show how to implement them in Java.

Topics:

  • Fan-in Branches
  • Fan-out Branches
  • Union
  • CoProcess, CoMap, CoFlatMap
  • Multiple sinks
  • Side-outputs

Code

Union

DataStream<MyClass> stream1 = ...;
DataStream<MyClass> stream2 = ...;
// Both inputs to a union must have the same element type.
DataStream<MyClass> union = stream1.union(stream2);

Connect

DataStream<Input1> stream1 = ...;
DataStream<Input2> stream2 = ...;
ConnectedStreams<Input1, Input2> connected = 
	stream1.connect(stream2);

CoProcessFunction

connected.process(
    new CoProcessFunction<Input1, Input2, Output>() {
        // Called for each element arriving on the first input (stream1).
        @Override
        public void processElement1(
            Input1 input1,
            CoProcessFunction<Input1, Input2, Output>.Context context,
            Collector<Output> collector) throws Exception {
            ...
        }

        // Called for each element arriving on the second input (stream2).
        @Override
        public void processElement2(
            Input2 input2,
            CoProcessFunction<Input1, Input2, Output>.Context context,
            Collector<Output> collector) throws Exception {
            ...
        }
    }
);
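
To see why two separate callbacks are useful, here is a minimal plain-Java sketch of a two-input operator that shares state between its inputs. It does not use Flink at all: ThresholdGate, onControl, and onData are hypothetical names that stand in for a CoProcessFunction, processElement1, and processElement2. The assumption is a "control" input that sets a threshold and a "data" input that is checked against it.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a two-input operator; this is NOT the Flink API.
// All names here (ThresholdGate, onControl, onData) are hypothetical.
final class ThresholdGate {
    // State visible to both inputs, like a field on a CoProcessFunction.
    private int threshold = Integer.MAX_VALUE;

    // Plays the role of processElement1: a control element updates the state.
    void onControl(int newThreshold) {
        threshold = newThreshold;
    }

    // Plays the role of processElement2: a data element is checked against
    // the current state and forwarded to the single output if it passes.
    void onData(int value, List<String> out) {
        if (value >= threshold) {
            out.add("alert:" + value);
        }
    }

    // Feeds one fixed interleaving of control and data elements through the gate.
    static List<String> demo() {
        ThresholdGate gate = new ThresholdGate();
        List<String> out = new ArrayList<>();
        gate.onData(50, out);   // dropped: no threshold has arrived yet
        gate.onControl(10);
        gate.onData(5, out);    // dropped: below 10
        gate.onData(12, out);   // emitted
        gate.onControl(100);
        gate.onData(50, out);   // dropped: below 100
        return out;
    }
}
```

Calling ThresholdGate.demo() yields a list containing only "alert:12". In a real Flink job the relative arrival order of the two inputs is not something you control, and a plain field like this would not be checkpointed, so treat the sketch as a picture of the data flow rather than a template.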

CoMapFunction and CoFlatMapFunction

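connected.map(
    new CoMapFunction<Input1, Input2, Output>() {
        // Implement map1 (first input) and map2 (second input).
        ...
    }
);

connected.flatMap(
    new CoFlatMapFunction<Input1, Input2, Output>() {
        // Implement flatMap1 (first input) and flatMap2 (second input).
        ...
    }
);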

Splitting Streams

DataStream<MyClass> filter1 = stream.filter(...);
DataStream<MyClass> filter2 = stream.filter(...);
DataStream<MyClass> map = filter1.map(...);
DataStream<MyClass> union = map.union(filter2);
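
The snippet above fans out by applying two filters to the same stream, so every element is offered to both branches. As a rough illustration, the same shape can be modeled with java.util.stream. This is not Flink code: SplitSketch and splitMapUnion are hypothetical names, and the even/odd predicates and the multiply-by-ten map are assumptions chosen only to make the branches visible.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Plain-Java model using java.util.stream; this is NOT the Flink API.
// SplitSketch and splitMapUnion are hypothetical names for illustration.
final class SplitSketch {
    // Sends even numbers through a map step, leaves odd numbers untouched,
    // then merges both branches back into one list.
    static List<Integer> splitMapUnion(List<Integer> input) {
        // Branch 1: filter, then map (mirrors filter1 followed by map).
        Stream<Integer> mapped = input.stream()
            .filter(n -> n % 2 == 0)
            .map(n -> n * 10);
        // Branch 2: the complementary filter (mirrors filter2).
        Stream<Integer> odds = input.stream()
            .filter(n -> n % 2 != 0);
        // Merge the branches (mirrors map.union(filter2)).
        return Stream.concat(mapped, odds).collect(Collectors.toList());
    }
}
```

For the input 1, 2, 3, 4 this returns 20, 40, 1, 3. The model concatenates the branches in a fixed order; a Flink union makes no such ordering promise between its inputs.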

Side Outputs

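// The trailing {} creates an anonymous subclass so the element type is retained.
final OutputTag<Output2> outputTag = new OutputTag<Output2>("output2-id"){};

// Inside a function whose Context supports side outputs, such as a ProcessFunction:
public void processElement(
    Input value,
    Context context,
    Collector<Output1> collector) {
    // Emit to the main output.
    collector.collect(new Output1(value));
    // Emit to the side output identified by the tag.
    context.output(
        outputTag,
        new Output2(value)
    );
}

SingleOutputStreamOperator<Output1> mainDataStream = ...;

// Retrieve the side output from the operator that produced it.
DataStream<Output2> sideOutputStream = mainDataStream.getSideOutput(outputTag);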
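
A common reason to reach for a side output is to route records that can't be processed somewhere other than the main stream, in a single pass and with a different element type. The plain-Java sketch below models that idea without Flink. SideOutputSketch, Routed, and route are hypothetical names, and the "parse or reject" rule is an assumption. Unlike the snippet above, which emits to both outputs for every element, this model sends each element to exactly one of them.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a main output plus one side output; NOT the Flink API.
// SideOutputSketch, Routed, and route are hypothetical names.
final class SideOutputSketch {
    // Holds the two result collections produced by a single pass.
    static final class Routed {
        final List<Integer> main = new ArrayList<>();
        final List<String> side = new ArrayList<>();
    }

    // Parses each input once: numbers go to the main output, anything that
    // fails to parse goes to the side output, which has a different type.
    static Routed route(List<String> input) {
        Routed result = new Routed();
        for (String raw : input) {
            try {
                // Comparable to collector.collect(...) on the main output.
                result.main.add(Integer.parseInt(raw));
            } catch (NumberFormatException e) {
                // Comparable to context.output(outputTag, ...) on the side output.
                result.side.add("unparseable: " + raw);
            }
        }
        return result;
    }
}
```

Routing "1", "x", "3" puts 1 and 3 in the main list and "unparseable: x" in the side list.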

Resources

Use the promo code FLINKJAVA101 to get $25 of free Confluent Cloud usage
