Staff Software Practice Lead
In Flink, the endpoint of your datastream takes the form of a data sink. These sinks will usually connect to a database or streaming platform. As data flows through the datastream it eventually gets pushed into the sink through the use of one of Flink's connectors. In this video, we'll introduce the concept of a Flink data sink and show how you can create a simple Kafka sink.
Topics:
KafkaRecordSerializationSchema<MyClass> serializer =
KafkaRecordSerializationSchema.<MyClass>builder()
.setTopic("topic_name")
.setValueSerializationSchema(
new JsonSerializationSchema<>()
)
.build();
KafkaSink<MyClass> sink = KafkaSink.<MyClass>builder()
.setKafkaProducerConfig(config)
.setRecordSerializer(serializer)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();
stream
.sinkTo(sink)
.name("sink_name");
Hi, I'm Wade from Confluent. In previous videos, we focused on moving data through a Flink Data Stream, but we're at the end of that journey. Now, it's time to see how we get data out of the stream using a data Sink. Overview We started our discussion by talking about how Flink Data Streams are comparable to plumbing systems. We saw the water pump and how it's similar to the data source that pushes data into the stream, and we explored the pipes that carry our water and how they correspond to the operators that transform our data. But as long as the data is trapped in the data stream, it isn't very useful. Just like having water trapped in a bunch of pipes doesn't do much for us. At some point, we need to make use of it. This is where the sink comes in. In this case, we're talking about a Data Sink. Data Sync The purpose of the Data Sink is to take the messages that are flowing through our data stream and push them to an external destination. Often, this will be a streaming endpoint, such as Kafka. However, it can just as easily be a relational database or even something like Cassandra. Flink includes a variety of connectors to many popular databases and streaming systems. So how do we create a sink? Depending on what connector is used, the requirements might change, but let's look at how to create a KafkaSink. Kafka Serializer Before we create our sink, we'll need to create a serializer for the data. This will take our Java objects, convert them to the appropriate wire format, and wrap them in a Kafka message. Remember to check out our earlier video on serializers and deserializers for more information. To create a serializer for Kafka records, we can use the KafkaRecordSerializationSchema. It includes a builder with a variety of settings for the record. One of the first things we'll need to set is the topic where our records will be stored. We do that using the setTopic method on the builder. We also need to provide a technique for serializing the payload of each record. In this case, we are using a JsonSerializationSchema, so that each record will be serialized to Json. There are a variety of other settings you might use, depending on your needs, but this gives you the basics. Make sure to check out the documentation for more details. Once you've defined your settings, you just need to build the schema. The build method will return your schema, which can then be used inside of your sink. Kafka Sink Constructing a KafkaSink isn't much different from constructing a KafkaSource. Once again, we make use of a builder. We then need to configure values on the builder, such as the BootstrapServers. However, like with the source, it might be easier to load the necessary configuration from a properties file. Once that's done, we can attach the RecordSerializer that we created earlier. One interesting setting on the sink is the delivery guarantee. Flink supports three Guarantees, including NONE, which is basically a best effort, AT_LEAST_ONCE, which guarantees delivery, but may result in duplicates, or EXACTLY_ONCE, sometimes known as effectively once. This guarantees delivery but also prevents duplicates. These guarantees are supported throughout the data stream so that if Kafka provides both the Source and Sink, Flink can provide the same EXACTLY_ONCE Guarantee that Kafka provides. Once we have configured all of our settings, we just need to build the sink. When that's done, we can attach it to our stream. We do this using the sinkTo method. This will have set up our stream so that when it executes, it will push all the messages to our new sink. It's also a good practice to name your sink. This will make it easier to understand and debug when looking at the job in the Flink Console. Remember, the configuration and setup of each sink, will be different depending on what your endpoint is. Make sure to check the documentation for details on how to set up each type of sink. In the meantime, we can now put this to use in some hands-on exercises. If you aren't already on Confluent Developer, head there now using the link in the video description to access the rest of this course and its hands-on exercises.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.