Wade Waldron

Staff Software Practice Lead

Transforming Data in Flink

Overview

The real power of Flink comes from its ability to transform data in a distributed streaming pipeline. Flink provides a variety of operators that handle both the transformation and the distribution of data. These include common functions such as map, flat map, and filter, as well as more advanced techniques. This video shows a few of the basic operations that can be performed on a Flink DataStream and how they might fit into a streaming pipeline.

Topics:

  • Flink DataStream Operators
  • Process Functions and Keyed Process Functions
  • Map
  • FlatMap
  • Filter
  • KeyBy
  • Reduce

Code

ProcessFunction - Mapping Elements

public class MyProcessFunction 
  extends ProcessFunction<Input, Output> {
  @Override
  public void processElement(
    Input input,
    ProcessFunction<Input, Output>.Context ctx,
    Collector<Output> collector
  ) {
    collector.collect(new Output(input));
  }
}

ProcessFunction - Flattening Mapped Elements

public class MyProcessFunction 
  extends ProcessFunction<Input[], Output> {
  @Override
  public void processElement(
    Input[] collection,
    ProcessFunction<Input[], Output>.Context ctx,
    Collector<Output> collector
  ) {
    for(Input input : collection) {
      collector.collect(new Output(input));
    }
  }
}

ProcessFunction - Filtering Elements

public class MyProcessFunction
  extends ProcessFunction<Input, Input> {
  @Override
  public void processElement(
    Input input,
    ProcessFunction<Input, Input>.Context ctx,
    Collector<Input> collector
  ) {
    if(condition) {
      collector.collect(input);
    }
  }
}
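
The three process functions above differ only in how many times they call the collector for each input: exactly once (map), any number of times (flat map), or zero or one time (filter). The plain-Java sketch below illustrates that idea without using Flink at all. `EmitCountSketch`, `Emitter`, and `run` are hypothetical names introduced here as stand-ins for the role of Flink's `Collector` and for the runtime that invokes `processElement`; they are assumptions for illustration, not Flink APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Plain-Java illustration only; no Flink classes are used here.
final class EmitCountSketch {

    // Hypothetical stand-in for the role Flink's Collector plays.
    interface Emitter<T> {
        void collect(T value);
    }

    // Hypothetical driver: calls the function once per input and gathers everything emitted.
    static <I, O> List<O> run(List<I> inputs, BiConsumer<I, Emitter<O>> function) {
        List<O> out = new ArrayList<>();
        for (I input : inputs) {
            function.accept(input, out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> integers = List.of(1, 2, 3, 4);

        // Map: exactly one output per input.
        List<Double> halves = run(integers,
            (Integer i, Emitter<Double> c) -> c.collect(i / 2.0));

        // Flat map: any number of outputs per input.
        List<Integer> lengths = run(List.of("to be", "or not"),
            (String s, Emitter<Integer> c) -> {
                for (String word : s.split(" ")) {
                    c.collect(word.length());
                }
            });

        // Filter: zero or one output per input.
        List<Integer> evens = run(integers,
            (Integer i, Emitter<Integer> c) -> {
                if (i % 2 == 0) {
                    c.collect(i);
                }
            });

        System.out.println(halves);
        System.out.println(lengths);
        System.out.println(evens);
    }
}
```

Because a process function decides for itself how often to emit, it can express all three operations. The dedicated `map`, `flatMap`, and `filter` operators shown further down are shorthand for these common cases.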

Process

stream.process(new MyProcessFunction());

Map

stream.map(input -> new Output(input));

DataStream<Double> doubles = integers.map(
	input -> input / 2.0
);

FlatMap

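// Lambdas that use a Collector lose their generic type information,
// so declare the parameter types and the result type explicitly.
stream.flatMap((Input[] collection, Collector<Output> collector) -> {
	for (Input input : collection) {
		collector.collect(new Output(input));
	}
}).returns(Output.class);

DataStream<Integer> letterCount = sentences
	.map(input -> input.split(" "))
	.flatMap((String[] words, Collector<Integer> collector) -> {
		for (String word : words) {
			collector.collect(word.length());
		}
	})
	.returns(Types.INT); // org.apache.flink.api.common.typeinfo.Types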

Filter

stream.filter(input -> condition);
DataStream<Integer> evenIntegers = integers
	.filter(input -> input % 2 == 0);

KeyBy

stream.keyBy(
	input -> input.getKey()
);
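
`keyBy` logically partitions a stream so that every element with the same key is handled by the same parallel instance of the downstream operator. The plain-Java sketch below illustrates only that guarantee, using a deliberately simplified hash-modulo assignment. Flink's real assignment goes through key groups and is not reproduced here; `KeyBySketch`, `subtaskFor`, `partition`, and the parallelism of 4 are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; this is NOT how Flink assigns keys internally.
final class KeyBySketch {

    // Simplified assumption: choose a subtask index from the key's hash code.
    static int subtaskFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Group keys by the subtask they map to, preserving arrival order within each subtask.
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String key : keys) {
            bySubtask
                .computeIfAbsent(subtaskFor(key, parallelism), k -> new ArrayList<>())
                .add(key);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> keys = List.of("apple", "banana", "apple", "cherry", "banana");
        // Every occurrence of a given key ends up in the same list.
        System.out.println(partition(keys, 4));
    }
}
```

The property that matters is determinism: the same key always maps to the same place, which is what lets keyed operators such as `reduce` keep per-key results.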

KeyedProcessFunction

class MyKeyedProcessFunction
  extends KeyedProcessFunction<String, Input, Output> {
  @Override
  public void processElement(
    Input input,
    KeyedProcessFunction<String, Input, Output>.Context ctx,
    Collector<Output> collector
  ) {
    String key = ctx.getCurrentKey();
    ...
  }
}

Reduce

stream
  .keyBy(input -> input.key)
  .reduce((s1, s2) -> s1.merge(s2));
DataStream<Tuple2<String, Integer>> wordCountsByFirstLetter = 
	itemIdsAndCounts
		.keyBy(tuple -> tuple.f0)
		.reduce((l1, l2) -> new Tuple2<>(l1.f0, l1.f1 + l2.f1));
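
On a keyed stream in streaming execution, `reduce` behaves as a rolling aggregation: each incoming element is combined with the previous result for its key, and the updated result is emitted. The plain-Java sketch below imitates that behaviour without Flink. `RollingReduceSketch`, `rollingReduce`, and the sample data are hypothetical, and `Map.Entry<String, Integer>` stands in for `Tuple2<String, Integer>`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java illustration only; no Flink classes are used here.
final class RollingReduceSketch {

    // Applies the reducer per key and records the running result after every element.
    static List<Map.Entry<String, Integer>> rollingReduce(
            List<Map.Entry<String, Integer>> inputs,
            BinaryOperator<Map.Entry<String, Integer>> reducer) {
        Map<String, Map.Entry<String, Integer>> latestByKey = new HashMap<>();
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> input : inputs) {
            // merge stores the input as-is for a new key, otherwise reducer(previous, input).
            emitted.add(latestByKey.merge(input.getKey(), input, reducer));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> inputs = List.of(
            Map.entry("a", 1), Map.entry("b", 5), Map.entry("a", 2), Map.entry("b", 1));

        // Same shape as the reduce lambda above: keep the key, add the counts.
        List<Map.Entry<String, Integer>> out = rollingReduce(inputs,
            (l1, l2) -> Map.entry(l1.getKey(), l1.getValue() + l2.getValue()));

        System.out.println(out);
    }
}
```

Note that the first element for a key is emitted unchanged, since there is nothing to combine it with yet, and that the reducer must return the same type it receives.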
