Writing a Flink select statement using the Table API is relatively simple. However, despite how easy it looks, working with streaming data has challenges you won't encounter when working with traditional database tables.
Streaming data is often unbounded. This means that although a data stream has a beginning, it isn't expected to have an end. Queries have to be written with this in mind. If you attempt to query the entire data set, your query will never end because it has to wait for future records.
This means most queries must be written to allow data to be processed as it's received, rather than batching it for future processing.
In this short video, we'll explore how to write simple select statements and some consequences of working with streaming data.
Here is a simple select statement that pulls all fields and all records from a table named cars.
TableResult result = tableEnvironment
    .from("cars")
    .select($("*"))
    .execute();
The results of the select statement can be printed using the print method.
result.print();
Alternatively, the results can be converted to an iterator using the collect method.
CloseableIterator<Row> rows = result.collect();
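For example, a minimal sketch that loops over the rows and prints each one. The iterator should be closed when finished, and on an unbounded stream this loop may never end on its own:

// Loop over the streaming results. On an unbounded stream, hasNext()
// blocks waiting for the next record, so this loop may run forever.
try (CloseableIterator<Row> rows = result.collect()) {
    while (rows.hasNext()) {
        System.out.println(rows.next());
    }
}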
Or, the results can be sent to another table using insertInto.
TableResult result = tableEnvironment
    .from("cars")
    .select($("*"))
    .insertInto("all_cars")
    .execute();
Using the $ syntax, individual fields can be selected to refine the results.
TableResult result = tableEnvironment
    .from("cars")
    .select(
        $("make"),
        $("model"),
        $("color")
    )
    .execute();
It's even possible to programmatically determine what fields to select.
Expression[] fields = getQueryFields();
TableResult result = tableEnvironment
    .from("cars")
    .select(fields)
    .execute();
The as expression can be used to rename a field in the result set.
TableResult result = tableEnvironment
    .from("cars")
    .select(
        $("vin").as("vehicle_identification_number"),
        $("make"),
        $("model"),
        $("year")
    )
    .execute();
And the row object can be used to add nested structures to the results.
TableResult result = tableEnvironment
    .from("cars")
    .select(
        $("vin"),
        row(
            $("make"),
            $("model"),
            $("year")
        ).as("details")
    )
    .execute();
Did you know that you can use Apache Flink to query a Kafka topic the same way you might query a database?
So when your boss comes to you and asks for some arbitrary data you've never had to think about before, there might be a way to extract what you need.
I think that's kind of cool.
But, there are consequences that are different from querying a database.
Let me show you how it works, and we can talk about a few of those differences.
Take a look at this basic SQL statement.
It looks at the table named cars, selects every record, and then extracts all fields from each record.
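In its simplest form, that's the standard wildcard query:

SELECT * FROM cars;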
You can imagine the results might look something like this.
To populate this table, someone could do an inventory at a used car lot.
As they wander through the rows, they write down the details of each car they see.
Eventually, they will run out of cars, and the inventory will be complete.
But what would happen if instead of doing an inventory of a car lot, we asked them to do it for every car on a busy highway?
The data for each vehicle would look roughly the same.
However, on a busy highway, the traffic never stops.
No matter whether it is rush hour, or the middle of the night, traffic continues to flow.
So how do we decide when the inventory is complete?
This is the difference between writing queries for a database table, and writing them for a streaming data source.
Database tables are bounded, with a beginning and an end.
On the other hand, streaming data sources are unbounded.
They have a beginning, but we don't expect them to end.
This has consequences for how we query them.
We can't just grab the entire data set and perform operations against it.
Instead, queries have to be executed in an ongoing fashion as the records flow through the system.
With that in mind, let's go back to our original SQL statement.
It could be executed as is, using the SQL API for Apache Flink.
However, I want to focus on the Table API because, as we will see, it can do some things the SQL API can't.
This is the equivalent query written in the Table API.
The from method takes the name of the table we want to query.
It can be just the simple table name or it can include the fully qualified name if necessary.
The select clause takes a series of Expressions which are created using the dollar sign symbol.
Here, we provide an asterisk, which acts as a wildcard indicating we want the entire record.
However, as we will see in this and future videos, these Expressions can get significantly more complex.
Finally, we execute the statement.
This translates it into the equivalent SQL before sending it to Confluent Cloud to be executed.
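Putting those pieces together, the full statement looks like this:

TableResult result = tableEnvironment
    .from("cars")
    .select($("*"))
    .execute();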
What we get back is a TableResult.
And this is where the streaming nature of the query begins to matter.
The TableResult includes methods such as print, which does what you would expect.
It prints the results to the standard output.
However, remember, this is streaming data.
It has no ending.
That means that once we start printing we never stop, and we can never move on to the next line of code.
This makes the print statement useful for debugging, but not much else.
Alternatively, we can obtain an iterator for the stream.
Then, we can do everything we normally do with an iterator, such as looping over the elements and performing actions on each.
But again, remember, this iterator is probably infinite.
If you want to perform other operations while iterating over the records, you might need to build your application in a multi-threaded fashion.
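As a sketch, assuming we dedicate a single worker thread to consuming the stream (using ExecutorService from java.util.concurrent), it might look something like this:

// Consume the potentially unbounded iterator on its own thread so the
// main thread is free to continue with other work.
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    try (CloseableIterator<Row> rows = result.collect()) {
        rows.forEachRemaining(System.out::println);
    } catch (Exception e) {
        e.printStackTrace();
    }
});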
However, the truth is that we rarely use the Table API for read-only queries.
Instead, the results of the queries are written to a separate data stream as we can see here.
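That's the insertInto form from earlier:

TableResult result = tableEnvironment
    .from("cars")
    .select($("*"))
    .insertInto("all_cars")
    .execute();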
In that case, the statement is executed by the Flink engine, and our Java code can move on without waiting for results.
Of course, with this trivial example where we haven't changed the data, writing to a separate data stream seems like a waste of time.
So let's look at ways to modify the data as it is streaming.
If we were tracking cars on a busy highway, we wouldn't keep track of details such as the vin or the year.
These aren't easily visible, so we have to limit ourselves to things that are clearly marked.
This would include details such as the make, model, and color.
We can modify our query to reflect this as shown.
Here, you see that the select clause accepts a comma-separated list of the fields we want to examine.
Alternatively, the list can be passed as an array.
This can be useful for programmatically determining the fields, rather than relying on something hard-coded.
That's something the Table API excels at, and something that can't be done with raw SQL.
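As a sketch, the field names might come from configuration or user input and be converted into expressions at runtime (the fieldNames list here is hypothetical):

// Build the select expressions dynamically from plain strings.
List<String> fieldNames = List.of("make", "model", "color");
Expression[] fields = fieldNames.stream()
    .map(Expressions::$)
    .toArray(Expression[]::new);

TableResult result = tableEnvironment
    .from("cars")
    .select(fields)
    .execute();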
But what if the name of the field isn't quite what we want it to be?
Here, we see the as expression being used to modify the name.
We are expanding the vin acronym to the full name, vehicle_identification_number.
The as expression is just one small example.
Many operations can be applied to fields using expressions like this.
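For instance, assuming make is a string and year is numeric, a sketch using a couple of other built-in expression operations might look like this:

TableResult result = tableEnvironment
    .from("cars")
    .select(
        $("make").upperCase().as("make_upper"),
        $("year").plus(1).as("next_model_year")
    )
    .execute();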
In addition, we can also make structural changes to the data.
Here, the make, model, and year are nested inside a row object named details.
When we print the results to the console, the details field is rendered as a JSON object containing the make, model, and year.
However, if we push the results into a new topic, the representation would depend on the type of serializer being used.
These are a few ways we can transform the data inside a simple Flink SELECT statement.
However, these transformations have all been applied to individual records, which makes them easy to use in a streaming setting.
Other transformations, such as aggregations, require a little more care.
In future videos, we will dive into how to apply more complex logic such as filtering, aggregation, and even joining multiple streams.
In the meantime, I hope you found this video helpful.
Feel free to leave a comment and let me know what you think.
Thanks for watching.