course: Apache Flink® 101

Intro to Flink SQL

5 min
David Anderson

Software Practice Lead

Overview

Flink SQL is a standards-compliant SQL engine for processing both batch and streaming data with the scalability, performance, and consistency of Apache Flink. It is a very expressive API, based on powerful abstractions, that can be used to quickly develop many common use cases.

This video explains the relationship of Flink SQL to the Table and DataStream APIs. Through an extended example, it illustrates the stream/table duality at the heart of Flink SQL.

Topics:

  • The Flink APIs
  • Stream/Table Duality and Dynamic Tables
  • Features in Flink SQL
  • Streaming vs. Batch in Flink SQL

Resources

Use the promo codes FLINK101 & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.


Intro to Flink SQL

Hey, David Anderson from Confluent, here to tell you all about Flink SQL. Flink SQL describes itself as a standards-compliant SQL engine for processing both batch and streaming data with the scalability, performance, and consistency of Apache Flink. What's notable about this description is that Flink SQL conforms to the ANSI standard for SQL. It's also very interesting that Flink SQL can be used for both batch and stream processing, and of course, Flink SQL takes full advantage of what Flink has to offer.

Flink SQL is just one of several APIs available to Flink developers. I've chosen Flink SQL for this course because I want this course to focus on the big ideas in Flink, while illustrating those big ideas with concepts you may already be familiar with, such as grouping for computing aggregations. By contrast, Flink's other APIs operate at lower levels of abstraction. Working our way up from the bottom of this diagram, we find process functions. A process function is a primitive building block capable of implementing almost any operation by directly manipulating Flink's state backends and timer services. At this level of the API stack, you're writing code that reacts to each event as it arrives, one at a time. Technically speaking, process functions are actually part of the DataStream API. Most of the DataStream API is at a slightly higher level of abstraction, where the building blocks include abstractions like streams and windows. Approaching the top of the diagram, we find the Table API. In terms of the abstractions involved, the Table API is roughly equivalent to Flink SQL, but as a developer using the Table API, you are writing Java or Python code rather than SQL.

It's worth noting that these different APIs are all interoperable. It's not that you must choose one of these APIs to the exclusion of the others; in fact, a single Flink application could use all of these APIs together. The previous diagram, repeated here, depicts the increasing level of abstraction as you move up this API ladder. You might expect these layers of abstraction to correspond to how the code for these APIs is actually layered in the implementation of Flink, but that's not the case. The figure on the right shows how the code implementing these APIs is actually organized. Process functions are just part of the DataStream API, and the Table API and Flink SQL are two sides of the same coin. The Table API is not implemented on top of the DataStream API. Instead, the DataStream and Table APIs are peers in the implementation. Both are built on top of the same internal, low-level Stream Operator API.

Now I want to explain how Flink SQL works by walking through an example with you. Imagine we are opening up a shop that sells two products: hats and mugs. In the brief time since we started business, we've received two shipments from our suppliers: a shipment with 50 hats and another shipment with 10 mugs. We've also shipped out an order of 15 hats to a customer. These shipments in and out of our shop are modeled in this shipments table. We also have an inventory table where we are keeping track of our current stock levels. These two tables are related by this INSERT statement: the current stock level for each item is the sum of the shipment counts for that item. Shipments from our suppliers increase the stock level, and outbound shipments to our customers reduce our stock. The Flink SQL planner compiles this INSERT statement into a Flink job.
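The video presents the tables and this INSERT statement as diagrams rather than as code. As a rough sketch — assuming hypothetical column names item and quantity (with negative quantities for outbound shipments) and Kafka-backed tables, none of which are spelled out in the video — the statements might look something like this:

    -- Hypothetical DDL for the example; the actual schemas and connector
    -- properties are not shown in the video.
    CREATE TABLE shipments (
      item     STRING,
      quantity BIGINT   -- positive for shipments in, negative for shipments out
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'shipments',
      'properties.bootstrap.servers' = 'broker:9092',
      'format' = 'json',
      'scan.startup.mode' = 'earliest-offset'
    );

    CREATE TABLE inventory (
      item  STRING,
      stock BIGINT,
      PRIMARY KEY (item) NOT ENFORCED
    ) WITH (
      'connector' = 'upsert-kafka',  -- any updating sink (e.g. JDBC) would also work
      'topic' = 'inventory',
      'properties.bootstrap.servers' = 'broker:9092',
      'key.format' = 'json',
      'value.format' = 'json'
    );

    -- The INSERT statement that relates the two tables: the stock level
    -- for each item is the sum of the shipment counts for that item.
    INSERT INTO inventory
    SELECT item, SUM(quantity) AS stock
    FROM shipments
    GROUP BY item;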
This job has the shipments table as its source, the inventory table as its sink, and an aggregation operator in the middle. After the source, the events are shuffled to group them by item, and the aggregation computes separate sums for the hats and the mugs. The result ends up in the inventory table sitting in the sink.

Using the Flink SQL engine can feel very much like using a database, but it's not really a database. With Flink SQL, you are using standard SQL syntax to describe the processing you want Flink to do, but unlike a database, none of your data is stored in Flink. When you create a table in Flink SQL, you are describing data that lives somewhere else. In this example, my shipments table is backed by a Kafka topic. The table object that Flink has is just a little bit of metadata describing the schema and the connector properties needed to get Kafka to deliver the data in the right way to my job.

What we have with Flink SQL is a kind of stream/table duality. Our tables are actually dynamic tables that change over time, and every table is equivalent to a stream of events describing the changes being made to that table. We call a stream of changes like this a changelog stream. In a case like this, where the only changes happening to the shipments table are new shipments being appended to the end of the table, the corresponding changelog stream is just a stream of INSERT events. What has just happened here is that another shipment has been added to the table, which is the same as appending another INSERT event to the stream, as shown here below the table. This is an example of an append-only, or insert-only, table.

Not all tables are append-only tables; some tables experience events that modify or delete existing rows. The changelog streams used by Flink SQL contain three additional event types to accommodate the different ways that tables can be updated, and those additional event types are shown here. UPDATE_BEFORE and UPDATE_AFTER are a pair of events that work together to update an earlier result, and DELETE has the effect you'd expect.

Let's look at an example that illustrates how updates to tables work. Here you see the two tables we've been working with and the query that relates them. A grouped aggregation like this is a good example of a query that produces updates. We begin by handling a shipment that provides our store with 50 hats; this is what that looks like as a row in the table. At the level of events and streams, this puts an event inserting 50 hats into the stream feeding the SQL statement. The result of the query is another stream. Here the query has reacted to the INSERT event in the input stream by emitting an equivalent INSERT event to the output stream. The sink will then perform this insertion in the output table. The same sequence of steps unfolds as our query processes the arrival of a shipment with 10 mugs, which ultimately results in a row showing 10 mugs being added to the inventory table.

Now something more interesting is about to happen, because this third record in the shipments table is going to update the number of hats in the inventory rather than doing another insert. So far, nothing exciting has happened: our shipments table is an append-only, or insert-only, table, so another shipment is simply being streamed into the aggregation. But because this SQL statement is doing grouping, the result is an updating table rather than an append-only table. In response to processing this event that ships out 15 hats, the query has produced a pair of update events: first, an UPDATE_BEFORE event retracting the current result that showed 50 hats in the inventory, and second, an UPDATE_AFTER event that replaces that old entry with a new one showing 35 hats.
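One way to watch this changelog for yourself (not shown in the video) is to run the aggregation interactively in the Flink SQL client with its changelog result mode. A rough sketch, with output that will vary in formatting by Flink version:

    -- Show the changelog produced by the grouped aggregation.
    SET 'sql-client.execution.result-mode' = 'changelog';

    SELECT item, SUM(quantity) AS stock
    FROM shipments
    GROUP BY item;

    -- op  item  stock
    -- +I  hat      50    <- INSERT: 50 hats arrive
    -- +I  mug      10    <- INSERT: 10 mugs arrive
    -- -U  hat      50    <- UPDATE_BEFORE: retract the old hat count
    -- +U  hat      35    <- UPDATE_AFTER: 15 hats shipped out, 35 remain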
Exactly how the sink will perform this retraction and update depends on which sink connector is being used, and on its format. For example, the inventory table could be a table in a JDBC database, or it could be a Kafka topic where the stream of updates is written in a changelog stream format such as Debezium. But conceptually, what happens in the sink is that first the UPDATE_BEFORE is processed, which removes the old entry from the inventory table, and then the sink processes the UPDATE_AFTER, which inserts the updated result.

What you've seen in this example are the two principal types of streams and tables in Flink SQL. We have append-only tables and streams, like the shipments table on the left, and we have updating tables and streams, like the inventory table on the right. When we talk about state later in the course, we'll see that this distinction is important, because when Flink executes queries like this one that require updates, the Flink SQL runtime has to keep some internal state in order to keep track of what's going on. For example, the Flink SQL runtime couldn't know to change the inventory from 50 to 35 without storing the current inventory somewhere internally.

Now that you've seen a couple of examples of how Flink SQL can be used, I want to step back and show you the big picture. Flink SQL is a rather complete implementation of the SQL standard. If you know SQL well, you'll find that Flink SQL includes pretty much everything you might hope to find, and the features listed here are all available for both stream and batch processing. However, there are some interesting differences between the streaming and batch execution modes. First of all, in streaming mode it is only feasible to sort by time; sorting by anything else isn't possible. As for the streaming joins mentioned here, these are special joins that can be executed more efficiently than regular joins. By contrast, batch mode is more straightforward. In particular, you can sort any way you want to, and none of the special-case optimizations that are useful for streaming joins are relevant when Flink is running in batch mode.

Now that you know a little bit about Flink SQL, I invite you to try the first hands-on exercise for this course, which will give you hands-on experience with Flink SQL and show you how to switch between streaming and batch modes. The exercises for this course are all available on Confluent Developer. If you aren't already on Confluent Developer, head there now using the link in the video description to access other courses, hands-on exercises, and many other resources for continuing your learning journey.
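As a quick preview of that mode switching, here is a minimal sketch using the execution.runtime-mode setting in the Flink SQL client; the exercise itself may use a different environment:

    -- Run subsequent queries as unbounded streaming jobs (the default):
    -- results update continuously as new events arrive.
    SET 'execution.runtime-mode' = 'streaming';

    -- Run subsequent queries as bounded batch jobs: all inputs must be
    -- bounded, any ORDER BY is allowed, and each query produces one final result.
    SET 'execution.runtime-mode' = 'batch';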