In this tutorial, you will implement a Retrieval-Augmented Generation (RAG) use case. You’ll use vector search over embeddings generated in Part 1 of this tutorial series to augment a user's query in order to understand uncommon terminology.
You'll now land the embeddings generated in Part 1 of this tutorial series in your Atlas cluster. To create your database and collection and sink the embeddings into them, provision the MongoDB Atlas Sink Connector:
confluent connect cluster create --config-file ./mongodb-atlas-sink-connector.json
To enable vector search from Confluent Cloud, create a vector index in MongoDB Atlas:
It may take a moment for the index to build and be queryable.
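The exact index definition depends on the embedding model you used in Part 1. As a hedged sketch, an Atlas vector index on the `vector` field might look like the following, where `numDimensions` must equal your model's embedding dimension (1536 here is an assumption, not a value from this tutorial):

```json
{
  "fields": [
    {
      "type": "vector",
      "path": "vector",
      "numDimensions": 1536,
      "similarity": "cosine"
    }
  ]
}
```

Name the index `vector_index` so that it matches the external table configuration later in this tutorial.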
Before you can create a MongoDB external table in Confluent Cloud, first create a connection to the vector index. Use the same admin user password that you used when provisioning the sink connector earlier, and note that the endpoint must be prefixed with mongodb+srv://. To find your environment ID (of the form env-123456), run confluent environment list on the command line. Finally, if your Atlas cluster runs in a different cloud provider or region, adjust those flags accordingly.
confluent flink connection create mongodb-connection \
--cloud AWS \
--region us-east-1 \
--type mongodb \
--endpoint mongodb+srv://ATLAS_HOST/ \
--username admin \
--password ATLAS_ADMIN_USER_PASSWORD \
--environment ENVIRONMENT_ID
Create a MongoDB external table in the Flink SQL shell that references the connection created in the previous step. For clarity, only the articleType field is returned; keep in mind, though, that the vector search runs against embeddings generated from the concatenation of all product catalog fields.
CREATE TABLE mongodb_products (
articleType STRING,
vector ARRAY<FLOAT>
) WITH (
'connector' = 'mongodb',
'mongodb.connection' = 'mongodb-connection',
'mongodb.database' = 'store',
'mongodb.collection' = 'products',
'mongodb.index' = 'vector_index',
'mongodb.embedding_column' = 'vector',
'mongodb.numCandidates' = '5'
);
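Conceptually, each lookup against this external table corresponds to an Atlas `$vectorSearch` aggregation stage. The following is a rough sketch of the equivalent pipeline, not the literal query that the integration issues; the two-element `queryVector` is a truncated illustration standing in for a full query embedding:

```json
[
  {
    "$vectorSearch": {
      "index": "vector_index",
      "path": "vector",
      "queryVector": [0.0207, 0.0235],
      "numCandidates": 5,
      "limit": 3
    }
  },
  { "$project": { "articleType": 1, "vector": 1, "_id": 0 } }
]
```

Note how the stage's fields line up with the table options: `index` with 'mongodb.index', `path` with 'mongodb.embedding_column', and `numCandidates` with 'mongodb.numCandidates'.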
In the Flink SQL shell, simulate a stream of uncommon slang user queries for footwear (kicks, footies), shorts (cutoffs), and a hat (lid):
CREATE TABLE queries (
query STRING
) WITH (
'value.format' = 'json-registry'
);
INSERT INTO queries VALUES ('kicks'), ('footies'), ('cutoffs'), ('lid');
Create a table for query vectors using the same embedding model used for product embeddings in Part 1:
CREATE TABLE query_vectors (
query STRING,
vector ARRAY<FLOAT>
) WITH (
'value.format' = 'json-registry'
);
INSERT INTO query_vectors
(
SELECT query, vector
FROM queries,
LATERAL TABLE(ML_PREDICT('vector_embedding', query))
);
Query the vector index to find products for each slang query:
SELECT query, search_results
FROM query_vectors,
LATERAL TABLE(VECTOR_SEARCH_AGG(mongodb_products, DESCRIPTOR(vector), query_vectors.vector, 3));
Your results will vary from the following depending on the products that you generated in Part 1 of this tutorial series, but you should see sensible search results that match the slang queries:
query    search_results
footies  [(shoes, [0.020776896, 0.023529341, ...
cutoffs  [(shorts, [0.0352604, -0.0070558237, ...
kicks    [(sandals, [0.030727627, -0.009568145, ...
lid      [(hat, [0.007294555, 0.022405202, ...
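Under the hood, the search ranks candidate documents by the similarity between the query embedding and each product embedding. As a conceptual sketch in plain Python (not the actual Atlas implementation; the 2-d toy vectors and product names are made up for illustration), cosine-similarity ranking looks like this:

```python
import math

def cosine_similarity(a, b):
    """dot(a, b) / (|a| * |b|) -- higher means more similar."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Toy 2-d "embeddings"; a real index compares full embedding vectors.
query_vec = [0.9, 0.2]  # imagine this is the embedding of "kicks"
products = {
    "sandals": [0.8, 0.3],
    "hat":     [0.1, 0.9],
    "shorts":  [0.4, 0.6],
}

# Rank products by similarity to the query, most similar first.
ranked = sorted(products,
                key=lambda k: cosine_similarity(query_vec, products[k]),
                reverse=True)
print(ranked[0])  # prints "sandals", the nearest product
```

The real index approximates this ranking at scale, using `numCandidates` to bound how many neighbors it considers before returning the top matches.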
When you finish experimenting, clean up the Confluent Cloud infrastructure created for this tutorial by deleting the confluent-rag_environment environment. Run the following command in your terminal to get the environment ID of the form env-123456 corresponding to the environment named confluent-rag_environment:
confluent environment list
Delete the environment:
confluent environment delete <ENVIRONMENT_ID>