You can insert directly into ksqlDB streams or tables using the CLI:
INSERT INTO orders(product, quantity, price) VALUES ('widget', 3, 19.99);
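This assumes the orders stream already exists. As a minimal sketch, a definition the INSERT above could run against might look like this (the column types, topic name, and serialization format here are assumptions, not part of the original example):
CREATE STREAM orders (product VARCHAR, quantity INT, price DOUBLE)
  WITH (KAFKA_TOPIC='orders', PARTITIONS=1, VALUE_FORMAT='JSON');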
You can also insert data using the Confluent web UI.
Finally, you can use a programming language client, such as Java:
// Connect to the ksqlDB server (host and port here are assumptions)
Client client = Client.create(
    ClientOptions.create().setHost("localhost").setPort(8088));
KsqlObject row = new KsqlObject()
    .put("product", "widget")
    .put("quantity", 3)
    .put("price", 19.99);
// Insert the row into the orders stream
client.insertInto("orders", row).get();
If you have data in an existing Apache Kafka topic, you can create a stream or a table backed by that topic and begin streaming the data into ksqlDB:
CREATE STREAM people
WITH (KAFKA_TOPIC='topic1',
VALUE_FORMAT='AVRO');
Any subsequent data produced to the topic will be streamed into ksqlDB, and any data inserted into the new stream will be written to the Kafka topic automatically.
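For example, once the stream exists, an INSERT writes straight through to the underlying topic. A quick sketch, assuming the topic's registered Avro schema includes id and name fields (the columns and values here are assumptions):
-- Writes a record to topic1 via the people stream
INSERT INTO people (id, name) VALUES (1, 'Alice');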
If you don't have an existing topic, you can create a new stream or table, and ksqlDB will create the backing topic for you.
CREATE TABLE departments(
id INT PRIMARY KEY,
name VARCHAR)
WITH (KAFKA_TOPIC='dept_topic',
PARTITIONS=3,
VALUE_FORMAT='AVRO');
Again, the two-way data flow is automatic.
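For instance, a row inserted into the table is also produced to the newly created topic (the sample values are assumptions):
-- Upserting into the table produces a record to dept_topic
INSERT INTO departments (id, name) VALUES (1, 'Engineering');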
ksqlDB can ingest data from external systems for processing, and can push data out to other systems. It does this using Kafka Connect, which can run embedded within ksqlDB or as a separate cluster. Because ksqlDB uses Kafka Connect, users have access to the hundreds of integrations available on Confluent Hub.
Here’s an example of ingesting data from MongoDB into an Apache Kafka topic, which can be used to build a table or stream in ksqlDB:
CREATE SOURCE CONNECTOR SOURCE_MONGODB_UNIFI_01 WITH (
'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector',
'mongodb.hosts' = 'rs0/mongodb:27017',
'mongodb.name' = 'unifi',
'collection.whitelist' = 'ace.device, ace.user'
);
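Once the connector is running, you can build a stream over one of the topics it populates. A sketch, assuming Debezium's typical topic naming of <mongodb.name>.<database>.<collection> (the stream name and topic name here are illustrative):
-- Stream over the change events for the ace.device collection
CREATE STREAM devices WITH (KAFKA_TOPIC='unifi.ace.device', VALUE_FORMAT='AVRO');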
Similarly, ksqlDB can create connectors to stream data out to other systems. Here’s an example of pushing an ORDERS_ENRICHED stream to Elasticsearch:
CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'topics' = 'ORDERS_ENRICHED',
'connection.url' = 'http://elasticsearch:9200',
'type.name' = '_doc'
);
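After creating either connector, you can check on it from ksqlDB itself:
-- List connectors and inspect one in detail
SHOW CONNECTORS;
DESCRIBE CONNECTOR SINK_ELASTIC_ORDERS_01;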
Confluent Cloud makes it easy to get data into and out of ksqlDB, with fully managed connectors for most popular databases, analytics products, and SaaS applications.
Hi, I'm Alison Walther with Confluent. In this ksqlDB lesson, we'll talk about creating, importing, and exporting data. We can insert data directly into ksqlDB streams or tables using the UI, the CLI, or the Java client. Or, if we already have data in an existing Kafka topic, we can create a stream or table on that topic and begin streaming that data into ksqlDB. Any data produced to that topic will be streamed into ksqlDB, and any data inserted into our new stream will be written to that Kafka topic automatically. But we don't have to have an existing topic. We can create a new stream or table, and ksqlDB will create the backing topic for us. Again, the two-way data flow is automatic. We can also read data from and write data to a myriad of other systems, taking advantage of ksqlDB's integration with Kafka Connect. Confluent Cloud makes it easy to get data into and out of ksqlDB with fully managed connectors for many popular databases, analytics engines, and SaaS applications. For streaming data into ksqlDB, you can create a source connector. With a source connector, you can snapshot a database into ksqlDB and stream any subsequent changes using change data capture. You can also hook up to dozens of other types of systems, including message queues, flat files, and network endpoints. Using the CREATE SOURCE CONNECTOR syntax and a smattering of configuration, you can do exactly that: create a source connector. At the other end of the pipe, ksqlDB can use Kafka Connect to stream data out to anywhere you want to write it, including NoSQL stores, blob stores, relational databases, and more. And that's the end of this lesson.