This exercise will help you understand the special relationship between Flink and Kafka in Confluent Cloud.
With Confluent Cloud for Apache Flink, you can directly access all of the Kafka environments, clusters, and topics that you already have in Confluent Cloud, without creating any additional metadata. You never need to declare tables manually for existing topics. Moreover, creating a table in Flink automatically creates the associated topic and schema.
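For example, if your cluster already contained a topic with a registered schema (here hypothetically named orders), it would show up as a queryable table without any CREATE TABLE statement. This is just a sketch to illustrate the point, not a step in this exercise:
SELECT * FROM orders;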
To gain a deeper appreciation of the value Confluent Cloud brings to Flink SQL, you may find it interesting to first go through the companion exercise based on open source Flink.
If you haven't already done so, you can begin this exercise by following the instructions for getting started with Confluent Cloud.
Note: The instructions below will guide you in using the Confluent CLI, but you can easily do all of this using the web UI at https://confluent.cloud instead, if you'd prefer.
In Confluent Cloud for Apache Flink, a table is a combination of some or all of these components: the Kafka topic that stores the data, a value schema in Schema Registry, and (when the table has a primary key) a key schema in Schema Registry.
If it's not already running, begin by using confluent flink shell to start a Flink shell, and then create a table:
CREATE TABLE simple_table (
  `key` STRING,
  `value` STRING
);
Note: if this fails, wait a moment and try again. (It can happen that some of the cloud resources provisioned by the quickstart won't be immediately available.)
Now let's go take a look at the resources that were just created in Confluent Cloud. To do this, first determine the IDs of the catalog and database in which this table has been created:
show catalogs;
show databases;
Now open another terminal window, and do the following to set your context to the Kafka cluster in which the topic for simple_table was just created:
Using the Catalog ID obtained above, execute
confluent environment use {Catalog ID}
Then using the Database ID obtained above, execute
confluent kafka cluster use {Database ID}
Create an API key to use with this cluster,
confluent api-key create --resource {Database ID}
and set that key into the context of the Confluent CLI
confluent api-key use {API Key} --resource {Database ID}
Now we can poke around inside Kafka:
confluent kafka topic list
Start consuming from this topic by executing the following:
confluent kafka topic consume --value-format avro --from-beginning simple_table
Note: avro is the default format for Flink tables in Confluent Cloud.
Leave that Kafka consumer running, and use the Flink shell in the other terminal to produce data to the simple_table topic by inserting data into the simple_table table:
INSERT INTO simple_table VALUES ('foo','one'), ('foo', 'two');
The Kafka consumer will then display something like this:
{"key":{"string":"foo"},"value":{"string":"one"}}
{"key":{"string":"foo"},"value":{"string":"two"}}
When you're done with this, type Ctrl-C (^C) to exit the Kafka consumer.
This table has just one schema stored in the Schema Registry, named simple_table_value. If this table had a primary key, that key would be used as the Kafka key, and it would have its own schema.
To find the schema(s):
confluent schema-registry schema list
To examine a schema, using the ID obtained above:
confluent schema-registry schema describe {ID}
Schema ID: 100001
Type: AVRO
Schema:
{
  "type": "record",
  "name": "simple_table_value",
  "namespace": "org.apache.flink.avro.generated.record",
  "fields": [
    {
      "name": "key",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "value",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}
If you've been through the related content on open source Flink, you will have learned that Apache Flink has two Kafka connectors, one for handling append-only streams, and another for streams that may include updates and/or deletions.
In Confluent Cloud, this is handled differently: there is a single confluent connector, which can be configured to use various changelog modes.
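To make that concrete, the changelog mode can be set explicitly in the WITH clause when a table is created. The following is only an illustrative sketch (the table name explicit_append_table is made up, and you don't need to run it for this exercise); besides append and the upsert mode used later in this exercise, a retract mode also exists:
CREATE TABLE explicit_append_table (
  `key` STRING,
  `value` STRING
) WITH (
  'changelog.mode' = 'append'
);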
In the Flink shell, use show create table simple_table to reveal the full specification of simple_table, including all of the defaults:
> show create table simple_table;
+-------------------------------------------------------------------+
| SHOW CREATE TABLE |
+-------------------------------------------------------------------+
| CREATE TABLE `flink101_environment`.`lkc-w8077w`.`simple_table` ( |
| `key` VARCHAR(2147483647), |
| `value` VARCHAR(2147483647) |
| ) |
| DISTRIBUTED INTO 6 BUCKETS |
| WITH ( |
| 'changelog.mode' = 'append', |
| 'connector' = 'confluent', |
| 'kafka.cleanup-policy' = 'delete', |
| 'kafka.compaction.time' = '0 ms', |
| 'kafka.max-message-size' = '2097164 bytes', |
| 'kafka.retention.size' = '0 bytes', |
| 'kafka.retention.time' = '0 ms', |
| 'scan.bounded.mode' = 'unbounded', |
| 'scan.startup.mode' = 'earliest-offset', |
| 'value.format' = 'avro-registry' |
| ) |
| |
+-------------------------------------------------------------------+
There's a lot going on here, including the specification of various properties for the underlying Kafka topic, the number of partitions (the 6 BUCKETS), the value format, and the changelog mode (which is append).
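Many of these can also be set explicitly when a table is created. As a rough sketch, assuming these options are settable at creation time as well as visible in SHOW CREATE TABLE (the table name tuned_table and the specific values are made up, and running it is optional; if you do, drop it during the cleanup at the end), a table could be declared with a different bucket count and retention time like this:
CREATE TABLE tuned_table (
  `key` STRING,
  `value` STRING
) DISTRIBUTED INTO 3 BUCKETS
WITH (
  'kafka.retention.time' = '7 d'
);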
To create an updating table instead:
CREATE TABLE updating_table (
  `key` STRING PRIMARY KEY NOT ENFORCED,
  `value` STRING
) WITH (
  'changelog.mode' = 'upsert'
);
Using SHOW CREATE TABLE updating_table, we can see how this has changed the defaults:
+---------------------------------------------------------------------+
| SHOW CREATE TABLE |
+---------------------------------------------------------------------+
| CREATE TABLE `flink101_environment`.`lkc-w8077w`.`updating_table` ( |
| `key` VARCHAR(2147483647) NOT NULL, |
| `value` VARCHAR(2147483647), |
| CONSTRAINT `PK_key` PRIMARY KEY (`key`) NOT ENFORCED |
| ) |
| DISTRIBUTED BY HASH(`key`) INTO 6 BUCKETS |
| WITH ( |
| 'changelog.mode' = 'upsert', |
| 'connector' = 'confluent', |
| 'kafka.cleanup-policy' = 'compact', |
| 'kafka.compaction.time' = '7 d', |
| 'kafka.max-message-size' = '2097164 bytes', |
| 'kafka.retention.size' = '0 bytes', |
| 'kafka.retention.time' = '0 ms', |
| 'key.format' = 'avro-registry', |
| 'scan.bounded.mode' = 'unbounded', |
| 'scan.startup.mode' = 'earliest-offset', |
| 'value.format' = 'avro-registry' |
| ) |
| |
+---------------------------------------------------------------------+
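If you'd like to see the upsert behavior for yourself, an optional experiment is to insert two rows with the same key and then query the table; because the changelog mode is upsert and `key` is the primary key, the result should converge to a single row per key, showing the latest value:
INSERT INTO updating_table VALUES ('foo', 'one');
INSERT INTO updating_table VALUES ('foo', 'two');
SELECT * FROM updating_table;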
Note that if you want to retain the full history of changes, you can: an upsert table isn't required to use a compacted topic, and the cleanup policy can be overridden, as sketched below.
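For instance, here is a rough sketch (the table name is made up, and this assumes the kafka.cleanup-policy option can be set explicitly alongside the changelog mode) of an upsert table whose topic retains every change rather than being compacted:
CREATE TABLE updating_table_with_history (
  `key` STRING PRIMARY KEY NOT ENFORCED,
  `value` STRING
) WITH (
  'changelog.mode' = 'upsert',
  'kafka.cleanup-policy' = 'delete'
);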
This subject is explored more deeply in the module on Changelog Processing in the course on Apache Flink SQL, here on Confluent Developer, and in the Flink SQL Examples in the documentation.
To delete the tables you've created, and their associated topics and schemas, you can first list the tables,
> SHOW TABLES;
+-----------------------------+
| Table Name |
+-----------------------------+
| simple_table |
| updating_table |
+-----------------------------+
and then drop them. E.g.,
DROP TABLE simple_table;
DROP TABLE updating_table;
In Confluent Cloud, dropping a table removes its associated resources, such as the underlying topic and schemas. This is in contrast to how this works in open source Apache Flink, where SQL tables are more strongly decoupled from their underlying storage.
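To confirm this for yourself, you can re-run SHOW TABLES in the Flink shell (and confluent kafka topic list in the other terminal); once the DROP statements complete, the tables and their topics should be gone:
SHOW TABLES;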