Integrate legacy messaging systems with Kafka

Edit this page
Traditional message queues (MQs) such as RabbitMQ, TIBCO EMS, IBM MQ, and ActiveMQ have been widely used for decades to handle message distribution and inter-service communication across distributed applications. However, they can no longer keep up with the needs of modern applications across hybrid and multicloud environments for high volume throughput, asynchronicity, and heterogeneous datasets. They are riddled with many challenges – they lack persistence and were designed to support point-to-point message delivery. This recipe shows how to perform real-time stream processing in-flight by leveraging a connector to read critical data from legacy messaging systems into Kafka.

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in realtime, and you can also import and export data straight from ksqlDB from popular data sources and end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source or using ksqlDB’s INSERT INTO functionality to mock the data.

The concept in this tutorial can be applicable to any of the traditional message systems (RabbitMQ, Tibco, IBM MQ, ActiveMQ, etc.), and this specific tutorial uses the RabbitMQ Source Connector for Confluent Cloud which uses the AMQP protocol to communicate with RabbitMQ servers and persists the data in a Kafka topic. Then you can process the data in a variety of ways.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

SET 'auto.offset.reset' = 'earliest';

-- Register the initial stream
CREATE STREAM rabbit (userid VARCHAR,
                      timestamp BIGINT,
                      transaction VARCHAR,
                      amount VARCHAR
) WITH (
  KAFKA_TOPIC = 'from-rabbit',
  VALUE_FORMAT = 'json',
  PARTITIONS = 6
);

-- Convert the stream to typed fields
CREATE STREAM rabbit_transactions
  WITH (KAFKA_TOPIC = 'rabbit_transactions') AS
  SELECT TRANSACTION AS TX_TYPE,
         TIMESTAMP AS TX_TIMESTAMP,
         SUBSTRING(AMOUNT,1,1) AS CURRENCY,
         CAST(SUBSTRING(AMOUNT,2,LEN(AMOUNT)-1) AS DECIMAL(9,2)) AS TX_AMOUNT,
         CAST(USERID AS INT) AS USERID
  FROM rabbit
  WHERE TIMESTAMP IS NOT NULL
  EMIT CHANGES;

-- Count the number of transactions
CREATE TABLE number_transactions
  WITH (KAFKA_TOPIC = 'number_transactions') AS
  SELECT USERID,
         COUNT(USERID)
  FROM rabbit_transactions
  GROUP BY USERID
  EMIT CHANGES;

-- Add the INSERT INTO commands to insert mock data

INSERT INTO rabbit (userid, timestamp, transaction, amount) VALUES ('33', UNIX_TIMESTAMP(), 'CREDIT', '$141.73');
INSERT INTO rabbit (userid, timestamp, transaction, amount) VALUES ('89', UNIX_TIMESTAMP(), 'CREDIT', '$99.28');
INSERT INTO rabbit (userid, timestamp, transaction, amount) VALUES ('61', UNIX_TIMESTAMP(), 'CREDIT', '$31.29');
INSERT INTO rabbit (userid, timestamp, transaction, amount) VALUES ('33', UNIX_TIMESTAMP(), 'DEBIT', '$121.50');
INSERT INTO rabbit (userid, timestamp, transaction, amount) VALUES ('61', UNIX_TIMESTAMP(), 'CREDIT', '$105.00');
INSERT INTO rabbit (userid, timestamp, transaction, amount) VALUES ('89', UNIX_TIMESTAMP(), 'CREDIT', '$55.13');
INSERT INTO rabbit (userid, timestamp, transaction, amount) VALUES ('33', UNIX_TIMESTAMP(), 'DEBIT', '$67.14');

To validate that this recipe is working, run the following query:

SELECT * FROM rabbit_transactions EMIT CHANGES LIMIT 7;

Your output should resemble:

+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|TX_TYPE                  |TX_TIMESTAMP             |CURRENCY                 |TX_AMOUNT                |USERID                   |
+-------------------------+-------------------------+-------------------------+-------------------------+-------------------------+
|DEBIT                    |1649181180947            |$                        |121.50                   |33                       |
|CREDIT                   |1649181180567            |$                        |141.73                   |33                       |
|CREDIT                   |1649181181035            |$                        |105.00                   |61                       |
|CREDIT                   |1649181181130            |$                        |55.13                    |89                       |
|CREDIT                   |1649181180712            |$                        |99.28                    |89                       |
|CREDIT                   |1649181180814            |$                        |31.29                    |61                       |
|DEBIT                    |1649181181216            |$                        |67.14                    |33                       |
Limit Reached
Query terminated

Test with real data

3

If you have real end systems to connect to, adapt the sample connector configuration(s) below and run them from ksqlDB with CREATE SOURCE CONNECTOR.

The concept in this tutorial can be applicable to any of the traditional message systems (RabbitMQ, Tibco, IBM MQ, ActiveMQ, etc.), and this specific tutorial uses the RabbitMQ Source Connector for Confluent Cloud which uses the AMQP protocol to communicate with RabbitMQ servers and persists the data in a Kafka topic.

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_rabbitmq_transactions WITH (
  'connector.class'          = 'RabbitMQSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'from-rabbit',
  'rabbitmq.host'            = '<host>',
  'rabbitmq.username'        = '<username>',
  'rabbitmq.password'        = '<password>',
  'rabbitmq.queue'           = 'transactions',
  'tasks.max'                = '1'
);

Cleanup

4

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;