In this tutorial, you will set up an S3 Sink Connector in Confluent Cloud to sink topic data to a private S3 bucket. We will create an AWS egress access point in Confluent Cloud for the connector to use when connecting to S3, create a private S3 bucket to stream data into, and configure the bucket to allow connectivity only from that egress access point. This tutorial is a companion to the Confluent Cloud AWS egress access points documentation here.
Before we can sink data into S3, we need some data! Let's use the Datagen Source Connector to populate a topic of product catalog updates.
confluent login --prompt --save
confluent environment list
confluent environment use <ENVIRONMENT_ID>
confluent kafka cluster list
confluent kafka cluster use <CLUSTER_ID>
confluent api-key create --resource <CLUSTER_ID>
Copy the API key and secret from the output into a Datagen Source Connector configuration file at /tmp/datagen-connector.json:
{
  "name": "datagen-products",
  "connector.class": "DatagenSource",
  "kafka.auth.mode": "KAFKA_API_KEY",
  "kafka.api.key": "<API_KEY>",
  "kafka.api.secret": "<API_SECRET>",
  "kafka.topic": "products",
  "output.data.format": "AVRO",
  "quickstart": "PRODUCT",
  "tasks.max": "1"
}
Then, create the Datagen connector using the Confluent CLI:
confluent connect cluster create --config-file /tmp/datagen-connector.json
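Before moving on, you can optionally verify that the Datagen connector is producing records. A quick check using the Confluent CLI (you may be prompted for Schema Registry credentials when consuming Avro; press Ctrl-C to stop consuming):

# Confirm that the Datagen connector shows a Running status
confluent connect cluster list

# Consume a few records from the products topic
confluent kafka topic consume products --value-format avro --from-beginning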
Follow these steps to create an AWS egress access point. Once the access point is Ready, note the Endpoint ID and the VPC endpoint DNS name. We will use these in our S3 bucket policy and in the S3 Sink Connector configuration.
Create a private S3 bucket in the same AWS region where your cluster is running. Note that the region must be specified both in the --region argument and as a location constraint (for any region other than us-east-1, which does not accept a location constraint). You can pick any bucket name; here we use products-private in the us-east-2 AWS region:
aws s3api create-bucket \
--bucket products-private \
--region us-east-2 \
--acl private \
--create-bucket-configuration LocationConstraint=us-east-2
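If you want to double-check where the bucket landed, you can query its region using the same AWS CLI profile as above:

# Returns the bucket's LocationConstraint, e.g. "us-east-2"
aws s3api get-bucket-location --bucket products-private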
Next, create the following bucket policy in a file /tmp/policy.json. Be sure to specify your bucket name in the Resource array, and the VPC endpoint ID that was generated when you provisioned the egress access point in Confluent Cloud:
{
"Version": "2012-10-17",
"Id": "Policy1415115909152",
"Statement": [
{
"Sid": "Access-to-specific-VPCE-only",
"Effect": "Deny",
"Principal": "*",
"Action": "s3:*",
"Resource": [
"arn:aws:s3:::products-private",
"arn:aws:s3:::products-private/*"
],
"Condition": {
"StringNotEquals": {
"aws:SourceVpce": "vpce-01234567890123456"
}
}
}
]
}
Now, use the AWS CLI to apply the policy to your bucket:
aws s3api put-bucket-policy --bucket products-private --policy file:///tmp/policy.json
Create an AWS access key and secret for use in the S3 Sink Connector configuration by following the instructions here.
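If you prefer the AWS CLI, here is a minimal sketch; the IAM user name s3-sink-connector is a placeholder for an existing user in your account that has read and write permissions on the bucket:

# Create an access key pair for the IAM user; record the AccessKeyId and SecretAccessKey
# from the output, since the secret cannot be retrieved again later
aws iam create-access-key --user-name s3-sink-connector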
Create a JSON file /tmp/s3-connector.json. Be sure to populate values specific to your setup, including the Kafka API key and secret, the AWS access key and secret, and the store.url, which is based on the VPC endpoint DNS name from your egress access point:
{
"config": {
"topics": "products",
"schema.context.name": "default",
"input.data.format": "AVRO",
"connector.class": "S3_SINK",
"name": "S3-sink-products",
"kafka.auth.mode": "KAFKA_API_KEY",
"kafka.api.key": "<API_KEY>>",
"kafka.api.secret": "<API_SECRET>",
"aws.access.key.id": "<AWS_ACCESS_KEY_ID>",
"aws.secret.access.key": "<AWS_SECRET_ACCESS_KEY>",
"s3.bucket.name": "products-private",
"store.url": "https://bucket.vpce-01234567890123456-01234567.s3.us-east-2.vpce.amazonaws.com",
"s3.part.size": "5242880",
"s3.wan.mode": "false",
"output.data.format": "JSON",
"json.decimal.format": "BASE64",
"topics.dir": "topics",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"time.interval": "HOURLY",
"rotate.schedule.interval.ms": "-1",
"flush.size": "1000",
"behavior.on.null.values": "ignore",
"timezone": "UTC",
"subject.name.strategy": "TopicNameStrategy",
"tombstone.encoded.partition": "tombstone",
"locale": "en",
"s3.schema.partition.affix.type": "NONE",
"schema.compatibility": "NONE",
"store.kafka.keys": "false",
"value.converter.connect.meta.data": "true",
"store.kafka.headers": "false",
"s3.object.tagging": "false",
"max.poll.interval.ms": "300000",
"max.poll.records": "500",
"tasks.max": "1"
}
}
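Then, create the S3 Sink Connector the same way you created the Datagen connector:

confluent connect cluster create --config-file /tmp/s3-connector.json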
To validate that the connector is sinking data into S3, navigate to the Connectors page in the Confluent Cloud Console and check that the connector is in the Running state and processing messages.
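You can also check the connector's status from the Confluent CLI; replace <CONNECTOR_ID> with the ID shown in the list output:

# Look up the connector ID, then inspect its status and any task-level errors
confluent connect cluster list
confluent connect cluster describe <CONNECTOR_ID>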
It may take some time for data to appear, since object writes are governed by the hourly time.interval and the flush.size of 1000 records configured above. You can also check S3 directly to confirm that objects are landing in the bucket you created above.
Once you have finished, clean up any resources created for this tutorial that you no longer need:
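A minimal cleanup sketch, assuming the resource names used above; replace the placeholder IDs with values from the corresponding list commands. Note that the bucket policy applied earlier denies S3 access from outside the VPC endpoint, so you may need to delete that policy (or use the bucket owner's root credentials) before you can empty the bucket from your workstation. Also delete the egress access point in the Confluent Cloud Console if you no longer need it.

# Delete the Datagen and S3 sink connectors
confluent connect cluster list
confluent connect cluster delete <CONNECTOR_ID>

# Delete the products topic and the Kafka API key created for the connectors
confluent kafka topic delete products
confluent api-key delete <API_KEY>

# Delete the AWS access key used by the sink connector (the user name is a placeholder)
aws iam delete-access-key --user-name s3-sink-connector --access-key-id <AWS_ACCESS_KEY_ID>

# Remove the restrictive bucket policy, then empty and delete the bucket
aws s3api delete-bucket-policy --bucket products-private
aws s3 rb s3://products-private --force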