Detect and analyze SSH attacks

There are lots of ways SSH can be abused, but one of the most straightforward ways to detect suspicious activity is to monitor for rejected logins. This tutorial processes Syslog data to detect failed logins, streaming out the username and IP address behind each attempt. With ksqlDB, you can filter and react to unwanted events in real time to minimize damage, rather than performing historical analysis of Syslog data from cold storage.

To see this tutorial in action, launch it from the Confluent Cloud Console. The launcher pre-populates the ksqlDB code and provides mock data or stubbed-out code for connecting to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL syntax for extracting, transforming, and loading events within your Kafka cluster.
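
Once the editor loads, you can confirm the application is responsive with a quick built-in statement (a fresh application lists no user-created streams yet):

SHOW STREAMS;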

2. Read the data in

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and sinks in the cloud. However, if you want to stream specific events that don't have an available connector in Confluent Cloud, you can run your own connector and send the data to your Kafka cluster. This tutorial shows how to run your own connector locally.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! After you create the ksqlDB application, insert mock data into the streams (see step 4 below).

To stream syslog data into a Kafka topic called syslog, create the Dockerfile below to bundle a Kafka Connect worker with the kafka-connect-syslog connector:

FROM confluentinc/cp-server-connect-base:7.3.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-syslog:1.3.4

Build the custom Docker image with this command:

docker build \
   -t localbuild/connect_distributed_with_syslog:1.3.4 \
   -f Dockerfile .
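
If the build succeeds, the tagged image appears in your local image list:

docker image ls localbuild/connect_distributed_with_syslog:1.3.4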

Next, create a docker-compose.yml file with the following content, substituting your Confluent Cloud connection information:

---
version: '2'
services:

  connect:
    image: localbuild/connect_distributed_with_syslog:1.3.4
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: < BOOTSTRAP SERVER >
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: recipe-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: recipe-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: recipe-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN

Run the container:

docker compose up -d
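
Before submitting a connector, you can verify that the worker is up and that the syslog plugin was installed; both endpoints below are standard Kafka Connect REST APIs:

# The root endpoint returns worker version info once the worker has started
curl -s http://localhost:8083/

# The plugin list should include io.confluent.connect.syslog.SyslogSourceConnector
curl -s http://localhost:8083/connector-plugins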

Create a Syslog Source connector configuration file called connector-syslog.config, substituting the TCP port on which the connector should listen:

{
  "connector.class"          : "io.confluent.connect.syslog.SyslogSourceConnector",
  "name"                     : "recipe-syslog-source",
  "syslog.port"              : "<port>",
  "syslog.listener"          : "TCP",
  "kafka.topic"              : "syslog",
  "tasks.max"                : "1"
}

Submit that connector to the Connect worker:

curl -X POST -H "Content-Type: application/json" --data @connector-syslog.config http://localhost:8083/connectors
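
To confirm the connector started cleanly, query its status (a standard Connect REST endpoint); the connector and its task should both report RUNNING:

curl -s http://localhost:8083/connectors/recipe-syslog-source/status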

Now you should have Syslog messages being written to the syslog topic in Confluent Cloud.
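
If no device is forwarding syslog yet, you can generate a test event with the logger utility from util-linux; this sketch assumes the Connect worker is reachable from your machine and that <port> matches the value in connector-syslog.config:

# Send a single syslog message over TCP to the connector's listener
logger -n localhost -P <port> -T "Invalid user example from 192.168.10.83"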

3. Execute ksqlDB code

Process the syslog events by flagging events with invalid users, stripping out all the other unnecessary fields, and creating a stream of just the relevant information. There are many ways to customize the resulting stream to fit your business needs; this example also demonstrates how to enrich the stream with a new field, FACILITY_DESCRIPTION, containing human-readable content.

Note: when creating the initial STREAM or TABLE, if the backing Kafka topic already exists, the PARTITIONS property may be omitted.

SET 'auto.offset.reset' = 'earliest';

-- Extract relevant fields from log messages
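-- (ts carries the event time in epoch milliseconds; TIMESTAMP = 'ts' below makes it the record timestamp)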
CREATE STREAM syslog (
  ts BIGINT,
  host VARCHAR,
  facility INT,
  message VARCHAR,
  remote_address VARCHAR
) WITH (
  KAFKA_TOPIC = 'syslog',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6,
  TIMESTAMP='ts'
);

-- Create actionable stream of SSH attacks, filtering syslog messages where user is invalid,
-- and enriched with user and IP
CREATE STREAM ssh_attacks AS
  SELECT
    FORMAT_TIMESTAMP(FROM_UNIXTIME(ts), 'yyyy-MM-dd HH:mm:ss') AS syslog_timestamp,
    host,
    facility,
    CASE WHEN facility = 0 THEN 'kernel messages'
         WHEN facility = 1 THEN 'user-level messages'
         WHEN facility = 2 THEN 'mail system'
         WHEN facility = 3 THEN 'system daemons'
         WHEN facility = 4 THEN 'security/authorization messages'
         WHEN facility = 5 THEN 'messages generated internally by syslogd'
         WHEN facility = 6 THEN 'line printer subsystem'
         ELSE '<unknown>'
       END AS facility_description,
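    -- Real sshd messages look like 'Invalid user asmith from 192.168.10.83';
    -- take the text between 'Invalid user ' and ' from ' as the attempted username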
    SPLIT(REPLACE(message, 'Invalid user ', ''), ' from ')[1] AS attack_user,
    remote_address AS attack_ip
  FROM syslog
  WHERE message LIKE 'Invalid user%'
  EMIT CHANGES;
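
The stream can also feed further processing. As an optional extension beyond this recipe, here is a sketch of a hypothetical ssh_attacks_per_ip table that counts attempts per source address in five-minute tumbling windows, a common next step for alerting:

-- Optional: count attack attempts per source IP in 5-minute windows
CREATE TABLE ssh_attacks_per_ip AS
  SELECT
    attack_ip,
    COUNT(*) AS attempt_count
  FROM ssh_attacks
  WINDOW TUMBLING (SIZE 5 MINUTES)
  GROUP BY attack_ip
  EMIT CHANGES;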

4. Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 0, 'Line protocol on Interface GigabitEthernet0/1, changed state to up', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user asmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user bsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user csmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user dsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user esmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 4, 'Invalid user fsmith', '192.168.10.83');
INSERT INTO syslog (ts, host, facility, message, remote_address) VALUES (UNIX_TIMESTAMP(), 'asgard.example.com', 0, 'partition health measures for /var did not suffice - still using 96% of partition space', NULL);

To validate that this recipe is working, run the following query:

SELECT * FROM ssh_attacks EMIT CHANGES LIMIT 6;

Your output should resemble:

+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|SYSLOG_TIMESTAMP           |HOST                       |FACILITY                   |FACILITY_DESCRIPTION       |ATTACK_USER                |ATTACK_IP                  |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|2022-03-14 14:44:48        |asgard.example.com         |4                          |security/authorization mess|bsmith                     |192.168.10.83              |
|                           |                           |                           |ages                       |                           |                           |
|2022-03-14 14:44:48        |asgard.example.com         |4                          |security/authorization mess|asmith                     |192.168.10.83              |
|                           |                           |                           |ages                       |                           |                           |
|2022-03-14 14:44:48        |asgard.example.com         |4                          |security/authorization mess|dsmith                     |192.168.10.83              |
|                           |                           |                           |ages                       |                           |                           |
|2022-03-14 14:44:48        |asgard.example.com         |4                          |security/authorization mess|csmith                     |192.168.10.83              |
|                           |                           |                           |ages                       |                           |                           |
|2022-03-14 14:44:48        |asgard.example.com         |4                          |security/authorization mess|fsmith                     |192.168.10.83              |
|                           |                           |                           |ages                       |                           |                           |
|2022-03-14 14:44:48        |asgard.example.com         |4                          |security/authorization mess|esmith                     |192.168.10.83              |
|                           |                           |                           |ages                       |                           |                           |
Query terminated

5. Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substituting the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;
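
For the resources created in this tutorial, that amounts to the following (dropping the derived stream before the stream it reads from):

DROP STREAM IF EXISTS ssh_attacks DELETE TOPIC;
DROP STREAM IF EXISTS syslog DELETE TOPIC;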

If you also created connectors with ksqlDB's CREATE CONNECTOR, remove those as well (substituting the connector name):

DROP CONNECTOR IF EXISTS <connector_name>;
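
The connector in this tutorial was submitted to a self-managed Connect worker over its REST API rather than through ksqlDB, so remove it the same way; this sketch assumes the worker from step 2 is still running on localhost:8083:

curl -X DELETE http://localhost:8083/connectors/recipe-syslog-source

Finally, stop the local Connect worker:

docker compose down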