Course: Apache Flink® Table API: Processing Data Streams in Java

Writing to a Table with the Apache Flink® Table API

6 min
Wade Waldron

Staff Software Practice Lead

At first glance, inserting data into a table using the Flink Table API seems trivial. It's a single function call with a single parameter. However, the consequences of that call are much broader.

For example, that simple call can force you to restructure the data. It can impact the lifecycle of the Flink statement. It can also affect whether Confluent Cloud can enable auto-scaling.

In this video, you will learn more about how each of these things is impacted when you use the insertInto clause in the Flink Table API.

Topics:

  • How to structure data in preparation for writing to another table.
  • How to use the insertInto clause.
  • Why the insertInto clause changes the lifecycle of a statement.
  • Why reading a TableResult after insertInto causes the application to hang.
  • How the insertInto clause impacts auto-scaling.

Resources

Code

A sample query that selects blue cars from a table and inserts them into a new table, modifying the structure of the data along the way.

// Assumes env is a configured TableEnvironment, along with static imports of
// org.apache.flink.table.api.Expressions.$ and org.apache.flink.table.api.Expressions.row.
env.from("`dealership`.`inventory`.`cars`")
  .where($("color").isEqual("Blue"))
  .select(
    $("vin"),
    row(
      $("make"),
      $("model"),
      $("year")
    ).as("details")
  )
  .insertInto("`dealership`.`inventory`.`blue_cars`")
  .execute();

Use the promo codes FLINKTABLEAPIJAVA & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Writing to a Table with the Apache Flink® Table API

Using the Flink Table API to write to an output table, or sink, seems relatively trivial.

However, as I discovered, a lot is happening behind the scenes, and it's important to understand what's going on.

I'll start by showing you the trivial bits; in other words, we'll write the code.

But stick around to the end because that's where it gets interesting.

Imagine a Kafka topic named cars that contains vehicle inventory data.

Each record contains a vehicle identification number, as well as the make, model, year, and color of the vehicle.

A query like this would allow us to get a stream of only the blue cars from our table.
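As a rough sketch, assuming env is a TableEnvironment and a static import of org.apache.flink.table.api.Expressions.$, that filter-only query might look like this:

// Returns a Table containing only the blue cars; nothing runs until we execute or collect it.
Table blueCars = env.from("`dealership`.`inventory`.`cars`")
  .where($("color").isEqual("Blue"));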

However, rather than consuming the data in our Java application, we want to push it into a new Kafka topic.

But, what if downstream consumers expect a different format?

Because the topic only contains blue cars, downstream consumers don't need the color.

Meanwhile, the make, model, and year have been grouped into a "details" object.

We can accommodate this by altering our query to include a select clause mapping our data to the appropriate format.

This is the first thing to remember when we write data to another table.

The format has to match the schema of the destination table, or the write will fail.

To perform the write, we can use the insertInto clause as shown here.

It accepts the name of the destination table.

In this example, we are using fully qualified names.

However, if we set the correct catalog and database ahead of time, we can use simple names instead.
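For example, here is a sketch that assumes env is a TableEnvironment and uses the catalog and database names from this lesson:

// Set the default catalog and database once...
env.useCatalog("dealership");
env.useDatabase("inventory");

// ...then refer to the tables by their simple names.
env.from("cars")
  .where($("color").isEqual("Blue"))
  .select($("vin"), row($("make"), $("model"), $("year")).as("details"))
  .insertInto("blue_cars")
  .execute();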

The code for writing to a table is really that simple, but, despite how easy it looks, there are some details we need to keep in mind.

The first obvious one is that the destination table has to exist.

In Confluent Cloud, this means creating both the topic and associated schemas.

This can be done manually, or programmatically using the createTable method in the Table API.
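Here is a sketch of the programmatic approach using the standard Flink TableDescriptor and Schema classes. The connector name, options, and field types are assumptions for illustration; on Confluent Cloud the table is backed by a managed topic and its associated schemas.

// Describe the destination table: a vin column plus a nested details row.
TableDescriptor blueCarsTable = TableDescriptor.forConnector("kafka") // placeholder connector
  .schema(
    Schema.newBuilder()
      .column("vin", DataTypes.STRING())
      .column("details", DataTypes.ROW(
        DataTypes.FIELD("make", DataTypes.STRING()),
        DataTypes.FIELD("model", DataTypes.STRING()),
        DataTypes.FIELD("year", DataTypes.INT())))
      .build())
  .build();

env.createTable("`dealership`.`inventory`.`blue_cars`", blueCarsTable);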

A less obvious consequence of these writes is how they change the lifecycle of our statement.

Statements in Flink can only exist as long as they have:

a data source to draw from,

and a data sink to write to.

But wait...Where was the sink before we added the insertInto clause?

In that scenario, the sink was our Java application.

As long as the application consumes the TableResult, the statement continues to execute.

But what would happen if we were to terminate the Java application?

In that case, Confluent Cloud would eventually detect that the sink is no longer available and shut down the statement.

However, the behavior changes once the insertInto clause is included.

In this case, the sink is no longer our Java app.

Instead, it is the destination Kafka topic.

And as long as the Kafka topic exists, Flink can continue to write data.

So even if we terminate our Java application,

the statement will continue to run for as long as the source and sink exist.

This is an important consideration because if we aren't careful, we can end up with statements that continue to run when we think they have been terminated.

This can lead to duplicate statements and duplicated data.

When building streams, keep an eye on the active statements to ensure this doesn't come back to bite you later.

Another important change is how the result of the statement behaves.

The example here selects all blue cars and iterates over the results, printing them to the console.

This is backed by an active Kafka topic and would print each blue car as it is read from the source.

It would run for as long as the source exists, continuing to print new records.
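That console-printing pattern looks roughly like this (a sketch, again assuming env is a TableEnvironment and the usual static imports):

TableResult result = env.from("`dealership`.`inventory`.`cars`")
  .where($("color").isEqual("Blue"))
  .execute();

// collect() returns a CloseableIterator<Row>; the loop keeps printing for as long as the source produces records.
try (CloseableIterator<Row> rows = result.collect()) {
  while (rows.hasNext()) {
    System.out.println(rows.next());
  }
}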

But what happens when we add the insertInto clause?

In this case, the individual records aren't sent to our TableResult.

Instead, they are pushed into the destination topic.

The TableResult would indicate how many rows were impacted by the query...in theory.

But, this is where it gets tricky.

Remember, our stream will continue to run as long as the source and sink exist.

That means it's unbounded.

And because it is unbounded, the stream never completes.

This means that trying to access the number of rows affected would have to wait forever.

In other words, it would hang until the stream is terminated.
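To make that concrete, here is a sketch of the trap, assuming the same env and static imports as before:

TableResult result = env.from("`dealership`.`inventory`.`cars`")
  .where($("color").isEqual("Blue"))
  .select($("vin"), row($("make"), $("model"), $("year")).as("details"))
  .insertInto("`dealership`.`inventory`.`blue_cars`")
  .execute();

// The insert is unbounded, so waiting on or iterating over its result never returns.
// result.await();   // blocks until the statement terminates, i.e., effectively forever
// result.collect(); // likewise, the iterator never completes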

A best practice, then, is to avoid waiting on or iterating over the result of an unbounded insert statement, because it will never complete.

Now, there's one final difference I want to talk about, and it relates to autoscaling.

When a query is consumed in a Java application, it's single-threaded.

We could try to write the application in a multi-threaded manner.

However, in this scenario, Confluent Cloud only has access to the data source and the SQL generated by the Table API code.

It lacks critical information about the data sink.

As a result, for these types of queries, it disables auto-scaling.

However, once an insertInto clause is added, Confluent Cloud controls the source, the sink, and the SQL statement.

This gives it the power to decide how the job should scale.

This allows Confluent Cloud to enable auto-scaling and more efficiently use the resources available.

There are other differences that I haven't gone into here.

For example, checkpointing is enabled when you use insertInto.

The key thing to recognize is that queries that dump data to the console or do other processing inside your Java app are rare and often inefficient.

Instead, most Flink statements in Confluent Cloud operate as pipelines, with both source and sink Kafka topics.

Rather than thinking of a Java application as the endpoint for the data, we need to think of it as a stop along the way.

The end goal is to build rich pipelines that transform data and move it from one topic to another.

Hopefully, you've learned a few things about how to write data using the Flink Table API.

If you have questions about anything discussed, feel free to drop me a comment.

There's more content like this coming, so keep an eye open.

Thanks for watching.
