course: Apache Flink® 101

Flink and Kafka (Docker)

20 min
David Anderson

Principal Software Practice Lead

Hands-on with Flink and Kafka (Docker)

This exercise is a hands-on introduction to using open source Apache Flink SQL with Apache Kafka. You will learn more about how Flink separates stream processing from stream storage, and why open source Flink has two distinct Kafka connectors: kafka and upsert-kafka.

If you haven't already done so, begin this Docker-based exercise by following the setup instructions for the Docker-based exercises.

In what follows, you'll be using the Flink SQL Client (the CLI), which is started with docker compose run sql-client.

What is a table?

In open source Flink SQL, a table is metadata describing how to interpret external storage as a SQL table; in other words, it is a view onto a data stream stored outside of Flink.

We can experiment with this. Begin by creating a table backed by a Kafka topic named append (we'll see why this name is appropriate in the next section).

CREATE TABLE json_table (
    `key` STRING,
    `value` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'append',
    'properties.bootstrap.servers' = 'broker:9092',
    'format' = 'json',
    'scan.startup.mode' = 'earliest-offset'
);
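
This CREATE TABLE statement only registers metadata in Flink's catalog; it doesn't launch a job or contact Kafka. If you'd like to confirm what was registered, you can inspect the catalog from the SQL CLI (an optional check, not part of the exercise itself):

SHOW TABLES;
SHOW CREATE TABLE json_table;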

The Docker setup includes kcat, which you can use to see whether creating this table has created the underlying topic in Kafka. Keep the SQL CLI session open, and run kcat in another terminal:

docker compose exec -it kcat kcat -b broker:9092 -L

At this point, the topic won't have been created. But once you write into the table, the topic will be created automatically, because the broker is configured with auto.create.topics.enable left at its default value of true.

INSERT INTO json_table VALUES ('foo','one'), ('foo', 'two');
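
Unlike the CREATE TABLE statement, this INSERT submits a Flink job that writes to the Kafka topic, and the CLI prints the job's ID. Assuming the Flink version in your setup is 1.17 or newer, you can also list submitted jobs from the SQL CLI (optional, and not part of the original exercise):

SHOW JOBS;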

You can read the data back using SQL:

SELECT * FROM json_table;
+----------------------------+----------------------------+
|                        key |                      value |
+----------------------------+----------------------------+
|                        foo |                        one |
|                        foo |                        two |

Type Ctrl-C to terminate the SELECT query.
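
Since json_table behaves like any other SQL table, ordinary projections and filters work on it as well. For example, this variation (not part of the original exercise) upper-cases the values for a single key:

SELECT UPPER(`value`) AS value_upper
FROM json_table
WHERE `key` = 'foo';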

If you are curious, you can also inspect the Kafka topic directly (again, in the other terminal):

docker compose exec -it kcat kcat -b broker:9092 -C -t append -f '\n
\tKey (%K bytes): %k
\tValue (%S bytes): %s
\tPartition: %p
\tOffset: %o
\tTimestamp: %T
\tHeaders: %h
--\n'

To really drive the point home -- namely, that tables are independent of the underlying storage -- back in the Flink SQL CLI, let's create another table mapped onto the same topic:

CREATE TABLE raw_table (
    `data` STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'append',
    'properties.bootstrap.servers' = 'broker:9092',
    'format' = 'raw',
    'scan.startup.mode' = 'earliest-offset'
);

This new table interprets the data stored in the append topic differently (note that the format specification has been changed from json to raw). To see this in action, inspect the new table, and compare the results to what you saw before:

SELECT * FROM raw_table;
+-----------------------------+
|                        data |
+-----------------------------+
| {"key":"foo","value":"one"} |
| {"key":"foo","value":"two"} |

Append vs. update streams, and the two Kafka connectors

Flink SQL is designed around the idea of processing changelog streams.

The simplest form of changelog stream is an insert-only (or append) stream, where each new message is a new record appended to the end of the stream -- or, in database terms, a new row being inserted into a table.

The SQL CLI can be configured to show this underlying changelog information:

SET 'sql-client.execution.result-mode' = 'tableau';

Now the output of a SELECT statement will include information about how each row is being processed -- whether it is an insert or part of an update.

For example,

SELECT * FROM json_table;

now shows, via +I, that each of these records is an INSERT event.

+----+----------------------------+----------------------------+
| op |                        key |                      value |
+----+----------------------------+----------------------------+
| +I |                        foo |                        one |
| +I |                        foo |                        two |

Sometimes, operating in this append-only mode is too limiting, and it makes sense to interpret incoming records as updates to (or deletions of) existing records. Confluent Cloud handles this differently, but with open source Apache Flink, the kafka connector interprets the topic as an append-only stream, while the upsert-kafka connector interprets it as an updating stream, where updates are applied based on the table's primary key.

It's time for another experiment:

CREATE TABLE updating_table (
    `key` STRING PRIMARY KEY NOT ENFORCED,
    `value` STRING
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'update',
    'properties.bootstrap.servers' = 'broker:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);
INSERT INTO updating_table VALUES ('foo','one'), ('foo', 'two');
SELECT * FROM updating_table;
+----+----------------------------+----------------------------+
| op |                        key |                      value |
+----+----------------------------+----------------------------+
| +I |                        foo |                        one |
| -U |                        foo |                        one |
| +U |                        foo |                        two |

Here -U and +U represent the update of the value associated with foo in two steps: an UPDATE_BEFORE that deletes the old value, and an UPDATE_AFTER that inserts the new value.
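
Update events like these don't come only from the upsert-kafka connector; any query whose result changes over time produces them. While the CLI is still in tableau mode, you can see this with a simple aggregation over the append-only json_table (an extra illustration, not part of the original exercise):

SELECT `key`, COUNT(*) AS cnt
FROM json_table
GROUP BY `key`;

Because json_table contains two rows with the key foo, the count for foo is first emitted as +I with cnt = 1, then retracted with -U and re-emitted as +U with cnt = 2.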

If you want to go back to the default display mode, that's done like this:

SET 'sql-client.execution.result-mode' = 'table';

Note that only Flink SQL and the Table API are designed for processing changelog streams. With the DataStream API, every stream is an append-only stream.

This topic is explored more deeply in the module on Changelog Processing in the course on Apache Flink SQL, here on Confluent Developer.

Resources

In Confluent Cloud, Flink and Kafka are more closely integrated than what you see here. To learn more about that, see the companion exercise.
