Identify firewall deny events

In the security information and event management (SIEM) world, it's critical to have a scalable cyber intelligence platform that can swiftly identify potential security threats and vulnerabilities, such as firewall deny events. But because each source has its own set of collectors generating different data flows, the aggregate volume is often too large to analyze and act on in a timely, cost-effective manner. This recipe demonstrates how you can intercept those data flows as they arrive from their sources, and analyze or filter the data before sending it to an aggregator.

Learn to optimize Splunk data ingestion by using the Splunk S2S Source connector to offload data that would normally be sent to a Splunk HTTP Event Collector (HEC). The stream processing application filters for “deny” events, removes unnecessary fields to reduce message size, and sends the new, targeted set of events to Splunk for cost-effective indexing.

You can also extend this solution to intercept data from a variety of SIEM vendors and create a vendor-agnostic solution that leverages multiple tools and analytics solutions.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB provides a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Read the data in

2

Confluent Cloud offers pre-built, fully managed connectors that make it easy to quickly connect to popular data sources and end systems in the cloud. However, if you want to stream specific events that don’t have an available connector in Confluent Cloud, you can run your own connector and send the data to your Kafka cluster. This tutorial shows how you can run your own connector locally.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! After you create the ksqlDB application, insert mock data into the streams.

This tutorial uses a Cisco Adaptive Security Appliance (ASA) as the source, but the same logic can be applied to any type of device. Run the Splunk S2S Source connector on the same host as the Splunk universal forwarder (UF).

To stream ASA data into a Kafka topic called splunk, create the Dockerfile below to bundle a Kafka Connect worker with the kafka-connect-splunk-s2s connector:

FROM confluentinc/cp-server-connect-base:7.1.0

ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-splunk-s2s:1.0.5

Build the custom Docker image with this command:

docker build \
   -t localbuild/connect_distributed_with_splunk-s2s:1.0.5 \
   -f Dockerfile .

Next, create a docker-compose.yml file with the following content, substituting your Confluent Cloud connection information:

---
version: '2'
services:

  connect:
    image: localbuild/connect_distributed_with_splunk-s2s:1.0.5
    hostname: connect
    container_name: connect
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: < BOOTSTRAP SERVER >
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "connect"
      CONNECT_CONFIG_STORAGE_TOPIC: recipe-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: recipe-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: recipe-connect-status
      CONNECT_REPLICATION_FACTOR: 3
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 3
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
      CONNECT_LOG4J_LOGGERS: org.reflections=ERROR
      # Connect worker
      CONNECT_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_SASL_MECHANISM: PLAIN
      # Connect producer
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_PRODUCER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_PRODUCER_SASL_MECHANISM: PLAIN
      # Connect consumer
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SASL_SSL
      CONNECT_CONSUMER_SASL_JAAS_CONFIG: < SASL JAAS CONFIG >
      CONNECT_CONSUMER_SASL_MECHANISM: PLAIN

Run the container with the Connect worker:

docker-compose up -d

Create a configuration file, connector-splunk-s2s.config, for the Splunk S2S Source connector, specifying the port that the connector will use:

{
  "connector.class"          : "io.confluent.connect.splunk.s2s.SplunkS2SSourceConnector",
  "name"                     : "recipe-splunk-s2s-source",
  "splunk.s2s.port"          : "<port>",
  "kafka.topic"              : "splunk",
  "tasks.max"                : "1"
}

Submit that connector to the Connect worker:

curl -X PUT -H "Content-Type: application/json" --data @connector-splunk-s2s.config http://localhost:8083/connectors/recipe-splunk-s2s-source/config

You now should have ASA events being written to the splunk topic in Confluent Cloud.
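
One way to confirm that events are arriving before you define any streams is to print a few records from the topic in the ksqlDB editor. This is a minimal sketch, assuming the connector writes to the splunk topic named in its configuration and that your ksqlDB application is allowed to read that topic:

```sql
-- Print the first three records in the splunk topic, then stop.
PRINT 'splunk' FROM BEGINNING LIMIT 3;
```

If nothing is printed, check that the Connect worker is running and that the forwarder is sending to the port you set in splunk.s2s.port.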

Execute ksqlDB code

3
When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM splunk (
  event VARCHAR,
  time BIGINT,
  host VARCHAR,
  source VARCHAR,
  index VARCHAR,
  sourcetype VARCHAR
) WITH (
  KAFKA_TOPIC = 'splunk',
  VALUE_FORMAT = 'json',
  PARTITIONS = 6
);

-- Keep only Cisco ASA events and split each message into an array of tokens
CREATE STREAM splunk_parsed WITH (KAFKA_TOPIC = 'splunk_parsed') AS SELECT
  time,
  host,
  sourcetype,
  REGEXP_SPLIT_TO_ARRAY(event, ' ')
FROM splunk
WHERE sourcetype = 'cisco:asa'
EMIT CHANGES;

-- Extract fields from the token array and keep only events where the action is Deny
CREATE STREAM splunk_filtered WITH (KAFKA_TOPIC = 'splunk_filtered') AS SELECT
  time,
  host,
  sourcetype,
  KSQL_COL_0[2] AS action,
  KSQL_COL_0[3] AS protocol,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(KSQL_COL_0[5], 'inside:')[2], '/')[1] AS inside_ip,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(KSQL_COL_0[5], 'inside:')[2], '/')[2] AS inside_port,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(KSQL_COL_0[7], 'outside:')[2], '/')[1] AS outside_ip,
  SPLIT(REGEXP_SPLIT_TO_ARRAY(KSQL_COL_0[7], 'outside:')[2], '/')[2] AS outside_port
FROM splunk_parsed
WHERE KSQL_COL_0[2] = 'Deny'
EMIT CHANGES;
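
The array indexes in splunk_filtered depend on how each event splits on spaces, and ksqlDB arrays are 1-based. A quick way to see the positions is to look at the token array for one event. This is only a sketch, and it returns a row only once at least one cisco:asa event is in the stream (for example, after inserting the mock data in the next step):

```sql
-- KSQL_COL_0 is the name ksqlDB generates for the unaliased
-- REGEXP_SPLIT_TO_ARRAY(event, ' ') column in splunk_parsed.
SELECT KSQL_COL_0 FROM splunk_parsed EMIT CHANGES LIMIT 1;
```

For a Deny event shaped like the mock events in this tutorial, element 1 is the message ID, element 2 is the action, element 3 is the protocol, and elements 5 and 7 hold the inside and outside address/port pairs.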

Test with mock data

4

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-6-302016: Teardown UDP connection 841023 for outside:75.75.75.75/53 to inside:192.168.9.50/60995 duration 0:00:00 bytes 104', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-2-106001: Inbound TCP connection denied from 34.215.24.225/443 to 192.168.10.18/34312 flags FIN PSH ACK  on interface outside', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-4-106023: Deny tcp src inside:192.168.9.50/52596 dst outside:204.107.141.110/8443 by access-group "inside_access_in" [0x0, 0x0]', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-4-106023: Deny tcp src inside:192.168.9.50/52597 dst outside:204.107.141.110/8443 by access-group "inside_access_in" [0x0, 0x0]', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');
INSERT INTO splunk (event, time, host, source, index, sourcetype) VALUES ('%ASA-6-302013: Built outbound TCP connection 841025 for outside:34.215.24.225/443 (34.215.24.225/443) to inside:192.168.9.30/50398 (192.168.10.18/4550)', UNIX_TIMESTAMP(), 'firewall-1', '/opt/splunkforwarder/splunk-s2s-test.log', 'default', 'cisco:asa');

To validate that this recipe is working, run the following query:

SELECT * FROM splunk_filtered EMIT CHANGES LIMIT 2;

Your output should resemble:

+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|TIME          |HOST          |SOURCETYPE    |ACTION        |PROTOCOL      |INSIDE_IP     |INSIDE_PORT   |OUTSIDE_IP    |OUTSIDE_PORT  |
+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+--------------+
|1646857365018 |firewall-1    |cisco:asa     |Deny          |tcp           |192.168.9.50  |52597         |204.107.141.11|8443          |
|              |              |              |              |              |              |              |0             |              |
|1646857364915 |firewall-1    |cisco:asa     |Deny          |tcp           |192.168.9.50  |52596         |204.107.141.11|8443          |
|              |              |              |              |              |              |              |0             |              |
Limit Reached
Query terminated
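
To see which events the filter leaves out, you can query the intermediate stream for everything whose second token is not Deny. This is a sketch that assumes only the five mock events above are in the stream. The column aliases message_id and second_token are illustrative names, not part of the recipe:

```sql
-- List events that splunk_filtered does not keep.
SELECT
  KSQL_COL_0[1] AS message_id,
  KSQL_COL_0[2] AS second_token
FROM splunk_parsed
WHERE KSQL_COL_0[2] <> 'Deny'
EMIT CHANGES LIMIT 3;
```

With the mock data, this should list the Teardown, Inbound, and Built events. Note that the %ASA-2-106001 "connection denied" event is among them: the filter matches the exact token Deny in position 2, so denials that are worded differently need their own condition if you want to capture them.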

Write the data out

5

After processing the data, send the targeted set of events to Splunk for indexing:

-- Send data to Splunk
CREATE SINK CONNECTOR IF NOT EXISTS cybersecurity_splunk WITH (
  'connector.class'          = 'SplunkSink',
  'name'                     = 'recipe-cybersecurity-splunk',
  'input.data.format'        = 'JSON',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'splunk_filtered',
  'splunk.hec.uri'           = '<splunk-indexers>',
  'splunk.hec.token'         = '<Splunk HTTP Event Collector token>',
  'tasks.max'                = '1'
);
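
After the statement completes, you may want to confirm that the connector was registered. This is a minimal sketch, assuming your ksqlDB application is allowed to manage connectors:

```sql
-- List the connectors known to this ksqlDB application.
SHOW CONNECTORS;
```

The sink connector should appear in the list. If it does not, recheck the API key, the HEC URI, and the HEC token values.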

Cleanup

6

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute stream or topic name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
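
For the streams created in this tutorial, the cleanup could look like the sketch below. The order matters because splunk_filtered reads from splunk_parsed, which in turn reads from splunk. Depending on your ksqlDB version, you may first need to terminate the persistent queries that write to these streams.

```sql
-- Drop downstream streams first, then the source stream.
DROP STREAM IF EXISTS splunk_filtered DELETE TOPIC;
DROP STREAM IF EXISTS splunk_parsed DELETE TOPIC;
-- Omit DELETE TOPIC here if you want to keep the raw events
-- that the Splunk S2S Source connector wrote to the splunk topic.
DROP STREAM IF EXISTS splunk DELETE TOPIC;
```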