In this tutorial, you will generate vector embeddings on retail product catalog data. A source connector ingests and writes unstructured source data to a topic. Flink SQL then converts this data into vector embeddings and writes them to a new topic.
This tutorial is a building block for real-time GenAI applications including RAG and is based on the webinar How to Build RAG Using Confluent with Flink AI Model Inference and MongoDB.
Once vector encoding is complete, you can leverage vector embeddings and vector search to build use cases such as semantic search and retrieval-augmented generation (RAG). To follow along, clone the tutorials repository and change into this tutorial's Flink SQL directory:
git clone git@github.com:confluentinc/tutorials.git
cd tutorials/gen-ai-vector-embedding/flinksql
The confluent-flink-quickstart CLI plugin creates all the resources that you need to get started with Confluent Cloud for Apache Flink. Install it by running:
confluent plugin install confluent-flink-quickstart
Run the plugin as follows to create the Confluent Cloud resources needed for this tutorial. Note that you may specify a different cloud provider (gcp or azure) or region. You can find supported regions for a given cloud provider by running confluent flink region list --cloud <CLOUD>.
confluent flink quickstart \
  --name confluent-rag \
  --max-cfu 10 \
  --region us-east-1 \
  --cloud aws
Once the infrastructure is provisioned, you will drop into a Flink SQL shell. Leave this terminal window open as we will return to it in later steps to execute SQL statements.
Before converting data into vectors, let's generate some sample retail data by adding a Datagen Source Connector.
First, create an API key for the connector to connect to Kafka:
confluent api-key create --resource $(confluent kafka cluster describe -o json | jq -r .id)
Associate that API key with the Kafka cluster so that subsequent commands can use it:
confluent api-key use <API KEY>
Substitute the API key and secret for YOUR_API_KEY and YOUR_API_SECRET, respectively, in datagen-product-updates-connector.json.
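If you prefer to do this from the command line, a sed one-liner such as the following also works (a sketch that assumes GNU sed; on macOS, use sed -i '' instead):
# Substitute the placeholders in the connector config with the API key and secret created above
sed -i 's|YOUR_API_KEY|<API KEY>|; s|YOUR_API_SECRET|<API SECRET>|' datagen-product-updates-connector.json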
Provision the connector:
confluent connect cluster create --config-file ./datagen-product-updates-connector.json
Once the connector is provisioned, verify that the product-updates topic is populated. If you see the error Error: topic "product-updates" does not exist, wait a minute and try again.
confluent kafka topic consume product-updates --from-beginning --value-format jsonschema
You should see messages like:
{"ageGroup":"adult","articleType":"shirt","baseColor":"blue","brandName":"Lalonde","count":9,"fashionType":"core","gender":"female","price":67.22,"product_id":443150,"season":"fall","size":"petite","store_id":21}
Press Ctrl+C to exit the console consumer.
Paste your OpenAI API key into the following command to create a connection named openai-embedding-connection, which we will use in Flink SQL to generate vector embeddings. This connection resides in AWS region us-east-1; substitute your cloud provider and region if you provisioned elsewhere.
confluent flink connection create openai-embedding-connection \
  --cloud aws \
  --region us-east-1 \
  --environment $(confluent environment describe -o json | jq -r .id) \
  --type openai \
  --endpoint 'https://api.openai.com/v1/embeddings' \
  --api-key '<OPENAI API KEY>'
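If you would like to confirm that the connection exists, you can list the Flink connections in the region (the flags mirror the create command above; adjust them if you used a different cloud or region):
# List AI model connections in the given cloud and region
confluent flink connection list --cloud aws --region us-east-1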
Return to the Flink SQL shell that the confluent flink quickstart ... command opened. If you closed out of it, you can open a new shell session by running confluent flink shell.
Before we can generate vector embeddings, we need to define a remote model in Confluent Cloud for Apache Flink:
CREATE MODEL `vector_embedding`
INPUT (input STRING)
OUTPUT (vector ARRAY<FLOAT>)
WITH (
  'TASK' = 'embedding',
  'PROVIDER' = 'openai',
  'OPENAI.CONNECTION' = 'openai-embedding-connection'
);
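To confirm that the model was registered, list the models in the current catalog and database:
SHOW MODELS;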
Test that you can generate vector embeddings on the articleType field of the product-updates table:
SELECT articleType, vector
FROM `product-updates`,
LATERAL TABLE(ML_PREDICT('vector_embedding', articleType))
LIMIT 1;
You should see output like:
articleType vector
sweater [-0.005236239, -0.016405802, ...
In the SQL shell, create a derived product catalog table that includes a new column to hold the concatenated text for each product and a column for the vector embedding:
CREATE TABLE product_vector (
  store_id INT,
  product_id INT,
  `count` INT,
  articleType STRING,
  size STRING,
  fashionType STRING,
  brandName STRING,
  baseColor STRING,
  gender STRING,
  ageGroup STRING,
  price DOUBLE,
  season STRING,
  content STRING,
  vector ARRAY<FLOAT>
) WITH (
  'value.format' = 'json-registry'
);
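If you want to double-check the schema before populating the table, you can describe it:
DESCRIBE product_vector;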
Now populate the table with the following INSERT ... SELECT statement:
INSERT INTO product_vector
(
  WITH product_content as (
    SELECT
      *,
      concat_ws(' ', size, ageGroup, gender, season, fashionType, brandName, baseColor, articleType,
        ', price: ' || cast(price as string), ', store number: ' || cast(store_id as string),
        ', product id: ' || cast(product_id as string)
      ) as content
    FROM `product-updates`
  )
  SELECT store_id, product_id, `count`, articleType, size, fashionType, brandName,
         baseColor, gender, ageGroup, price, season, content, vector
  FROM product_content,
       LATERAL TABLE(ML_PREDICT('vector_embedding', content))
);
This command will run continuously, so press Enter to detach once prompted to do so.
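If you want to check on the detached statement later, you can list your Flink SQL statements from another terminal. This sketch assumes the AWS us-east-1 setup from earlier and the active environment created by the quickstart plugin; add --environment or --compute-pool flags if your context differs:
# List Flink SQL statements and their statuses in this cloud and region
confluent flink statement list --cloud aws --region us-east-1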
As the last step, query the generated embeddings in the product_vector table:
SELECT vector, content
FROM product_vector;
You should see output like:
vector content
[0.060939293, -0.022118… petite adult female su…
[0.022782113, -0.020454… medium infant female f…
[0.057449587, 0.0012455… small infant female su…
[0.025510406, -0.007447… extralarge child male …
...
In this tutorial, you learned how to generate vector embeddings from string data in Kafka messages using Flink SQL.
As a next step, you can further build out this streaming data pipeline by adding a sink connector in Confluent Cloud to store these embeddings in a vector database. To deploy a sink connector on Confluent Cloud, navigate to the Connectors page in the Confluent Cloud Console or use the Confluent CLI. This setup enables you to continuously stream real-time vector embeddings from Flink SQL into your vector database.
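For example, a MongoDB Atlas Sink connector (the pairing used in the webinar this tutorial is based on) can be provisioned with the same CLI pattern used for the Datagen connector. The config file name here is hypothetical; it would contain your Atlas connection details, the target database and collection, a Kafka API key, and product_vector as the source topic:
# Hypothetical config file -- create it with your MongoDB Atlas and Kafka credentials before running
confluent connect cluster create --config-file ./mongodb-atlas-sink-connector.json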
For guidance on setting up a vector database sink connector, refer to the Confluent Cloud connector documentation for your vector database of choice.
When you finish experimenting, delete the confluent-rag_environment environment in order to clean up the Confluent Cloud infrastructure created for this tutorial. Run the following command in your terminal to get the environment ID of the form env-123456 corresponding to the environment named confluent-rag_environment:
confluent environment list
Delete the environment:
confluent environment delete <ENVIRONMENT_ID>