Detect a Slowloris DDoS attack

Distributed denial-of-service (DDoS) attacks are a type of cyberattack in which a targeted system is flooded with malicious network requests from multiple hosts and IP addresses. Their distributed nature, along with their increasing frequency, sophistication, and strength, makes them one of the most severe forms of cyberattack, and one that is becoming progressively more difficult to mitigate. This recipe shows a strategy for ingesting and processing network packet data to quickly detect a specific DDoS attack known as Slowloris.

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and you can also import data from popular data sources and export it to end systems in the cloud, directly from ksqlDB. This tutorial shows you how to run the recipe in one of two ways: using connector(s) to any supported data source or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

This application takes the raw network packet data and creates a structured stream of events that can be processed using SQL. Using windows and filters, the application detects a high number of connection RESET events from the server and isolates the potentially offending source.

This tutorial assumes that you have captured your network packet data and published it to a RabbitMQ queue in JSON format. An example packet may look like the following:

{
  "timestamp": "1590682723239",
  "layers": {
    "frame": {
      "time": "May 28, 2020 11:48:43.239564000 CST",
      "protocols": "eth:ethertype:ip:tcp"
    },
    "eth": {
      "src": "FF:AA:C9:83:C0:21",
      "dst": "DF:ED:E3:91:D4:13"
    },
    "ip": {
      "src": "192.168.33.11",
      "src_host": "192.168.33.11",
      "dst": "192.168.33.77",
      "dst_host": "192.168.33.77"
    },
    "tcp": {
      "srcport": "59202",
      "dstport": "443",
      "flags_ack": "1",
      "flags_reset": "0"
    }
  }
}

Note: For brevity, some fields have been removed and some names have been simplified from a typical packet capture event.

The connector will source the data into a Kafka topic for stream processing in ksqlDB.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

-- Example
CREATE SOURCE CONNECTOR IF NOT EXISTS network_traffic_source WITH (
  'connector.class'   = 'RabbitMQSource',
  'name'              = 'network_traffic_source',
  'kafka.api.key'     = '<my-kafka-api-key>',
  'kafka.api.secret'  = '<my-kafka-api-secret>',
  'kafka.topic'       = 'network-traffic',
  'rabbitmq.host'     = '<my-rabbitmq-host>',
  'rabbitmq.username' = '<my-rabbitmq-username>',
  'rabbitmq.password' = '<my-rabbitmq-password>',
  'rabbitmq.queue'    = '<my-rabbitmq-queue>',
  'tasks.max'         = '1'
);

SET 'auto.offset.reset' = 'earliest';

CREATE STREAM network_traffic (
  timestamp BIGINT,
  layers STRUCT<
   frame STRUCT<
      time VARCHAR,
      protocols VARCHAR >,
   eth STRUCT<
      src VARCHAR,
      dst VARCHAR >,
   ip STRUCT<
      src VARCHAR,
      src_host VARCHAR,
      dst VARCHAR,
      dst_host VARCHAR,
      proto VARCHAR >,
   tcp STRUCT<
      srcport VARCHAR,
      dstport VARCHAR,
      flags_ack VARCHAR,
      flags_reset VARCHAR >>
) WITH (
  KAFKA_TOPIC = 'network-traffic',
  TIMESTAMP = 'timestamp',
  VALUE_FORMAT = 'JSON',
  PARTITIONS = 6
);


CREATE TABLE potential_slowloris_attacks AS
SELECT
  layers->ip->src, count(*) as count_connection_reset
FROM network_traffic
  WINDOW TUMBLING (SIZE 60 SECONDS)
WHERE layers->tcp->flags_ack = '1' AND layers->tcp->flags_reset = '1'
GROUP BY
  layers->ip->src
HAVING
  count(*) > 10;

Test with mock data

3

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016000', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016100', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016200', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016205', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016300', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016184', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016400', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016500', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016600', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016800', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642016900', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017000', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017100', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017200', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017300', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017400', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017500', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017600', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.12', srch_host := '192.168.33.12', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '0')));
INSERT INTO network_traffic (TIMESTAMP, layers) VALUES (UNIX_TIMESTAMP(), STRUCT(frame := STRUCT(time := '1642017700', protocols := 'eth:ethertype:ip:tcp'), eth := STRUCT(src := 'FF:AA:C9:83:C0:21', dst := 'DF:ED:E3:91:D4:13'), ip := STRUCT(src := '192.168.33.11', srch_host := '192.168.33.11', dst := '192.168.33.77', dst_host := '192.168.33.77', proto := '1'), tcp := STRUCT(srcport := '59202', dstport := '443', flags_ack := '1',flags_reset := '1')));

To validate that this recipe is working, run the following query:

SELECT * FROM potential_slowloris_attacks EMIT CHANGES LIMIT 1;

Your output should resemble:

+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|SRC                                          |WINDOWSTART                                  |WINDOWEND                                    |COUNT_CONNECTION_RESET                       |
+---------------------------------------------+---------------------------------------------+---------------------------------------------+---------------------------------------------+
|192.168.33.11                                |1646315280000                                |1646315340000                                |11                                           |
Limit Reached
Query terminated

Explanation

4

This solution uses ksqlDB’s ability to model and query structured data.

Streaming JSON

Let’s break down the commands in this application and explain the individual parts.

The first step is to model the packet capture data using the ksqlDB CREATE STREAM command, giving our new stream the name network_traffic:

CREATE STREAM network_traffic

We then define the schema for events in the topic by declaring field names and data types using standard SQL syntax. In this snippet from the full statement:

  timestamp BIGINT,
  layers STRUCT<
   ...
   ip STRUCT<
      src VARCHAR,
      src_host VARCHAR,
      dst VARCHAR,
      dst_host VARCHAR,
      proto VARCHAR >,
   ...

We declare an event structure that contains a timestamp field, and then a child nested data structure named layers. Comparing the sample packet capture event with the declared structure, we see the relationships between the data and the field names and types:

{
  "timestamp": "1590682723239",
  "layers": {
    ...
    "ip": {
      "src": "192.168.33.11",
      "src_host": "192.168.33.11",
      "dst": "192.168.33.77",
      "dst_host": "192.168.33.77"
    },
    ...
  }
}
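To make the mapping concrete, here is a minimal Python sketch (outside ksqlDB, for illustration only) that parses a trimmed copy of the sample event and reads the same nested fields that the stream schema declares. The helper name `get_path` is hypothetical, and casting `timestamp` to `int` simply mirrors the BIGINT declaration.

```python
import json

# Trimmed copy of the sample packet capture event shown earlier.
SAMPLE_EVENT = """
{
  "timestamp": "1590682723239",
  "layers": {
    "ip": {
      "src": "192.168.33.11",
      "src_host": "192.168.33.11",
      "dst": "192.168.33.77",
      "dst_host": "192.168.33.77"
    },
    "tcp": {
      "srcport": "59202",
      "dstport": "443",
      "flags_ack": "1",
      "flags_reset": "0"
    }
  }
}
"""


def get_path(event, *keys):
    """Walk nested dicts and return None if any key is missing.

    Hypothetical helper, loosely analogous to layers->ip->src in the SQL.
    """
    current = event
    for key in keys:
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


event = json.loads(SAMPLE_EVENT)
event_time_ms = int(event["timestamp"])               # declared BIGINT in the stream
source_ip = get_path(event, "layers", "ip", "src")    # declared VARCHAR
protocol = get_path(event, "layers", "ip", "proto")   # not present in the sample

print(event_time_ms, source_ip, protocol)
```

The schema declares `proto` even though the trimmed sample omits it, so the sketch returns `None` for that path rather than raising an error.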

The CREATE STREAM … WITH … command marries the event schema with the Kafka topic. The WITH clause in the statement allows us to specify details about the stream.

WITH (
  KAFKA_TOPIC='network-traffic',
  TIMESTAMP='timestamp',
  VALUE_FORMAT='JSON',
  PARTITIONS=6
);

The KAFKA_TOPIC property names the topic that backs the stream, and PARTITIONS sets the partition count used when the topic does not already exist. The VALUE_FORMAT property indicates the data format of the events on the topic. Finally, we use the TIMESTAMP property to indicate an event field that can be used as the rowtime of the event. This allows us to perform time-based operations based on the actual event time as provided by the captured packet data.
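As an illustration of what the event-time value represents, the following Python sketch converts the sample event's `timestamp` to a UTC datetime. It assumes the field holds milliseconds since the Unix epoch, which is how a BIGINT timestamp column is interpreted; the function name `epoch_millis_to_utc` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def epoch_millis_to_utc(epoch_ms: int) -> datetime:
    """Convert epoch milliseconds to a timezone-aware UTC datetime.

    Integer arithmetic keeps the millisecond part exact.
    """
    seconds, millis = divmod(epoch_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(milliseconds=millis)


# The "timestamp" value from the sample packet capture event.
event_time = epoch_millis_to_utc(int("1590682723239"))
print(event_time.isoformat())
```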

Materialized view

Now that we have a useful stream of packet capture data, we’re ready to try to detect potential DDoS attacks from the events.

We’re going to use the ksqlDB CREATE TABLE command, which will create a new materialized view of the packet data.

Let’s tell ksqlDB to create the table with a name of potential_slowloris_attacks:

CREATE TABLE potential_slowloris_attacks AS

Next, we’ll define the values that we want to materialize into the table. We are capturing two values:

  • The source IP address, read from the layers->ip->src nested value in the JSON event

  • The count of rows that satisfy conditions defined later in the command (obtained using the count function)

SELECT
  layers->ip->src, count(*) as count_connection_reset

Next, we tell ksqlDB about the event source from which to build the table: the network_traffic stream we defined above.

FROM network_traffic

Because the stream of packet capture events is continuous, we need a way to aggregate them into a bucket that is both meaningful to our business case and useful enough that we can perform calculations with it. Here, we want to know if there are a large number of connection reset events within a given period of time. So let’s tell ksqlDB that we want to create a window of events based on time:

WINDOW TUMBLING (SIZE 60 SECONDS)

A tumbling window groups events into fixed-size, non-overlapping, gapless buckets of time, so every event belongs to exactly one window. Here, we’ve specified 60-second windows.
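The bucketing arithmetic behind a 60-second tumbling window can be sketched in a few lines of Python. This is an illustration rather than ksqlDB's implementation: it assumes window boundaries are aligned to the Unix epoch, and the function name `tumbling_window` and the sample timestamp are chosen for the example.

```python
WINDOW_SIZE_MS = 60_000  # 60 seconds, matching WINDOW TUMBLING (SIZE 60 SECONDS)


def tumbling_window(event_time_ms: int, size_ms: int = WINDOW_SIZE_MS) -> tuple:
    """Return (window_start, window_end) in epoch milliseconds.

    The start is inclusive and the end is exclusive, so consecutive
    windows neither overlap nor leave gaps.
    """
    start = event_time_ms - (event_time_ms % size_ms)
    return start, start + size_ms


# An illustrative event time in epoch milliseconds.
print(tumbling_window(1642017700000))
```

Because the end bound is exclusive, an event that arrives exactly on a boundary falls into the window that starts there, not the one that ends there.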

Now that we have our events aggregated into time buckets with the fields that interest us, how do we specify that a connection has been reset? We use the ksqlDB WHERE clause to extract the relevant events. In this case, we define a connection as reset if the tcp flags_ack and flags_reset fields are both set to '1'.

WHERE
  layers->tcp->flags_ack = '1' AND layers->tcp->flags_reset = '1'

We will define a potential Slowloris attack as multiple connection reset events coming from the same source IP address. In order to properly aggregate (via the count function above), we need to group the qualifying events by the source IP:

GROUP BY layers->ip->src

And finally, we want to keep only the groups with enough matching events within our window. In this example, we consider more than 10 events to signify a potential attack, but for real-world scenarios, you should adjust this threshold.

HAVING count(*) > 10;
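Putting the clauses together, the following Python sketch reproduces the query's logic over an in-memory list of events: filter on the TCP flags, bucket by 60-second window, group by source IP, and keep groups with more than 10 matches. It is a batch approximation for illustration only, since ksqlDB evaluates the query incrementally as events arrive. The names `detect_potential_attacks` and `make_event` and the generated test events are assumptions made for this example.

```python
from collections import Counter

WINDOW_SIZE_MS = 60_000   # WINDOW TUMBLING (SIZE 60 SECONDS)
RESET_THRESHOLD = 10      # HAVING count(*) > 10


def detect_potential_attacks(events, size_ms=WINDOW_SIZE_MS, threshold=RESET_THRESHOLD):
    """Count ACK+RST events per (source IP, window start) and apply the threshold."""
    counts = Counter()
    for event in events:
        tcp = event["layers"]["tcp"]
        # WHERE flags_ack = '1' AND flags_reset = '1'
        if tcp["flags_ack"] == "1" and tcp["flags_reset"] == "1":
            window_start = event["timestamp"] - event["timestamp"] % size_ms
            # GROUP BY source IP, within the window
            counts[(event["layers"]["ip"]["src"], window_start)] += 1
    # HAVING count(*) > threshold
    return {key: count for key, count in counts.items() if count > threshold}


def make_event(timestamp_ms, src, flags_reset):
    """Build a minimal event carrying only the fields the query reads."""
    return {
        "timestamp": timestamp_ms,
        "layers": {
            "ip": {"src": src},
            "tcp": {"flags_ack": "1", "flags_reset": flags_reset},
        },
    }


# 15 reset events from one host and 15 normal events from another,
# one second apart, all inside the same 60-second window.
base = 1642017660000
events = []
for i in range(15):
    events.append(make_event(base + i * 1000, "192.168.33.12", "0"))
    events.append(make_event(base + i * 1000, "192.168.33.11", "1"))

alerts = detect_potential_attacks(events)
print(alerts)
```

Only 192.168.33.11 appears in the result, because the other host never sets the reset flag. With exactly 10 reset events the result would be empty, since the comparison is strictly greater than the threshold.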

The end result is a TABLE that can be queried for information useful in alerting administrators of a potential attack. For example, you could execute a push query against the table as part of a monitoring and alerting pipeline.

First, for this example, we need to set the auto.offset.reset flag to earliest, which will ensure that our query runs from the beginning of the topic to produce an expected result. In a production query, you may choose to use latest and only capture events going forward from the time you execute the push query.

SET 'auto.offset.reset' = 'earliest';

This query will select all records from our materialized view, including the source IP address and count. We can use these to investigate the issue.

select * from POTENTIAL_SLOWLORIS_ATTACKS EMIT CHANGES;
+----------------------------+----------------------------+----------------------------+----------------------------+
|SRC                         |WINDOWSTART                 |WINDOWEND                   |COUNT_CONNECTION_RESET      |
+----------------------------+----------------------------+----------------------------+----------------------------+
|192.168.33.11               |1642017660000               |1642017720000               |14                          |

Cleanup

5

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;