Analyze political campaign fundraising performance

Political analysts often need in-depth financial analysis to gain insight into campaign performance, predict future contributions from their base of support, and optimize their ongoing strategy. The Federal Election Commission of the United States of America provides all the statements and reports filed with the Commission, which can feed this analysis. This recipe walks through how to use Kafka and ksqlDB to analyze and slice this campaign finance data in various ways, such as grouping by party affiliation, counting contributions for a particular candidate, and more.

To see this tutorial in action, launch it in Confluent Cloud. Doing so pre-populates the ksqlDB code in the Confluent Cloud Console and provides mock data or stubbed-out code to connect to a real data source. For more detailed instructions, follow the steps below.

Run it

Set up your environment

1

Provision a Kafka cluster in Confluent Cloud.

Once your Confluent Cloud cluster is available, create a ksqlDB application and navigate to the ksqlDB editor to execute this tutorial. ksqlDB supports a SQL language for extracting, transforming, and loading events within your Kafka cluster.

Execute ksqlDB code

2

ksqlDB processes data in real time, and you can also import and export data directly from ksqlDB, to and from popular data sources and end systems in the cloud. This tutorial shows you how to run the recipe in one of two ways: using connectors to any supported data source, or using ksqlDB’s INSERT INTO functionality to mock the data.

If you cannot connect to a real data source with properly formatted data, or if you just want to execute this tutorial without external dependencies, no worries! Remove the CREATE SOURCE CONNECTOR commands and insert mock data into the streams.

When creating the initial STREAM or TABLE, if the backing Kafka topic already exists, then the PARTITIONS property may be omitted.
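
As a sketch of that last point, assume a topic named campaign_finance already exists in your cluster (for example, because a connector created it). The source stream could then be declared without PARTITIONS. The columns and formats here are copied from this recipe; the only change is the dropped property.

```sql
-- Sketch: assumes the Kafka topic 'campaign_finance' already exists,
-- so ksqlDB reuses it and the PARTITIONS property is not needed.
CREATE STREAM campaign_finance (
  time BIGINT,
  cand_id VARCHAR,
  party_affiliation VARCHAR,
  contribution BIGINT
) WITH (
  KAFKA_TOPIC = 'campaign_finance',
  VALUE_FORMAT = 'JSON'
);
```

Use this form in place of, not in addition to, the CREATE STREAM statement in the recipe code.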

-- Substitute your parameter values in the connector configurations below.
-- If you do not want to connect to a real data source, remove the CREATE SOURCE CONNECTOR commands,
-- and add the INSERT INTO commands to insert mock data into the streams

CREATE SOURCE CONNECTOR IF NOT EXISTS recipe_postgres_campaign_finance WITH (
  'connector.class'          = 'PostgresSource',
  'kafka.api.key'            = '<my-kafka-api-key>',
  'kafka.api.secret'         = '<my-kafka-api-secret>',
  'connection.host'          = '<database-host>',
  'connection.port'          = '5432',
  'connection.user'          = 'postgres',
  'connection.password'      = '<database-password>',
  'db.name'                  = '<db-name>',
  'table.whitelist'          = 'contributions',
  'timestamp.column.name'    = 'created_at',
  'output.data.format'       = 'JSON',
  'db.timezone'              = 'UTC',
  'tasks.max'                = '1'
);

SET 'auto.offset.reset' = 'earliest';

-- Create a Stream for the campaign finance data
CREATE STREAM campaign_finance (
  time BIGINT,
  cand_id VARCHAR,
  party_affiliation VARCHAR,
  contribution BIGINT
) WITH (
  KAFKA_TOPIC = 'campaign_finance',
  PARTITIONS = 6,
  VALUE_FORMAT = 'JSON'
);

-- Categorize donations by amount
CREATE STREAM categorization_donations WITH (KAFKA_TOPIC = 'categorization_donations') AS
SELECT
  FORMAT_TIMESTAMP(FROM_UNIXTIME(time), 'yyyy-MM-dd HH:mm:ss') AS ts,
  party_affiliation,
  cand_id,
  CASE
    WHEN contribution < 500 THEN 'small'
    WHEN contribution < 2900 THEN 'medium'
    ELSE 'large'
  END AS category
FROM campaign_finance
EMIT CHANGES;

-- Get count of "small" contributions
CREATE TABLE contributions_small_count WITH (KAFKA_TOPIC = 'contributions_small_count', KEY_FORMAT = 'JSON') AS
SELECT
  category,
  party_affiliation,
  COUNT(category) AS count_contributions
FROM categorization_donations
WHERE category = 'small'
GROUP BY category, party_affiliation;
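
The same pattern extends to the other slices mentioned in the introduction, such as contributions per candidate or per party. The following is a minimal sketch and not part of the original recipe: the table names contributions_by_candidate and contributions_by_party, their topic names, and the output column names are illustrative assumptions.

```sql
-- Hypothetical table: number and total amount of contributions per candidate.
CREATE TABLE contributions_by_candidate WITH (KAFKA_TOPIC = 'contributions_by_candidate') AS
SELECT
  cand_id,
  COUNT(*) AS count_contributions,
  SUM(contribution) AS total_contribution
FROM campaign_finance
GROUP BY cand_id;

-- Hypothetical table: number and total amount of contributions per party affiliation.
CREATE TABLE contributions_by_party WITH (KAFKA_TOPIC = 'contributions_by_party') AS
SELECT
  party_affiliation,
  COUNT(*) AS count_contributions,
  SUM(contribution) AS total_contribution
FROM campaign_finance
GROUP BY party_affiliation;
```

With the mock data from the next step, contributions_by_party would be expected to show 5 contributions totaling 4530 for REP, 2 totaling 2710 for DEM, and 3 totaling 3600 for IND.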

Test with mock data

3

If you are not running source connectors to produce events, you can use ksqlDB INSERT INTO statements to insert mock data into the source topics:

INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'V87676771', 'REP', 40);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'W84973353', 'DEM', 2700);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'W69512271', 'IND', 1200);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'C89316269', 'IND', 400);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'Y91816393', 'REP', 900);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'F64223842', 'DEM', 10);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'G11293665', 'REP', 60);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'I41689173', 'REP', 3500);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'N15754213', 'IND', 2000);
INSERT INTO campaign_finance (time, cand_id, party_affiliation, contribution) VALUES (UNIX_TIMESTAMP(), 'J11313219', 'REP', 30);

To validate that this recipe is working, you can run the following query:

SELECT * FROM contributions_small_count;

Your output should resemble:

+------------------------------------------------+------------------------------------------------+------------------------------------------------+
|CATEGORY                                        |PARTY_AFFILIATION                               |COUNT_CONTRIBUTIONS                             |
+------------------------------------------------+------------------------------------------------+------------------------------------------------+
|small                                           |IND                                             |1                                               |
|small                                           |DEM                                             |1                                               |
|small                                           |REP                                             |3                                               |
Query terminated
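
To look one step upstream, or to watch the counts change as new events arrive, push queries can help. This is a sketch that assumes the streams and table were created exactly as written above and that 'auto.offset.reset' is set to 'earliest' for the session.

```sql
-- Inspect how the first ten events were categorized.
-- This push query stops on its own after 10 rows.
SELECT * FROM categorization_donations EMIT CHANGES LIMIT 10;

-- Stream updates to the small-contribution counts.
-- This push query keeps running until you cancel it.
SELECT * FROM contributions_small_count EMIT CHANGES;
```

Inserting another contribution under 500 while the second query is running should produce a new row with an incremented count for that party.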

Cleanup

4

To clean up the ksqlDB resources created by this tutorial, use the ksqlDB commands shown below (substitute the stream or table name, as appropriate). Including the DELETE TOPIC clause also asynchronously deletes the topic backing the stream or table.

DROP STREAM IF EXISTS <stream_name> DELETE TOPIC;
DROP TABLE IF EXISTS <table_name> DELETE TOPIC;

If you also created connectors, remove those as well (substitute connector name).

DROP CONNECTOR IF EXISTS <connector_name>;
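
For this recipe specifically, the placeholders would resolve to the names used in the code above. The sketch below drops downstream objects before the stream they read from. Depending on your ksqlDB version, you may need to terminate the persistent queries behind the derived stream and table before the drops succeed; treat that as an assumption to verify in your environment.

```sql
-- Drop derived objects first, then the source stream they depend on.
DROP TABLE IF EXISTS contributions_small_count DELETE TOPIC;
DROP STREAM IF EXISTS categorization_donations DELETE TOPIC;
DROP STREAM IF EXISTS campaign_finance DELETE TOPIC;

-- Only needed if you created the source connector.
DROP CONNECTOR IF EXISTS recipe_postgres_campaign_finance;
```

Any additional streams or tables you built on top of campaign_finance need to be dropped before the source stream itself.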