Course: Apache Flink® Table API: Processing Data Streams in Java

Filtering Data with the Apache Flink® Table API

7 min
Wade Waldron

Staff Software Practice Lead


One of the things that sets Confluent Cloud apart from open source Flink is how it handles the createTable statement. In open source Flink, this statement is a way to point Flink at your existing Kafka topics and schemas. Despite the name, it doesn't create all of the resources the table needs to operate; some, such as the Kafka topic, still have to be created manually. In Confluent Cloud, however, calling createTable constructs the underlying resources if necessary, including the schemas and Kafka topics.

This makes the statement quite powerful, because it allows us to use the Table API in Java to construct both the data stream and the underlying resources that stream needs. We can benefit from the power and flexibility of the Java language at every stage of the pipeline, rather than just in the middle.

In this video, we'll show how to use the createTable statement to construct a new Kafka topic and its corresponding schemas. You'll see how the statement is translated into SQL code to be executed by Confluent Cloud and how that eventually creates Protobuf, Avro, and JSON schemas when required.
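As a rough sketch, a createTable call with a TableDescriptor might look like the following. The schema, connector name, and options here are illustrative assumptions based on the open source Kafka connector; Confluent Cloud's Table API plugin uses its own connector and, as described above, provisions the topic and schemas for you when needed.

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;

// Register a cars table and describe the Kafka topic behind it.
// Connector name and options are open source Kafka connector examples.
env.createTable(
	"cars",
	TableDescriptor.forConnector("kafka")
		.schema(Schema.newBuilder()
			.column("make", DataTypes.STRING())
			.column("model", DataTypes.STRING())
			.column("color", DataTypes.STRING())
			.column("year", DataTypes.INT())
			.build())
		.option("topic", "cars")
		.option("properties.bootstrap.servers", "localhost:9092")
		.option("value.format", "avro")	// the schema format: avro, json, protobuf
		.option("scan.startup.mode", "earliest-offset")
		.build());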

Topics:

  • Why defining a schema for a Kafka topic can be difficult.
  • How the Flink Table API unifies multiple schema formats.
  • How to use the TableDescriptor to specify the schema format.
  • How to configure your Kafka topic.
  • How Confluent Cloud improves the createTable statement and creates resources for you.

Resources

Code

A query to select all blue cars from the cars topic and insert them into another topic named blue_cars. These snippets assume a TableEnvironment named env, static imports of $ and and from org.apache.flink.table.api.Expressions, and an import of org.apache.flink.table.api.DataTypes.

env.from("cars")
	.select($("*"))
	.where($("color").isEqual("Blue"))
	.insertInto("blue_cars");

A query to select all cars made in 2022 or later.

env.from("cars")
	.select($("*"))
	.where($("year").isGreaterOrEqual(2022))
	.insertInto("cars_from_2022_or_later");

A query to select all cars manufactured in the 1900s.

env.from("cars")
	.select($("*"))
	.where(
		and(
			$("year").isGreaterOrEqual(1900),
			$("year").isLess(2000)
		)
	)
	.insertInto("cars_from_the_1900s");

A variation on the query to select all cars manufactured in the 1900s. This one uses string operations to do the work.

env.from("cars")
	.select($("*"))
	.where($("year").cast(DataTypes.STRING()).like("19%"))
	.insertInto("cars_from_the_1900s");

Use the promo codes FLINKTABLEAPIJAVA & CONFLUENTDEV1 to get $25 of free Confluent Cloud usage and skip credit card entry.

Filtering Data with the Apache Flink® Table API

One of the challenges I encounter when teaching Apache Flink is explaining how streaming data differs from a traditional database.

The behavior of a WHERE clause is a good example.

I'd like to show you how to implement a WHERE clause using the Flink Table API and highlight some key differences in its behavior.

Let me start by outlining how a WHERE clause works in a traditional database.

Suppose we had a table containing car details such as their make, model, and color.

Here, we see a SQL query that selects all rows and columns from the table.

If we executed this query, the results might look like the table shown.

However, what if we wanted to filter the results and only look at the blue cars?

In that case, we could modify the query using a WHERE clause to select only blue cars.
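For reference, the SQL query being described would look something like this, using the table and column names from the example:

SELECT * FROM cars WHERE color = 'Blue';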

Now, let's rewrite this query as a Flink Table API statement.

We start by looking at the cars table.

Then we select all fields using a wildcard.

Finally, the where clause uses an API expression to check whether the value in the color field is equal to Blue.

Let me make one final adjustment.

Usually, when we work with Flink, we are taking data from one stream and inserting it into another.

So let's take the results of our query and insert them into a destination stream, also known as a sink.

We'll name this stream blue_cars.

Consumers of the blue_cars stream will see something like this, with only the highlighted rows from the source appearing downstream.

Now, let's look at a slightly different where clause.

This one checks that the year is greater than or equal to 2022.

It gives us a different set of results, again only sending the rows highlighted in blue.

And, one final example.

Suppose I was looking for all cars made in the 1900s.

I can use an and clause to check that the year falls within the correct range.

Just for fun, let me show you an alternative way to write this query.

We could cast the year to a STRING and then use the like operator to check if the string starts with 19.

There are many ways to create API expressions for a where clause.

If you are working in an interactive development environment or IDE, it can give hints about what expressions you can create.
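For example, here is a small sample of the expressions available, assuming a static import of $ from org.apache.flink.table.api.Expressions; the column names are the ones used throughout this lesson.

import org.apache.flink.table.api.ApiExpression;

import static org.apache.flink.table.api.Expressions.$;

// A few of the boolean expressions a where clause will accept:
ApiExpression isBlue = $("color").isEqual("Blue");	// equality
ApiExpression notRed = $("color").isNotEqual("Red");	// inequality
ApiExpression inRange = $("year").between(1900, 1999);	// inclusive range check
ApiExpression either = $("year").isLess(1950).or($("color").isEqual("Blue"));	// combined conditions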

However, there's more to this story.

I have shown a simplified view that would apply if we were using a database table.

However, with streaming data, things work differently.

Let's go back to looking for blue cars.

This time, rather than assuming that the table is fully populated, we'll stream the data from the beginning.

Notice that I have added a time field.

This represents the number of seconds since the beginning of the stream.

At the beginning of the stream, or a time of zero, we encounter a Red car.

It gets rejected because it doesn't match our criteria.

I'll highlight the rejected elements in red.

If we treated this like a database query and ran it now, it would return nothing because the only element in the table doesn't match.

But this isn't a database query.

Streaming statements run continuously, emitting new results as they find them.

That means that when a blue car arrives five seconds after we executed the statement, the statement is still watching for changes.

As a result, it can emit the new record to the destination stream.

Then it continues to scan the stream for additional records.

A few more cars go by, but they don't match our criteria, so they are rejected.

Eventually, 60 seconds after the stream started, the statement encounters a second blue car and emits it to the destination.

The statement will continue to run like this, constantly looking for new results and emitting them as they are found.
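In the Table API, that long-running behavior starts when the statement is submitted. A minimal sketch, using the blue cars query from earlier: insertInto returns a TablePipeline, and calling execute launches the continuously running job.

import org.apache.flink.table.api.TablePipeline;

import static org.apache.flink.table.api.Expressions.$;

// Build the statement, then submit it. The resulting job keeps running,
// emitting each new blue car as it arrives, until it is cancelled.
TablePipeline pipeline = env.from("cars")
	.select($("*"))
	.where($("color").isEqual("Blue"))
	.insertInto("blue_cars");

pipeline.execute();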

Do you see the difference from how a typical database query operates?

When you run a database query, it looks at the state of the database at the time of the query.

It scans all known records and returns a result, after which the query terminates.

For future results, you have to keep re-running the query.

With a data stream, the statement continuously monitors for new data and emits matching results as they are found.

There's no need to re-run the statement because it runs for as long as the stream exists.

However, that's not the only difference.

So far, we've looked at append-only streams, where data is only added and we never update previous records.

In a database, existing records are often updated.

To support updates, Flink includes retract and upsert streams, but they create challenges.

When an update arrives, previous results may have already been emitted downstream.

If the update would change the results, we need a way to communicate that to consumers.

Here, we see the previous results of our statement, except I have added an extra op column.

This is a special column that indicates the type of operation being performed and it is critical for retract or upsert streams.

In our previous results, all of the operations were inserts, represented by +I.

But what would happen if we encountered the same vehicle a second time except it's been painted a different color?

In that case, the stream can emit a retraction event.

This is represented by the -U that you see here.

It indicates that the previous value for this record is no longer valid.

Then, an update can be emitted to supply the new color.

Updates are represented by the +U syntax.

We can see that the car in question has changed from its original color of Blue to a new color of Red.

Except our query only emits blue cars, and we've already emitted this one.

That means we have to notify downstream consumers that the previously emitted record is being retracted or deleted.

Thankfully, our where clause does this automatically.

It will emit the -U record because it matches our criteria.

And it will drop the +U because it doesn't match.

The end result is that the blue car gets retracted and not replaced.

Some statements will result in deletions, such as the one highlighted in red.

These are represented by the -D operation, and occur in situations where a record is removed and not replaced by anything new.

Regardless of whether a record has been retracted or deleted, consumers will need to handle it appropriately.

The good news is that Flink will automatically do some of the work for you.

It keeps track of the state of your stream and issues corrections where appropriate.

You don't need to worry about it.

However, it's still important to understand how it works in case you need to consume these records outside of Flink.
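To see these operations from Java, you can convert a table into a changelog stream. A minimal sketch, assuming a StreamTableEnvironment named tableEnv; each Row carries a RowKind matching the ops shown above.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

import static org.apache.flink.table.api.Expressions.$;

Table blueCars = tableEnv.from("cars")
	.select($("*"))
	.where($("color").isEqual("Blue"));

// Every row is tagged with its operation: +I, -U, +U, or -D.
DataStream<Row> changelog = tableEnv.toChangelogStream(blueCars);

changelog
	.filter(row -> row.getKind() == RowKind.UPDATE_BEFORE
		|| row.getKind() == RowKind.DELETE)	// retractions and deletions
	.print();	// consumers outside Flink must handle these explicitly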

We also need to be aware that Flink statements run continuously and results may be updated as new information is available.

This is different from a standard database query.

I hope you have learned a lot from this video.

We've got more coming, so if something doesn't make sense, or you would like to see us cover a particular subject, leave me a comment and let me know what you are looking for.

Thanks for watching.
