Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.
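If you prefer the CLI to the Cloud Console, a rough sketch with the confluent CLI looks like this (the names, cloud, and region are placeholders, and the ksqlDB creation flags vary by CLI version):
# Log in and create a basic Kafka cluster (name, cloud, and region are placeholders)
confluent login
confluent kafka cluster create recipe-cluster --cloud aws --region us-west-2

# Create a ksqlDB cluster attached to it (flags vary by CLI version)
confluent ksql cluster create recipe-ksqldb --credential-identity <service-account-id>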
Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. However, if you want to stream specific events that don’t have an available connector in Confluent Cloud, you can run your own connector and send the data to your Kafka cluster. This tutorial shows how you can run your own connector locally.
If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! After you create the ksqlDB application, insert mock data into the streams.
This tutorial uses a Cisco Adaptive Security Appliance (ASA) as the source, and the Splunk S2S Source connector should run on the same host as the Splunk Universal Forwarder (UF); the same logic can be applied to any type of device.
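For reference, a minimal sketch of a UF outputs.conf that forwards to the connector might look like the following, assuming the connector listens on port 9997 (hypothetical; it must match the splunk.s2s.port you configure below, and that port must be reachable from the UF, for example by publishing it in docker-compose):
# Hypothetical UF forwarding config; the group name, path, and port are assumptions
cat <<'EOF' | sudo tee /opt/splunkforwarder/etc/system/local/outputs.conf
[tcpout]
defaultGroup = kafka_connect_s2s

[tcpout:kafka_connect_s2s]
server = localhost:9997
EOF

# Restart the forwarder so the new output takes effect
sudo /opt/splunkforwarder/bin/splunk restart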
To stream ASA data into a Kafka topic called splunk, create the Dockerfile below to bundle a Kafka Connect worker with the kafka-connect-splunk-s2s connector:
FROM confluentinc/cp-server-connect-base:7.3.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-splunk-s2s:1.0.5
Build the custom Docker image with this command:
docker build \
-t localbuild/connect_distributed_with_splunk-s2s:1.0.5 \
-f Dockerfile .
Next, create a docker-compose.yml file with the following content, substituting your Confluent Cloud connection information:
---
version: '2'
services:

  connect:
    image: localbuild/connect_distributed_with_splunk-s2s:1.0.5
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: < BOOTSTRAP SERVER >
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: recipe-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: recipe-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: recipe-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
Run the container with the Connect worker:
docker compose up -d
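Before submitting any connectors, confirm that the worker's REST endpoint is up (it can take a minute or two for the worker to finish starting):
# An empty list [] just means the worker is up but no connectors exist yet
curl -s http://localhost:8083/connectors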
Create a configuration file, connector-splunk-s2s.config, for the Splunk S2S Source connector, specifying the port that the connector will use:
{
  "connector.class" : "io.confluent.connect.splunk.s2s.SplunkS2SSourceConnector",
  "name" : "recipe-splunk-s2s-source",
  "splunk.s2s.port" : "<port>",
  "kafka.topic" : "splunk",
  "tasks.max" : "1"
}
Submit that connector to the Connect worker:
curl -X POST -H "Content-Type: application/json" --data @connector-splunk-s2s.config http://localhost:8083/connectors
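To confirm the connector and its task are in the RUNNING state, query the Connect REST API:
# Check the status of the connector submitted above
curl -s http://localhost:8083/connectors/recipe-splunk-s2s-source/status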
You should now have ASA events being written to the splunk topic in Confluent Cloud.
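You can also spot-check the topic from the confluent CLI (this assumes an API key is already configured for the cluster, and the flag name may differ between CLI versions):
# Read a few records from the splunk topic to confirm events are arriving
confluent kafka topic consume splunk --from-beginning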
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
SET 'auto.offset.reset' = 'earliest';
CREATE STREAM splunk (
  event VARCHAR,
  time BIGINT,
  host VARCHAR,
  source VARCHAR,
  index VARCHAR,
  sourcetype VARCHAR
) WITH (
  KAFKA_TOPIC = 'splunk',
  VALUE_FORMAT = 'json',
  PARTITIONS = 6
);
-- Split each raw event into an array of tokens
CREATE STREAM splunk_parsed WITH (KAFKA_TOPIC = 'splunk_parsed') AS SELECT
  time,
  host,
  sourcetype,
  REGEXP_SPLIT_TO_ARRAY(event, ' ') AS event_tokens
FROM splunk
WHERE sourcetype = 'cisco:asa'
EMIT CHANGES;
-- Extract fields from the tokens and keep only events where the action is Deny
CREATE STREAM splunk_filtered WITH (KAFKA_TOPIC = 'splunk_filtered') AS SELECT
  time,
  host,
  sourcetype,
  event_tokens[2] AS action,
  event_tokens[3] AS protocol,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(event_tokens[5], 'inside:')[2], '/')[1] AS inside_ip,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(event_tokens[5], 'inside:')[2], '/')[2] AS inside_port,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(event_tokens[7], 'outside:')[2], '/')[1] AS outside_ip,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(event_tokens[7], 'outside:')[2], '/')[2] AS outside_port
FROM splunk_parsed
WHERE event_tokens[2] = 'Deny'
EMIT CHANGES;
If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-6-302016: Teardown UDP connection 841023 for outside:75.75.75.75/53 to inside:192.168.9.50/60995 duration 0:00:00 bytes 104', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-2-106001: Inbound TCP connection denied from 34.215.24.225/443 to 192.168.10.18/34312 flags FIN PSH ACK on interface outside', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-4-106023: Deny tcp src inside:192.168.9.50/52596 dst outside:204.107.141.110/8443 by access-group "inside_access_in" [0x0, 0x0]', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-4-106023: Deny tcp src inside:192.168.9.50/52597 dst outside:204.107.141.110/8443 by access-group "inside_access_in" [0x0, 0x0]', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-6-302013: Built outbound TCP connection 841025 for outside:34.215.24.225/443 (34.215.24.225/443) to inside:192.168.9.30/50398 (192.168.10.18/4550)', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
To validate that this recipe is working, run the following query:
SELECT * FROM splunk_filtered EMIT CHANGES LIMIT 2;
Your output should resemble:
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|TIME |HOST |SOURCETYPE |ACTION |PROTOCOL |INSIDE_IP |INSIDE_PORT |OUTSIDE_IP |OUTSIDE_PORT |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|1646857365018 |firewall-1 |cisco:asa |Deny |tcp |192.168.9.50 |52597 |204.107.141.11|8443 |
| | | | | | | |0 | |
|1646857364915 |firewall-1 |cisco:asa |Deny |tcp |192.168.9.50 |52596 |204.107.141.11|8443 |
| | | | | | | |0 | |
Limit Reached
Query terminated
After processing the data, send the targeted set of events to Splunk for indexing:
-- Send data to Splunk
CREATE SINK CONNECTOR IF NOT EXISTS recipe_splunk_cybersecurity WITH (
  'connector.class'  = 'SplunkSink',
  'input.data.format' = 'JSON',
  'kafka.api.key'    = '<my-kafka-api-key>',
  'kafka.api.secret' = '<my-kafka-api-secret>',
  'topics'           = 'splunk_filtered',
  'splunk.hec.uri'   = '<splunk-indexers>',
  'splunk.hec.token' = '<Splunk HTTP Event Collector token>',
  'tasks.max'        = '1'
);
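Before or after creating the sink, you can sanity-check that the HEC endpoint is reachable. A hedged example against Splunk's HEC health endpoint (the host is a placeholder, and -k skips TLS verification for self-signed certificates):
curl -k https://<splunk-indexer>:8088/services/collector/health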
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate).
By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
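The self-managed Splunk S2S Source connector runs outside Confluent Cloud, so remove it through the local Connect REST API and stop the worker:
# Delete the locally running source connector, then stop the Connect worker
curl -X DELETE http://localhost:8083/connectors/recipe-splunk-s2s-source
docker compose down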