Integrate Mainframe Data

Mainframes have historically underpinned mission-critical applications and remain the de facto platform for high-performance batch processing. However, as organizations modernize their data infrastructure, it becomes increasingly expensive and complex to access mainframe data and integrate it with more modern distributed applications, microservices, and data platforms. Many organizations have therefore turned to Kafka as a modern datastore kept in real-time sync with the mainframe. This recipe showcases how to build a real-time cache in ksqlDB that can be used to offload mainframe calls to Kafka.

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB provides a SQL-based language for extracting, transforming, and loading events within your Kafka cluster.
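Once the editor is open, you can confirm that the ksqlDB application is responding before running the recipe. As a quick sketch, these metadata statements list the topics and streams currently visible to the application:

SHOW TOPICS;
SHOW STREAMS;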

2. Execute ksqlDB code

ksqlDB processes data in real time, and you can also import and export data directly from ksqlDB to and from popular data sources and end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

This code creates a real-time cache in ksqlDB of mainframe account data, which can be used to offload mainframe calls to Kafka. The sample source connector below is the IBM MQ Source Connector, since the IBM MQ messaging platform often runs on the mainframe.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
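For example, a minimal sketch of the same stream definition (with the schema used in the full recipe below) when the mq_transactions topic already exists would be:

CREATE STREAM mq_transactions (
  dep_account_no VARCHAR,
  dep_balance_dollars BIGINT,
  dep_balance_cents BIGINT,
  timestamp BIGINT
) WITH (
  KAFKA_TOPIC = 'mq_transactions',
  VALUE_FORMAT = 'JSON'
);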

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_ibmmq_transactions WITH (
  'connector.class'          = 'io.confluent.connect.ibm.mq.IbmMQSourceConnector',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'kafka.topic'              = 'mq_transactions',
  'output.data.format'       = 'JSON',
  'jms.destination.name'     = '<destination-name>',
  'mq.username'              = '<authorized-user>',
  'mq.password'              = '<user-password>',
  'mq.hostname'              = '<server-hostname>',
  'mq.queue.manager'         = '<queue-manager-name>',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- Create stream of transactions from the Kafka topic
CREATE STREAM mq_transactions (
  dep_account_no VARCHAR,
  dep_balance_dollars BIGINT,
  dep_balance_cents BIGINT,
  timestamp BIGINT
) WITH (
  KAFKA_TOPIC = 'mq_transactions',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);

-- Normalize the data and calculate timestamp deltas
CREATE STREAM mq_transactions_normalized WITH (KAFKA_TOPIC = 'mq_transactions_normalized')
  AS SELECT
    dep_account_no,
    CAST(dep_balance_dollars AS DECIMAL(10,2)) + CAST(dep_balance_cents AS DECIMAL(10,2)) / CAST(100 AS DECIMAL(10,2)) AS balance,
    timestamp AS ts_stream,
    UNIX_TIMESTAMP() AS ts_cache,
    (UNIX_TIMESTAMP() - timestamp) AS ts_delta
FROM mq_transactions
PARTITION BY dep_account_no
EMIT CHANGES;

CREATE SOURCE TABLE mq_cache (
    dep_account_no VARCHAR PRIMARY KEY,
    balance DECIMAL(10,2),
    ts_stream BIGINT,
    ts_cache BIGINT,
    ts_delta BIGINT
) WITH (
    KAFKA_TOPIC = 'mq_transactions_normalized',
    KEY_FORMAT = 'KAFKA',
    VALUE_FORMAT = 'JSON'
);
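With the cache in place, downstream applications can issue lightweight pull queries against ksqlDB instead of calling the mainframe directly. As a sketch, a lookup of the latest balance for a single account (the account number here is simply one of the mock values used later in this tutorial) would look like:

SELECT balance, ts_delta FROM mq_cache WHERE dep_account_no = 'N839327';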

3. Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO mq_transactions (dep_account_no, dep_balance_dollars, dep_balance_cents, timestamp) VALUES ('N839327', 1930, 99, UNIX_TIMESTAMP());
INSERT INTO mq_transactions (dep_account_no, dep_balance_dollars, dep_balance_cents, timestamp) VALUES ('S938807', 89025, 70, UNIX_TIMESTAMP());
INSERT INTO mq_transactions (dep_account_no, dep_balance_dollars, dep_balance_cents, timestamp) VALUES ('N839327', 5075, 05, UNIX_TIMESTAMP());
INSERT INTO mq_transactions (dep_account_no, dep_balance_dollars, dep_balance_cents, timestamp) VALUES ('S938807', 49700, 61, UNIX_TIMESTAMP());
INSERT INTO mq_transactions (dep_account_no, dep_balance_dollars, dep_balance_cents, timestamp) VALUES ('S938807', 7721, 82, UNIX_TIMESTAMP());

To validate that this recipe is working, run the following query:

SELECT * FROM mq_cache;

Your output should resemble:

+----------------------+----------------------+----------------------+----------------------+----------------------+
|DEP_ACCOUNT_NO        |BALANCE               |TS_STREAM             |TS_CACHE              |TS_DELTA              |
+----------------------+----------------------+----------------------+----------------------+----------------------+
|N839327               |5075.05               |1646762067263         |1646762067298         |35                    |
|S938807               |7721.82               |1646762067456         |1646762067489         |33                    |
Query terminated
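To watch the cache update as new transactions arrive, you can instead run a push query, which emits a new row each time an account balance changes:

SELECT * FROM mq_cache EMIT CHANGES;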

4. Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute your stream or table name, as appropriate). Including the DELETE TOPIC clause asynchronously deletes the topic backing the stream or table as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
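For this recipe specifically, a cleanup sketch using the names defined above might look like the following. Note that ksqlDB will not drop a stream or table while a persistent query is still reading from or writing to it, so you may first need to find and terminate the query that populates mq_transactions_normalized (its ID is listed by SHOW QUERIES;):

SHOW QUERIES;
TERMINATE <query_id>;

DROP TABLE IF EXISTS mq_cache;
DROP STREAM IF EXISTS mq_transactions_normalized DELETE TOPIC;
DROP STREAM IF EXISTS mq_transactions DELETE TOPIC;
DROP CONNECTOR IF EXISTS recipe_ibmmq_transactions;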