Course: ksqlDB 101

Creating, Exporting, and Importing Data

2 min
Allison Walther, Integration Architect (Course Presenter)
Robin Moffatt, Staff Developer Advocate (Course Author)

Creating and Importing/Exporting Data

You can insert rows directly into ksqlDB streams and tables. From the CLI:

   INSERT INTO people VALUES('Mary', 'Smith');

You can also insert rows from the Confluent Cloud web UI, optionally listing the target columns explicitly:

   INSERT INTO orders(product, quantity, price)
	 VALUES('widget', 3, 19.99);

Finally, you can use a programming language client, such as Java:

   // Build the row to insert
   KsqlObject row = new KsqlObject()
       .put("PERSON", "robin")
       .put("LOCATION", "Manchester");

   // Insert the row into the MOVEMENTS stream; get() blocks until it is acknowledged
   client.insertInto("MOVEMENTS", row).get();

Streams/Tables Backed by an Existing Kafka Topic

If you have data in an existing Apache Kafka topic, you can create a stream or a table backed by that topic and begin streaming the data into ksqlDB:

   CREATE STREAM people
     WITH (KAFKA_TOPIC='topic1',
     VALUE_FORMAT='AVRO');

Any subsequent data produced to the topic will be streamed into ksqlDB, and any data inserted into the stream will be written back to the Kafka topic automatically.
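To see that two-way flow on the stream created above, you can run a push query alongside an insert. This is a sketch; it assumes the Avro schema registered for topic1 gives the people stream two columns matching the earlier INSERT example:

    -- Rows produced to topic1 show up here continuously (push query)
    SELECT * FROM people EMIT CHANGES;

    -- Rows inserted into the stream are written back to topic1
    INSERT INTO people VALUES('Mary', 'Smith');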

Streams/Tables with a New Kafka Topic

If you don't have an existing topic, you can create a new stream or table, and ksqlDB will create the backing topic for you.

    CREATE TABLE departments(
    	id INT PRIMARY KEY,
    	name VARCHAR)
    WITH (KAFKA_TOPIC='dept_topic',
    	PARTITIONS=3,
    	VALUE_FORMAT='AVRO');

Again, the two-way data flow is automatic.
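As a quick sketch of that round trip: inserting into the new table writes to the auto-created dept_topic topic, and a push query surfaces each change as it lands:

    -- Insert a row; ksqlDB writes it to the backing dept_topic topic
    INSERT INTO departments(id, name) VALUES(1, 'Engineering');

    -- Watch the table's changelog as rows arrive (push query)
    SELECT id, name FROM departments EMIT CHANGES;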

Importing and Exporting Data from Other Systems Using ksqlDB

ksqlDB can ingest data from other systems for processing, and can push data out to downstream systems. It does this using Kafka Connect, which can run embedded within ksqlDB or as a separate cluster. Because ksqlDB uses Kafka Connect, you have access to the hundreds of integrations available on Confluent Hub, including:

  • Databases
  • NoSQL stores
  • Message queues
  • Object stores

Here’s an example of ingesting data from MongoDB into an Apache Kafka topic, which can be used to build a table or stream in ksqlDB:

CREATE SOURCE CONNECTOR SOURCE_MONGODB_UNIFI_01 WITH (
    'connector.class' = 'io.debezium.connector.mongodb.MongoDbConnector',
    'mongodb.hosts' = 'rs0/mongodb:27017',
    'mongodb.name' = 'unifi',
    'collection.whitelist' = 'ace.device, ace.user'
);

With a source connector, ksqlDB first snapshots the existing contents of the database and then streams any subsequent changes using change data capture (CDC).
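Once the connector above is running, you can declare a stream over one of the topics it produces. A sketch, assuming Debezium's default topic naming of <mongodb.name>.<database>.<collection> (so unifi.ace.device here) and Avro converters on the connector:

    CREATE STREAM devices
      WITH (KAFKA_TOPIC='unifi.ace.device',
            VALUE_FORMAT='AVRO');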

Similarly, ksqlDB can create connectors to stream data out to other systems. Here’s an example of pushing a stream named ORDERS_ENRICHED (built earlier in the course) to Elasticsearch:

CREATE SINK CONNECTOR SINK_ELASTIC_ORDERS_01 WITH (
    'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'topics' = 'ORDERS_ENRICHED',
    'connection.url' = 'http://elasticsearch:9200',
    'type.name' = '_doc'
);

Importing and Exporting Data from Other Systems Using Confluent Cloud

Confluent Cloud makes it easy to get data into and out of ksqlDB, with fully managed connectors for most popular databases, analytics products, and SaaS applications.

[Figure: systems integrating with Confluent Cloud]
