Provision a Kafka cluster in Confluent Cloud.
Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial.
ksqlDB supports a SQL-based language for extracting, transforming, and loading events within your Kafka cluster.
Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. However, if you want to stream specific events that don’t have an available connector in Confluent Cloud, you can run your own connector and send the data to your Kafka cluster. This tutorial shows how you can run your own connector locally.
If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! After you create the ksqlDB application, insert mock data into the streams.
To stream syslog data into a Kafka topic called syslog, create the Dockerfile below to bundle a Kafka Connect worker with the kafka-connect-syslog connector:
FROM confluentinc/cp-server-connect-base:7.3.0
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-syslog:1.3.4
Build the custom Docker image with this command:
docker build \
-t localbuild/connect_distributed_with_syslog:1.3.4 \
-f Dockerfile .
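Optionally, before starting the worker you can sanity-check that the syslog plugin made it into the image. The command below is just a quick check against the default Confluent Hub install path used above:

docker run --rm --entrypoint ls \
  localbuild/connect_distributed_with_syslog:1.3.4 \
  /usr/share/confluent-hub-components

You should see a directory for the syslog connector (typically confluentinc-kafka-connect-syslog) in the output.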
Next, create a docker-compose.yml file with the following content, substituting your Confluent Cloud connection information:
---
version: '2'
services:
  connect:
    image: localbuild/connect_distributed_with_syslog:1.3.4
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: < BOOTSTRAP SERVER >
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: recipe-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: recipe-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: recipe-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN
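The < SASL JAAS CONFIG > placeholders typically take the following single-line form for a Confluent Cloud API key and secret (shown here generically; substitute your own credentials):

org.apache.kafka.common.security.plain.PlainLoginModule required username="<API KEY>" password="<API SECRET>";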
Run the container with this command:
docker compose up -d
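The worker can take a minute or two to start. To confirm that it is up and that the syslog plugin is available, query the Connect REST API:

curl -s http://localhost:8083/connector-plugins | grep -i syslog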
Create a Syslog Source connector configuration file called connector-syslog.config:
{
  "connector.class" : "io.confluent.connect.syslog.SyslogSourceConnector",
  "name" : "recipe-syslog-source",
  "syslog.port" : "<port>",
  "syslog.listener" : "TCP",
  "kafka.topic" : "syslog",
  "tasks.max" : "1"
}
Submit the connector configuration to the Connect worker. Because the file contains a flat configuration map (rather than the nested name/config payload expected by POST /connectors), submit it with PUT, which creates the connector if it does not already exist:
curl -X PUT -H "Content-Type: application/json" --data @connector-syslog.config http://localhost:8083/connectors/recipe-syslog-source/config
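You can verify that the connector started successfully by checking its status through the same REST API:

curl -s http://localhost:8083/connectors/recipe-syslog-source/status

A healthy connector reports a state of RUNNING for both the connector and its task.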
Now you should have Syslog messages being written to the syslog topic in Confluent Cloud.
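If you want to produce a quick test event before wiring up a real syslog source, the util-linux logger utility can send a message over TCP. This is only a sketch: it assumes the connector's syslog port is reachable from wherever you run the command (for example, by also publishing <port> in docker-compose.yml) and that your logger build supports these options:

logger --server localhost --port <port> --tcp "test message from logger"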
Process the syslog events by flagging events with invalid users, stripping out all of the other unnecessary fields, and creating a stream of just the relevant information. There are many ways to customize the resulting stream to fit your business needs; this example also demonstrates how to enrich the stream with a new field, FACILITY_DESCRIPTION, containing human-readable content.
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
SET 'auto.offset.reset' = 'earliest';
-- Extract relevant fields from log messages
CREATE STREAM syslog (
  ts BIGINT,
  host VARCHAR,
  facility INT,
  message VARCHAR,
  remote_address VARCHAR
) WITH (
  KAFKA_TOPIC = 'syslog',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6,
  TIMESTAMP = 'ts'
);
-- Create actionable stream of SSH attacks, filtering syslog messages where user is invalid,
-- and enriched with user and IP
CREATE STREAM ssh_attacks AS
  SELECT
    FORMAT_TIMESTAMP(FROM_UNIXTIME(ts), 'yyyy-MM-dd HH:mm:ss') AS syslog_timestamp,
    host,
    facility,
    CASE WHEN facility = 0 THEN 'kernel messages'
         WHEN facility = 1 THEN 'user-level messages'
         WHEN facility = 2 THEN 'mail system'
         WHEN facility = 3 THEN 'system daemons'
         WHEN facility = 4 THEN 'security/authorization messages'
         WHEN facility = 5 THEN 'messages generated internally by syslogd'
         WHEN facility = 6 THEN 'line printer subsystem'
         ELSE '<unknown>'
    END AS facility_description,
    SPLIT(REPLACE(message, 'Invalid user ', ''), ' from ')[1] AS attack_user,
    remote_address AS attack_ip
  FROM syslog
  WHERE message LIKE 'Invalid user%'
  EMIT CHANGES;
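The CREATE STREAM ... AS SELECT statement above starts a persistent query that continuously populates ssh_attacks. If you want to confirm that it is running, you can list the queries from the ksqlDB editor:

SHOW QUERIES;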
If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 0, 'Line protocol on Interface GigabitEthernet0/1, changed state to up', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user asmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user bsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user csmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user dsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user esmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user fsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);
To validate that this recipe is working, run the following query:
SELECT * FROM ssh_attacks;
Your output should resemble:
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|SYSLOG_TIMESTAMP |HOST |FACILITY |FACILITY_DESCRIPTION |ATTACK_USER |ATTACK_IP |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|2022-03-14 14:44:48 |asgard.example.com |4 |security/authorization mess|bsmith |192.168.10.83 |
| | | |ages | | |
|2022-03-14 14:44:48 |asgard.example.com |4 |security/authorization mess|asmith |192.168.10.83 |
| | | |ages | | |
|2022-03-14 14:44:48 |asgard.example.com |4 |security/authorization mess|dsmith |192.168.10.83 |
| | | |ages | | |
|2022-03-14 14:44:48 |asgard.example.com |4 |security/authorization mess|csmith |192.168.10.83 |
| | | |ages | | |
|2022-03-14 14:44:48 |asgard.example.com |4 |security/authorization mess|fsmith |192.168.10.83 |
| | | |ages | | |
|2022-03-14 14:44:48 |asgard.example.com |4 |security/authorization mess|esmith |192.168.10.83 |
| | | |ages | | |
Query terminated
To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substituting the stream or table name, as appropriate).
Including the DELETE TOPIC clause asynchronously deletes the topic backing the stream or table as well.
DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
If you also created connectors, remove those as well (substitute connector name).
DROP CONNECTOR IF EXISTS <connector_name>;
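For the self-managed syslog connector used in this tutorial, which runs on your own local Connect worker rather than as a fully managed connector, you can also remove it through the worker's REST API and then stop the container:

curl -X DELETE http://localhost:8083/connectors/recipe-syslog-source
docker compose down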