Monitor security threats by analyzing and filtering Confluent Cloud audit logs

Audit logs are often captured and stored in SIEM tools to find malicious insiders, monitor changes to policies, protect against data leakage, and ensure regulatory compliance. However, indefinitely storing all audit logs in SIEM tools, irrespective of whether they are of high or low value, can be an expensive proposition, often forcing tradeoffs between cost, flexibility, and visibility.

This recipe demonstrates how to aggregate Confluent Cloud audit logs in a Kafka topic, filter for specific events, and forward them to Splunk for indexing via the Splunk Sink connector. Alternatively, you can route the filtered data to a more cost-effective destination for long-term storage, reducing data indexing, analysis, and storage costs.

To see this tutorial in action, click here to launch it now. It will pre-populate the ksqlDB code in the Confluent Cloud Console and provide mock data or stubbed-out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

1. Set up your environment

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL syntax for extracting, transforming, and loading events within your Kafka cluster.
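
Before moving on, you can sanity-check that the editor is talking to your ksqlDB application by running a simple metadata statement, which lists the streams currently registered:

SHOW STREAMS;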

2. Read the data in

This tutorial assumes that the audit log records are already located in a Confluent Cloud cluster directly controlled by the end user. If not, you will need to copy the audit logs from the original cluster to the cluster where your ksqlDB application is running. If you have a Dedicated cluster, one option is to use Cluster Linking.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! After you create the ksqlDB application, insert mock data into the streams.

3. Execute ksqlDB code

The stream processing application filters for events involving operations on topics, but you can review the structure of Confluent Cloud audit logs and extend this solution to filter for any auditable event (an example extension appears after the application logic below). The first stream sets a schema for the Confluent Cloud audit log data, which enables you to selectively pull out only the parts that you need to analyze. We’ll discuss this more in the Explanation section.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
SET 'auto.offset.reset' = 'earliest';

CREATE STREAM audit_log_events (
  id VARCHAR,
  source VARCHAR,
  specversion VARCHAR,
  type VARCHAR,
  time VARCHAR,
  datacontenttype VARCHAR,
  subject VARCHAR,
  confluentRouting STRUCT<route VARCHAR>,
  data STRUCT<
    serviceName VARCHAR,
    methodName VARCHAR,
    resourceName VARCHAR,
    authenticationInfo STRUCT<principal VARCHAR>,
    authorizationInfo STRUCT<
        granted BOOLEAN,
        operation VARCHAR,
        resourceType VARCHAR,
        resourceName VARCHAR,
        patternType VARCHAR,
        superUserAuthorization BOOLEAN
        >,
    request STRUCT<
        correlation_id VARCHAR,
        client_id VARCHAR
        >,
    requestMetadata STRUCT<client_address VARCHAR>
    >
) WITH (
  KAFKA_TOPIC = 'confluent-audit-log-events',
  VALUE_FORMAT = 'JSON',
  TIMESTAMP = 'time',
  TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSX',
  PARTITIONS = 6
);


-- Application logic
CREATE STREAM audit_log_topics WITH (KAFKA_TOPIC = 'topic-operations-audit-log') AS
SELECT
  time,
  data
FROM audit_log_events
WHERE data->authorizationinfo->resourcetype = 'Topic'
EMIT CHANGES;
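
Only the WHERE clause needs to change to target a different auditable event. As a purely illustrative extension (the audit_log_denials stream and its backing topic are hypothetical names, not part of this recipe), you could capture denied authorization attempts by filtering on the granted flag instead:

-- Hypothetical extension: capture denied authorization attempts
CREATE STREAM audit_log_denials WITH (KAFKA_TOPIC = 'denied-operations-audit-log') AS
SELECT
  time,
  data
FROM audit_log_events
WHERE data->authorizationinfo->granted = false
EMIT CHANGES;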

4. Test with mock data

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd208', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-24T16:15:48.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Create', resourceType := 'Topic', resourceName := 'app3-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd209', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-24T17:16:43.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Create', resourceType := 'Broker', resourceName := 'app3-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd206', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-25T18:17:32.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Delete', resourceType := 'Topic', resourceName := 'app4-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-8'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd205', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-25T19:18:21.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Create', resourceType := 'Topic', resourceName := 'app5-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd204', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-26T20:19:33.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Delete', resourceType := 'Topic', resourceName := 'app9-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-5'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd203', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-26T21:20:34.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Modify', resourceType := 'Topic', resourceName := 'app5-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd202', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-24T16:15:12.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Create', resourceType := 'Broker', resourceName := 'app3-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd201', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-24T16:15:16.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Create', resourceType := 'Broker', resourceName := 'app3-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd200', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-24T16:15:17.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Create', resourceType := 'Broker', resourceName := 'app3-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));
INSERT INTO audit_log_events (id, source, specversion, type, time, datacontenttype, subject, confluentrouting, data) VALUES('889bdcd9-a378-4bfe-8860-180ef8efd218', 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', '1.0', 'io.confluent.kafka.server/authorization', '2019-10-24T16:15:48.355Z', 'application/json',  'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic',  STRUCT(route := 'confluent-audit-log-events'),  STRUCT(serviceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q', methodName := 'kafka.CreateTopics', resourceName := 'crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic', authenticationInfo := STRUCT(principal := 'User:admin'), authorizationInfo := STRUCT(granted := true, operation := 'Create', resourceType := 'Broker', resourceName := 'app3-topic', patternType := 'LITERAL', superUserAuthorization := true), request := STRUCT(correlation_id := '3', client_id := 'adminclient-6'), requestMetadata := STRUCT(client_address := '/127.0.0.1')));

To validate that this recipe is working, run the following query:

SELECT * FROM audit_log_topics EMIT CHANGES LIMIT 5;

Your output should resemble:

+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|TIME                                                                       |DATA                                                                       |
+---------------------------------------------------------------------------+---------------------------------------------------------------------------+
|2019-10-24T16:15:48.355Z                                                   |{SERVICENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q, METHODNAME=kafka.CreateTo|
|                                                                           |pics, RESOURCENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic, AU|
|                                                                           |THENTICATIONINFO={PRINCIPAL=User:admin}, AUTHORIZATIONINFO={GRANTED=true, O|
|                                                                           |PERATION=Create, RESOURCETYPE=Topic, RESOURCENAME=app3-topic, PATTERNTYPE=L|
|                                                                           |ITERAL, SUPERUSERAUTHORIZATION=true}, REQUEST={CORRELATION_ID=3, CLIENT_ID=|
|                                                                           |adminclient-6}, REQUESTMETADATA={CLIENT_ADDRESS=/127.0.0.1}}               |
|2019-10-25T18:17:32.355Z                                                   |{SERVICENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q, METHODNAME=kafka.CreateTo|
|                                                                           |pics, RESOURCENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic, AU|
|                                                                           |THENTICATIONINFO={PRINCIPAL=User:admin}, AUTHORIZATIONINFO={GRANTED=true, O|
|                                                                           |PERATION=Delete, RESOURCETYPE=Topic, RESOURCENAME=app4-topic, PATTERNTYPE=L|
|                                                                           |ITERAL, SUPERUSERAUTHORIZATION=true}, REQUEST={CORRELATION_ID=3, CLIENT_ID=|
|                                                                           |adminclient-8}, REQUESTMETADATA={CLIENT_ADDRESS=/127.0.0.1}}               |
|2019-10-26T20:19:33.355Z                                                   |{SERVICENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q, METHODNAME=kafka.CreateTo|
|                                                                           |pics, RESOURCENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic, AU|
|                                                                           |THENTICATIONINFO={PRINCIPAL=User:admin}, AUTHORIZATIONINFO={GRANTED=true, O|
|                                                                           |PERATION=Delete, RESOURCETYPE=Topic, RESOURCENAME=app9-topic, PATTERNTYPE=L|
|                                                                           |ITERAL, SUPERUSERAUTHORIZATION=true}, REQUEST={CORRELATION_ID=3, CLIENT_ID=|
|                                                                           |adminclient-5}, REQUESTMETADATA={CLIENT_ADDRESS=/127.0.0.1}}               |
|2019-10-26T21:20:34.355Z                                                   |{SERVICENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q, METHODNAME=kafka.CreateTo|
|                                                                           |pics, RESOURCENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic, AU|
|                                                                           |THENTICATIONINFO={PRINCIPAL=User:admin}, AUTHORIZATIONINFO={GRANTED=true, O|
|                                                                           |PERATION=Modify, RESOURCETYPE=Topic, RESOURCENAME=app5-topic, PATTERNTYPE=L|
|                                                                           |ITERAL, SUPERUSERAUTHORIZATION=true}, REQUEST={CORRELATION_ID=3, CLIENT_ID=|
|                                                                           |adminclient-6}, REQUESTMETADATA={CLIENT_ADDRESS=/127.0.0.1}}               |
|2019-10-25T19:18:21.355Z                                                   |{SERVICENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q, METHODNAME=kafka.CreateTo|
|                                                                           |pics, RESOURCENAME=crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic, AU|
|                                                                           |THENTICATIONINFO={PRINCIPAL=User:admin}, AUTHORIZATIONINFO={GRANTED=true, O|
|                                                                           |PERATION=Create, RESOURCETYPE=Topic, RESOURCENAME=app5-topic, PATTERNTYPE=L|
|                                                                           |ITERAL, SUPERUSERAUTHORIZATION=true}, REQUEST={CORRELATION_ID=3, CLIENT_ID=|
|                                                                           |adminclient-6}, REQUESTMETADATA={CLIENT_ADDRESS=/127.0.0.1}}               |
Limit Reached
Query terminated

5. Write the data out

After processing the data, send it to Splunk.

-- Send data to Splunk
CREATE SINK CONNECTOR IF NOT EXISTS recipe_splunk_filter_logs WITH (
  'connector.class'          = 'SplunkSink',
  'input.data.format'        = 'JSON',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'topics'                   = 'topic-operations-audit-log',
  'splunk.hec.uri'           = '<splunk-indexers>',
  'splunk.hec.token'         = '<Splunk HTTP Event Collector token>',
  'tasks.max'                = '1'
);
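
After submitting the statement, you can confirm from the ksqlDB editor that the connector was created and is running:

SHOW CONNECTORS;
DESCRIBE CONNECTOR recipe_splunk_filter_logs;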

6. Explanation

The first step of this tutorial was to create a stream over the Confluent Cloud audit log topic. When creating the stream, you assign a schema that models the JSON. Once the schema is applied, you can directly query nested sections of the data, which allows you to apply filters and pull back only the log entries that interest you. Additionally, you can cherry-pick selected fields from the nested structure, limiting the amount of data each query retrieves. The second part of the tutorial leverages the applied schema to select only the log entries pertaining to events on Kafka topics.
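
For example, with the schema in place, a query can project just a couple of nested fields rather than entire records. This sketch (the column aliases are illustrative) pulls out only the principal and the operation from each event:

-- Illustrative: project two nested fields out of each audit event
SELECT
  time,
  data->authenticationinfo->principal AS principal,
  data->authorizationinfo->operation AS operation
FROM audit_log_events
EMIT CHANGES
LIMIT 3;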

The JSON structure of the Confluent log entries contains several fields:

{   "id": "889bdcd9-a378-4bfe-8860-180ef8efd208",
    "source": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q",
    "specversion": "1.0",
    "type": "io.confluent.kafka.server/authorization",
    "time": "2019-10-24T16:15:48.355Z",  <---Time of the event
    "datacontenttype": "application/json",
    "subject": "crn:///kafka=8caBa-0_Tu-2k3rKSxY64Q/topic=app3-topic",
    "confluentRouting": {
        "route": "confluent-audit-log-events"
    },
    "data": {  <--- Relevant data of the event
      ...
    "authorizationInfo": {
        "granted": true,
        "operation": "Create",
        "resourceType": "Topic",  <--- You only want events involving topics
        "resourceName": "app3-topic",
        "patternType": "LITERAL",
        "superUserAuthorization": true
      }
     ...
    }
}

Of these fields, you’re only interested in the time of the event and the data field. The data field contains the specifics of the log event, which in this case is any operation where the resourceType is Topic. So the first step is to apply a schema to this JSON:

CREATE STREAM audit_log_events (
  id VARCHAR,
  source VARCHAR,
  specversion VARCHAR,
  type VARCHAR,
  time VARCHAR,
  datacontenttype VARCHAR,
  subject VARCHAR,
  confluentRouting STRUCT<route VARCHAR>,
  data STRUCT<
    serviceName VARCHAR,
    methodName VARCHAR,
    resourceName VARCHAR,
    authenticationInfo STRUCT<principal VARCHAR>,
....

) WITH (
  KAFKA_TOPIC = 'confluent-audit-log-events',
  VALUE_FORMAT = 'JSON',
  TIMESTAMP = 'time',
  TIMESTAMP_FORMAT = 'yyyy-MM-dd''T''HH:mm:ss.SSSX',
  PARTITIONS = 6
);

By supplying a schema to the ksqlDB STREAM, you are describing the structure of the data to ksqlDB. The top-level fields (id through data) correspond to column names. You’ll notice that there are nested STRUCT fields representing nested JSON objects within the structure. In the WITH clause, you specify that ksqlDB should use the time field for the record timestamp, with TIMESTAMP_FORMAT giving the format to parse it.
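
To see the effect of the TIMESTAMP and TIMESTAMP_FORMAT properties, you can compare the parsed record timestamp (exposed through the ROWTIME pseudo column) against the raw time string, along these lines:

-- Compare the parsed record timestamp with the raw time string
SELECT
  TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS record_ts,
  time AS raw_time
FROM audit_log_events
EMIT CHANGES
LIMIT 1;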

Now that you’ve described the structure of the data (by applying a schema), you can create another STREAM that contains only the data of interest. Let’s review this query in two parts: the CREATE statement and the SELECT statement.

CREATE STREAM audit_log_topics
  WITH (
  KAFKA_TOPIC='topic-operations-audit-log',
  PARTITIONS=6
);

This CREATE STREAM statement tells ksqlDB to use (or create, if it doesn’t exist yet) the named Kafka topic to store the results of the stream.

The SELECT part of the query is where you can drill down into the original stream and pull out only the records that interest you. Let’s take a look at each line:

SELECT time, data

This specifies that you want only the time field and the nested data entry from the original JSON. In ksqlDB, you access nested JSON objects using the -> operator.

FROM audit_log_events

The FROM clause simply tells ksqlDB to pull the records from the original stream that you created to model the Confluent Cloud audit log data.

WHERE data->authorizationinfo->resourcetype = 'Topic'

In this WHERE statement, you use the -> operator to drill down through several layers of nested JSON. The statement specifies that the new stream will contain only entries involving topic operations.
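
The same pattern composes: additional conditions on nested fields chain with AND. For example, to narrow the results further to topic deletions only:

-- Illustrative variation: topic deletions only
SELECT time, data
FROM audit_log_events
WHERE data->authorizationinfo->resourcetype = 'Topic'
  AND data->authorizationinfo->operation = 'Delete'
EMIT CHANGES;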

7. Cleanup

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). By including the DELETE TOPIC clause, the topic backing the stream or table is asynchronously deleted as well.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute the connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
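
For this recipe specifically, the cleanup looks like the following sketch. Drop the connector first so that nothing is still reading from the topic, and note that if audit_log_events is backed by your real audit log data rather than mock data, you will likely want to omit the DELETE TOPIC clause for it:

DROP CONNECTOR IF EXISTS recipe_splunk_filter_logs;

-- Omit DELETE TOPIC on audit_log_events if the backing topic holds real audit data
DROP STREAM IF EXISTS audit_log_topics DELETE TOPIC;
DROP STREAM IF EXISTS audit_log_events DELETE TOPIC;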