You can insert directly into ksqlDB streams or tables using the CLI:
INSERT INTO orders(product, quantity, price) VALUES ('widget', 3, 19.99);
You can also insert with the Confluent web UI:
Finally, you can use a programming language client, such as Java:
KsqlObject row = new KqslObject() .put("product", "widget") .put("quantity", 3); .put("price", 19.99); //insert data into stream client.insertInto("orders", row).get();
If you have data in an existing Apache Kafka topic, you can create a stream or a table backed by that topic and begin streaming the data into ksqlDB:
CREATE STREAM people WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='AVRO');
Any subsequent data produced to the topic will be streamed into ksqlDB, and any data inserted into the new stream will be written to the Kafka topic automagically.
If you don't have an existing topic, you can create a new stream or table, and ksqlDB will create the backing topic for you.
CREATE TABLE departments( id INT PRIMARY KEY, name VARCHAR) WITH (KAFKA_TOPIC='dept_topic', PARTITIONS=3, VALUE_FORMAT='AVRO');
Again, the two-way data flow is automatic.
ksqlDB can ingest data from systems for processing, and can push data down to other systems. It does this using Kafka Connect, which can be run embedded or separately. Since ksqlDB uses Kafka Connect, users have access to the hundreds of integrations available on Confluent Hub. Some of these include:
Here’s an example of ingesting data from MongoDB into an Apache Kafka topic, which can be used to build a table or stream in ksqlDB:
CREATE SOURCE CONNECTOR SOURCE_MONGODB_UNIFI_01 WITH ( 'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector', 'mongodb.hosts' = 'rs0/mongodb:27017', 'mongodb.name' = 'unifi', 'collection.whitelist' = 'ace.device, ace.user' );
Similarly, ksqlDB can create connectors to stream data out to other systems. Here’s an example of pushing the
ORDERS_ENRICHED stream, which we created above, to Elasticsearch:
CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH ( 'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', 'topics' = 'ORDERS_ENRICHED', 'Connection.url' = 'http://elasticsearch:9200', 'type.name' = '_doc' );
Confluent Cloud makes it easy to get data into and out of ksqlDB, with fully managed connectors for most popular databases, analytics products, and SaaS applications.
We will only share developer content and updates, including notifications when new content is added. We will never send you sales emails. 🙂 By subscribing, you understand we will process your personal information in accordance with our Privacy Statement.