course: Apache Flink® 101

Flink and Kafka (Confluent Cloud)

20 min
David Anderson

Principal Software Practice Lead

Hands-on with Flink and Kafka (Confluent Cloud)

This exercise will help you understand the special relationship between Flink and Kafka in Confluent Cloud.

With Confluent Cloud for Apache Flink, you can directly access all of the Kafka environments, clusters, and topics that you already have in Confluent Cloud, without creating any additional metadata. You never need to declare tables manually for existing topics. Moreover, creating a table in Flink automatically creates the associated topic and schema.
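
For example, once your catalog and database point at an existing environment and Kafka cluster, every topic in that cluster already shows up as a table. Here's a minimal sketch, assuming a hypothetical pre-existing topic named orders (substitute any topic you actually have):

SHOW TABLES;
SELECT * FROM `orders` LIMIT 10;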

To gain a deeper appreciation of the value Confluent Cloud brings to Flink SQL, you may find it interesting to first go through the companion exercise based on open source Flink.

If you haven't already done so, you can begin this exercise by following the instructions for getting started with Confluent Cloud.

Note: The instructions below will guide you in using the Confluent CLI, but you can easily do all of this using the web UI at https://confluent.cloud instead, if you'd prefer.

What is a table?

In Confluent Cloud for Apache Flink, a table is a combination of some or all of these components:

  • some metadata
  • schemas stored in Schema Registry
  • data stored in a Kafka topic
  • data stored in Apache Iceberg (using Tableflow)

Create a simple table

If it's not already running, begin by using confluent flink shell to start a Flink shell, and then create a table:

CREATE TABLE simple_table (
  `key` STRING,
  `value` STRING
);

Note: if this fails, wait a moment and try again. (It can happen that some of the cloud resources provisioned by the quickstart won't be immediately available.)
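
Before moving on, you can confirm that the table exists and inspect its columns directly from the Flink shell, for example:

SHOW TABLES;
DESCRIBE simple_table;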

Examine the associated topic in Confluent Cloud

Now let's go take a look at the resources that were just created in Confluent Cloud. To do this, first determine the IDs of the catalog and database in which this table has been created:

show catalogs;
show databases;

Now open another terminal window, and do the following to set your context to the Kafka cluster in which the topic for simple_table was just created:

Using the Catalog ID obtained above, execute

confluent environment use {Catalog ID}

Then using the Database ID obtained above, execute

confluent kafka cluster use {Database ID}

Create an API key to use with this cluster,

confluent api-key create --resource {Database ID}

and set that key into the context of the Confluent CLI

confluent api-key use {API Key} --resource {Database ID}

Now we can poke around inside Kafka:

confluent kafka topic list

Start consuming from this topic by executing the following:

confluent kafka topic consume --value-format avro --from-beginning simple_table

Note: Avro is the default value format for Flink tables in Confluent Cloud.

Leave that Kafka consumer running, and use the Flink shell in the other terminal to produce data to the simple_table topic by inserting data into the simple_table table:

INSERT INTO simple_table VALUES ('foo','one'), ('foo', 'two');

The Kafka consumer will then display something like this:

{"key":{"string":"foo"},"value":{"string":"one"}}
{"key":{"string":"foo"},"value":{"string":"two"}}

When you're done with this, type Ctrl-C (^C) to exit the Kafka consumer.
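
As noted above, Avro (backed by Schema Registry) is the default value format, but it can be overridden when a table is created. A minimal sketch, using a hypothetical json_table with the json-registry format (if you try this, remember to drop json_table during cleanup at the end of the exercise):

CREATE TABLE json_table (
  `key` STRING,
  `value` STRING
) WITH (
  'value.format' = 'json-registry'
);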

Examine the schema(s)

This table has just one schema stored in the Schema Registry, named simple_table_value. If this table had a primary key, that key would be used as the Kafka key, and it would have its own schema.

To find the schema(s):

confluent schema-registry schema list

To examine a schema, using the ID obtained above:

confluent schema-registry schema describe {ID}
Schema ID: 100001
Type: AVRO
Schema:
{
    "type": "record",
    "name": "simple_table_value",
    "namespace": "org.apache.flink.avro.generated.record",
    "fields": [
        {
            "name": "key",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "value",
            "type": [
                "null",
                "string"
            ],
            "default": null
        }
    ]
}

Changelog modes

If you've been through the related content on open source Flink, you will have learned that Apache Flink has two Kafka connectors, one for handling append-only streams, and another for streams that may include updates and/or deletions.

Confluent Cloud handles this differently: there is a single confluent connector, which can be configured with one of several changelog modes (append, upsert, or retract).
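
The mode is just a table option. As a minimal illustration (you don't need to create this table for the exercise), a table using retract mode might look like this:

CREATE TABLE retracting_table (
  `key` STRING PRIMARY KEY NOT ENFORCED,
  `value` STRING
) WITH (
  'changelog.mode' = 'retract'
);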

In the Flink shell, use SHOW CREATE TABLE simple_table to reveal the full specification of simple_table, including all of the defaults:

> show create table simple_table;

+-------------------------------------------------------------------+
|                         SHOW CREATE TABLE                         |
+-------------------------------------------------------------------+
| CREATE TABLE `flink101_environment`.`lkc-w8077w`.`simple_table` ( |
|   `key` VARCHAR(2147483647),                                      |
|   `value` VARCHAR(2147483647)                                     |
| )                                                                 |
| DISTRIBUTED INTO 6 BUCKETS                                        |
| WITH (                                                            |
|   'changelog.mode' = 'append',                                    |
|   'connector' = 'confluent',                                      |
|   'kafka.cleanup-policy' = 'delete',                              |
|   'kafka.compaction.time' = '0 ms',                               |
|   'kafka.max-message-size' = '2097164 bytes',                     |
|   'kafka.retention.size' = '0 bytes',                             |
|   'kafka.retention.time' = '0 ms',                                |
|   'scan.bounded.mode' = 'unbounded',                              |
|   'scan.startup.mode' = 'earliest-offset',                        |
|   'value.format' = 'avro-registry'                                |
| )                                                                 |
|                                                                   |
+-------------------------------------------------------------------+

There's a lot going on here, including the specification of various properties for the underlying Kafka topic, the number of partitions (the 6 BUCKETS), the value format, and the changelog mode (which is append).

To create an updating table instead:

CREATE TABLE updating_table (
  `key` STRING PRIMARY KEY NOT ENFORCED,
  `value` STRING
) WITH (
  'changelog.mode' = 'upsert'
);

Using SHOW CREATE TABLE updating_table, we can see how this has changed the defaults:

+---------------------------------------------------------------------+
|                          SHOW CREATE TABLE                          |
+---------------------------------------------------------------------+
| CREATE TABLE `flink101_environment`.`lkc-w8077w`.`updating_table` ( |
|   `key` VARCHAR(2147483647) NOT NULL,                               |
|   `value` VARCHAR(2147483647),                                      |
|   CONSTRAINT `PK_key` PRIMARY KEY (`key`) NOT ENFORCED              |
| )                                                                   |
| DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS                           |
| WITH (                                                              |
|   'changelog.mode' = 'upsert',                                      |
|   'connector' = 'confluent',                                        |
|   'kafka.cleanup-policy' = 'compact',                               |
|   'kafka.compaction.time' = '7 d',                                  |
|   'kafka.max-message-size' = '2097164 bytes',                       |
|   'kafka.retention.size' = '0 bytes',                               |
|   'kafka.retention.time' = '0 ms',                                  |
|   'key.format' = 'avro-registry',                                   |
|   'scan.bounded.mode' = 'unbounded',                                |
|   'scan.startup.mode' = 'earliest-offset',                          |
|   'value.format' = 'avro-registry'                                  |
| )                                                                   |
|                                                                     |
+---------------------------------------------------------------------+

Note that if you want to retain the full history of changes, you can: it isn't necessary to set the cleanup policy to compact the topic.
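
To see the upsert behavior in action, insert two rows with the same key and then query the table; the result should eventually settle on a single row per key, holding the latest value. Stop the query when you're done.

INSERT INTO updating_table VALUES ('foo', 'one'), ('foo', 'two');
SELECT * FROM updating_table;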

This topic is explored more deeply in the module on Changelog Processing in the Apache Flink SQL course, here on Confluent Developer, and in the Flink SQL Examples in the documentation.

Finish

To delete the tables you've created, and their associated topics and schemas, you can first list the tables,

> SHOW TABLES;

+-----------------------------+
|         Table Name          |
+-----------------------------+
| simple_table                |
| updating_table              |
+-----------------------------+

and then drop them. E.g.,

DROP TABLE simple_table;
DROP TABLE updating_table;

In Confluent Cloud, dropping a table removes its associated resources, such as topics and schemas. This is in contrast to how this works in open source Apache Flink, where SQL tables are more strongly decoupled from their underlying storage.

Resources

Do you have questions or comments? Join us in the #confluent-developer community Slack channel to engage in discussions with the creators of this content.
