course: ksqlDB 101

Creating, Exporting, and Importing Data

2 min
Allison Walther

Integration Architect (Presenter)

Robin Moffatt

Principal Developer Advocate (Author)

Creating and Importing Data

You can insert directly into ksqlDB streams or tables using the CLI:

 INSERT INTO orders(product, quantity, price) VALUES ('widget', 3, 19.99);

You can also insert with the Confluent web UI:


Finally, you can use a programming language client, such as Java:

 KsqlObject row = new KsqlObject()
    .put("product", "widget")
    .put("quantity", 3)
    .put("price", 19.99);

 // insert the row into the orders stream
 client.insertInto("orders", row).get();

Streams/Tables Backed by an Existing Kafka Topic

If you have data in an existing Apache Kafka topic, you can create a stream or a table backed by that topic and begin streaming the data into ksqlDB:

 CREATE STREAM people
    WITH (KAFKA_TOPIC='topic1',
    VALUE_FORMAT='AVRO');

Any subsequent data produced to the topic will be streamed into ksqlDB, and any data inserted into the new stream will be written to the Kafka topic automagically.
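Once the stream is registered, a push query will emit every event as it is produced to the topic. For example:

```sql
-- Continuously emit each new event from the people stream as it arrives
SELECT * FROM people EMIT CHANGES;
```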

Streams/Tables with a New Kafka Topic

If you don't have an existing topic, you can create a new stream or table, and ksqlDB will create the backing topic for you.

 CREATE TABLE departments(
    id INT PRIMARY KEY,
    name VARCHAR)
 WITH (KAFKA_TOPIC='dept_topic',
    PARTITIONS=3,
    VALUE_FORMAT='AVRO');

Again, the two-way data flow is automatic.
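For example, a row inserted into the new table is written straight to the dept_topic topic (the values here are illustrative):

```sql
-- The PRIMARY KEY column (id) becomes the Kafka message key
INSERT INTO departments(id, name) VALUES (1, 'Engineering');
```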

Importing and Exporting Data from Other Systems Using ksqlDB

ksqlDB can ingest data from systems for processing, and can push data down to other systems. It does this using Kafka Connect, which can be run embedded or separately. Since ksqlDB uses Kafka Connect, users have access to the hundreds of integrations available on Confluent Hub. Some of these include:

  • Databases
  • NoSQL stores
  • Message queues
  • Object stores

Here’s an example of ingesting data from MongoDB into an Apache Kafka topic, which can be used to build a table or stream in ksqlDB:

CREATE SOURCE CONNECTOR SOURCE_MONGODB_UNIFI_01 WITH (
    'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector',
    'mongodb.hosts' = 'rs0/mongodb:27017',
    'mongodb.name' = 'unifi',
    'collection.whitelist' = 'ace.device, ace.user'
);

With a source connector, you can snapshot an existing database into Kafka and then stream every subsequent change using change data capture (CDC).
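To work with the CDC data in ksqlDB, you can declare a stream over one of the topics the connector writes to. Debezium names topics as `<mongodb.name>.<database>.<collection>`, so the ace.device collection above would land in a topic named unifi.ace.device (assuming the connector is configured to produce Avro):

```sql
-- Stream backed by the CDC topic for the ace.device collection
CREATE STREAM unifi_devices
    WITH (KAFKA_TOPIC='unifi.ace.device',
    VALUE_FORMAT='AVRO');
```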

Similarly, ksqlDB can create connectors to stream data out to other systems. Here’s an example of pushing the ORDERS_ENRICHED stream, which we created above, to Elasticsearch:

CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'topics' = 'ORDERS_ENRICHED',
    'connection.url' = 'http://elasticsearch:9200',
    'type.name' = '_doc'
);
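You can check the state of both connectors from within ksqlDB itself:

```sql
-- List all connectors and their state
SHOW CONNECTORS;

-- Inspect one connector, including its tasks and any errors
DESCRIBE CONNECTOR SINK_ELASTIC_ORDERS_01;
```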

Importing and Exporting Data from Other Systems Using Confluent Cloud

Confluent Cloud makes it easy to get data into and out of ksqlDB, with fully managed connectors for most popular databases, analytics products, and SaaS applications.


Use the promo code KSQLDB101 to get $25 of free Confluent Cloud usage
