course: ksqlDB 101

Creating, Exporting, and Importing Data


Allison Walther

Integration Architect (Presenter)

Creating and Importing Data

You can insert directly into ksqlDB streams or tables using the CLI:

 INSERT INTO orders(product, quantity, price) VALUES ('widget', 3, 19.99);
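
This assumes an orders stream already exists. As a minimal sketch, it could have been declared like so (the column types and topic settings here are illustrative assumptions, not part of the lesson):

 CREATE STREAM orders (
    product VARCHAR,
    quantity INT,
    price DOUBLE)
 WITH (KAFKA_TOPIC='orders',
    PARTITIONS=1,
    VALUE_FORMAT='JSON');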

You can also insert with the Confluent web UI:

[Image: inserting a row into a stream via the Confluent web UI]

Finally, you can use a programming language client, such as Java:

 KsqlObject row = new KsqlObject()
    .put("product", "widget")
    .put("quantity", 3)
    .put("price", 19.99);

 // insert the row into the orders stream
 client.insertInto("orders", row).get();

Streams/Tables Backed by an Existing Kafka Topic

If you have data in an existing Apache Kafka topic, you can create a stream or a table backed by that topic and begin streaming the data into ksqlDB:

 CREATE STREAM people
    WITH (KAFKA_TOPIC='topic1',
    VALUE_FORMAT='AVRO');

Any subsequent data produced to the topic will be streamed into ksqlDB, and any data inserted into the new stream will be written to the Kafka topic automatically.
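
For example, with the people stream above, a push query emits each new event as it arrives on the topic (a quick sketch; EMIT CHANGES is standard ksqlDB push-query syntax):

 -- stream every new record produced to topic1 as it arrives
 SELECT * FROM people EMIT CHANGES;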

Streams/Tables with a New Kafka Topic

If you don't have an existing topic, you can create a new stream or table, and ksqlDB will create the backing topic for you.

 CREATE TABLE departments(
    id INT PRIMARY KEY,
    name VARCHAR)
 WITH (KAFKA_TOPIC='dept_topic',
    PARTITIONS=3,
    VALUE_FORMAT='AVRO');

Again, the two-way data flow is automatic.
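
For instance, a row inserted into the table lands in the newly created dept_topic topic (the values here are purely illustrative):

 INSERT INTO departments (id, name) VALUES (1, 'Engineering');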

Importing and Exporting Data from Other Systems Using ksqlDB

ksqlDB can ingest data from external systems for processing, and can push data down to other systems. It does this using Kafka Connect, which can be run embedded or separately. Since ksqlDB uses Kafka Connect, users have access to the hundreds of integrations available on Confluent Hub. Some of these include:

  • Databases
  • NoSQL stores
  • Message queues
  • Object stores

Here’s an example of ingesting data from MongoDB into an Apache Kafka topic, which can be used to build a table or stream in ksqlDB:

CREATE SOURCE CONNECTOR SOURCE_MONGODB_UNIFI_01 WITH (
    'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector',
    'mongodb.hosts' = 'rs0/mongodb:27017',
    'mongodb.name' = 'unifi',
    'collection.whitelist' = 'ace.device, ace.user'
);

With a source connector, you can snapshot a database into ksqlDB and then stream any subsequent changes using change data capture (CDC).
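
The connector above writes change events to Kafka topics named after the logical server name and collection (with Debezium's defaults, something like unifi.ace.device). As a sketch, assuming the values are Avro-serialized and registered in Schema Registry, you could declare a stream over one of those topics:

CREATE STREAM unifi_devices
    WITH (KAFKA_TOPIC='unifi.ace.device',
    VALUE_FORMAT='AVRO');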

Similarly, ksqlDB can create connectors to stream data out to other systems. Here’s an example of pushing an ORDERS_ENRICHED stream to Elasticsearch:

CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'topics' = 'ORDERS_ENRICHED',
    'connection.url' = 'http://elasticsearch:9200',
    'type.name' = '_doc'
);
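
Connectors created through ksqlDB can be listed and inspected from the same SQL interface, which is handy for checking that data is actually flowing:

-- list all connectors and their state
SHOW CONNECTORS;

-- show a connector's configuration and task status
DESCRIBE CONNECTOR SINK_ELASTIC_ORDERS_01;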

Importing and Exporting Data from Other Systems Using Confluent Cloud

Confluent Cloud makes it easy to get data into and out of ksqlDB, with fully managed connectors for most popular databases, analytics products, and SaaS applications.

[Image: data flowing between external systems and ksqlDB via Confluent Cloud managed connectors]



Creating, Exporting, and Importing Data

Hi, I'm Allison Walther with Confluent. In this ksqlDB lesson, we'll talk about creating, importing, and exporting data. We can insert data directly into ksqlDB streams or tables using the UI, the CLI, or the Java client. Or, if we already have data in an existing Kafka topic, we can create a stream or table on that topic and begin streaming that data into ksqlDB. Any data produced to that topic will be streamed into ksqlDB, and any data inserted into our new stream will be written to that Kafka topic automatically. But we don't have to have an existing topic. We can create a new stream or table, and ksqlDB will create the backing topic for us. Again, the two-way data flow is automatic. We can also read data from and write data to a myriad of other systems, taking advantage of ksqlDB's integration with Kafka Connect. Confluent Cloud makes it easy to get data into and out of ksqlDB with fully managed connectors for many popular databases, analytics engines, and SaaS applications. For streaming data into ksqlDB, you can create a source connector. With a source connector, you can snapshot a database into ksqlDB and stream any subsequent changes using change data capture. You can also hook up to dozens of other types of systems, including message queues, flat files, and network endpoints. Using the CREATE SOURCE CONNECTOR syntax and a smattering of configuration, you can do exactly that: create a source connector. At the other end of the pipe, ksqlDB can use Kafka Connect to stream data out to anywhere you want to write it, including NoSQL stores, blob stores, relational databases, and more. And that's the end of this lesson.