In-Memory Analytics for Kafka using DuckDB

Today’s data scientist has a plethora of options for processing data in Apache Kafka. For stream processing, popular options are Kafka Streams, ksqlDB, and Apache Flink. For real-time analytics, one can choose from Apache Druid, ClickHouse, Apache Pinot, and Apache Doris. All of these systems support distributed processing, and so can scale to very large amounts of data. However, their distributed nature can lead to operational complexity.

Often a data scientist wants to quickly explore the data to get a better understanding of it without setting up a lot of infrastructure. This has led to the emergence of embedded OLAP databases, which support in-process exploratory data analysis. Databases in this category include Apache DataFusion, chDB, and DuckDB. DuckDB in particular has become especially popular, due to its ease of use and rich feature set, which includes:

  • support for querying and writing files in CSV, JSON, and Parquet formats
  • dynamic column selection, including use of regular expressions
  • PIVOT and UNPIVOT operations
  • windowing functions
  • full-text search
  • lambda functions, function chaining, and list comprehensions in SQL
  • rich type system, including JSON
  • enhancements to SQL to make it even easier to use
  • APIs for Java, Go, C, C++, Node.js, Python, Julia, R, and others
  • and more (see the short SQL sketch below)
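
As a quick illustration of a few of these features, here is a short DuckDB SQL sketch (the file data.csv and its columns are hypothetical):

-- query a CSV file directly
SELECT city, upper(city) FROM 'data.csv';

-- dynamic column selection using a regular expression
SELECT COLUMNS('temp_.*') FROM 'data.csv';

-- lambda functions and list comprehensions
SELECT list_transform([1, 2, 3], x -> x + 1);   -- [2, 3, 4]
SELECT [x * 2 FOR x IN [1, 2, 3]];              -- [2, 4, 6]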

The most popular formats for storing data in Kafka are Apache Avro, Protobuf, and JSON, all of which are supported by the Confluent Schema Registry. All of these formats support composing types using records, arrays, and unions. Unions are particularly interesting in the context of Kafka, because they can be used to support multiple types in the same topic. However, while relational databases often support the record type (usually called row type) and the array type, they usually do not support the union type. DuckDB is the only database that I know of that supports the union type, which makes it a natural target for storing and analyzing data which is originally in either Avro, Protobuf, or JSON format.
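To make this concrete, here is a minimal sketch of DuckDB's UNION type (plain DuckDB SQL, independent of kwack; the table and member names are made up):

-- a column that can hold either a click event or a plain message
CREATE TABLE events (payload UNION(click STRUCT(x INTEGER, y INTEGER), msg VARCHAR));

INSERT INTO events VALUES
    (union_value(msg := 'hello')),
    (union_value(click := {'x': 1, 'y': 2}));

-- inspect which member each row holds, and extract a specific member
SELECT union_tag(payload), union_extract(payload, 'msg') FROM events;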

One way to load Kafka data into a relational database is to use the JDBC sink connector, which runs on the Kafka Connect framework. However, setting up Kafka Connect involves a lot of moving parts, and the JDBC sink connector does not support DuckDB-specific functionality. For these reasons, I’ve created a small utility called kwack, which aims to do one thing well: load Kafka data into DuckDB, while making use of its rich type system, including support for unions.

Once Kafka data is loaded into DuckDB, one can make use of the myriad features of DuckDB. For example, you can quickly run SQL queries on the data, or even export DuckDB tables in Apache Parquet format. This means that by using kwack, you have a quick way to convert Kafka data in Apache Avro, Protobuf, and JSON to Apache Parquet files.

One can start kwack by simply specifying the Kafka broker, topic, and Schema Registry URL:

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:>

By default, kwack will use a simple binary deserializer for the key, and for the value kwack will use a deserializer corresponding to the latest subject version in Schema Registry, where the subject is the topic name suffixed with “-value”. The deserializer can be customized to one that does not rely on Schema Registry (short, int, long, float, double, string, or binary) or one that does (latest or a specific schema ID).
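For example, the -v option (also shown in the comments at the end of this post) takes a topic-to-serde mapping. A sketch, assuming the serde names listed above are accepted verbatim:

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -v mytopic=latest
% kwack -b localhost:9092 -t mytopic -v mytopic=string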

For data that has a schema, kwack will map each type in Avro, Protobuf, and JSON Schema to the appropriate type in DuckDB, as follows:

Avro                     | Protobuf                   | JSON Schema  | DuckDB
-------------------------+----------------------------+--------------+-------------
boolean                  | boolean                    | boolean      | BOOLEAN
int                      | int32, sint32, sfixed32    |              | INTEGER
                         | uint32, fixed32            |              | UINTEGER
long                     | int64, sint64, sfixed64    | integer      | BIGINT
                         | uint64, fixed64            |              | UBIGINT
float                    | float                      |              | FLOAT
double                   | double                     | number       | DOUBLE
string                   | string                     | string       | VARCHAR
bytes, fixed             | bytes                      |              | BLOB
enum                     | enum                       | enum         | ENUM
record                   | message                    | object       | STRUCT
array                    | repeated                   | array        | LIST
map                      | map                        |              | MAP
union                    | oneof                      | oneOf, anyOf | UNION
decimal                  | confluent.type.Decimal     |              | DECIMAL
date                     | google.type.Date           |              | DATE
time-millis, time-micros | google.type.TimeOfDay      |              | TIME
timestamp-millis         |                            |              | TIMESTAMP_MS
timestamp-micros         |                            |              | TIMESTAMP
timestamp-nanos          | google.protobuf.Timestamp  |              | TIMESTAMP_NS
duration                 | google.protobuf.Duration   |              | INTERVAL
uuid                     |                            |              | UUID
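For example, given a hypothetical Avro value schema like the one below, the resulting DuckDB columns would be roughly id VARCHAR, reading DOUBLE, tags VARCHAR[], and value a UNION over BIGINT and VARCHAR members (the exact union member names are chosen by kwack):

{
  "type": "record",
  "name": "Sensor",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "reading", "type": "double"},
    {"name": "tags", "type": {"type": "array", "items": "string"}},
    {"name": "value", "type": ["long", "string"]}
  ]
}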

Once kwack has been started, one can enter SQL commands at the prompt. (Note that if your topic name has a hyphen, you’ll need to surround it with quotes in the SQL query.)
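For example, for a hypothetical topic named my-topic:

select * from "my-topic";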

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+--------+-----+-----+-----------------------------------------------------------+
| rowkey | f1  | f2  |                          rowinfo                          |
+--------+-----+-----+-----------------------------------------------------------+
| null   | hi  | 123 | {ksi=null, vsi=1, par=0, off=0, ts=1720286713492, hdr={}} |
| null   | bye | 456 | {ksi=null, vsi=1, par=0, off=1, ts=1720286719746, hdr={}} |
+--------+-----+-----+-----------------------------------------------------------+
2 rows selected (0.012 seconds)

By default, all data from the topic will be loaded. One can use the -p option to specify a subset of partitions to load, or the -o option to specify an absolute offset, a relative offset from the end, or a timestamp (in ms) from which to start loading data. (Type kwack -h to see all options.)
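For example, the following sketch loads only partition 0, starting at offset 100 (the values are made up; check kwack -h for the exact argument syntax):

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -p 0 -o 100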

The table created for the topic will have one column to represent the Kafka record key (rowkey) and one column to represent the Kafka record metadata (rowinfo). The rest of the columns will be the fields of the Kafka record value, assuming that it is a record (Avro), message (Protobuf), or object (JSON); otherwise the value will be stored in a single column (rowval).

One can omit attributes of rowinfo, or even rowkey, by using the -a option to specify exactly which attributes to add to the DuckDB table. Below we include only the partition and offset in rowinfo.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+-----+-----+----------------+
| f1  | f2  |    rowinfo     |
+-----+-----+----------------+
| hi  | 123 | {par=0, off=0} |
| bye | 456 | {par=0, off=1} |
+-----+-----+----------------+
2 rows selected (0.01 seconds)

One can also load multiple topics and perform joins between the resulting DuckDB tables. Below we load two topics (passing -a none to omit rowkey and rowinfo entirely) and perform a join on a shared column name.

% kwack -b localhost:9092 -t mytopic -t mytopic2 -r http://localhost:8081 -a none
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+-----+-----+
| f1  | f2  |
+-----+-----+
| hi  | 123 |
| bye | 456 |
+-----+-----+
2 rows selected (0.002 seconds)
jdbc:duckdb::memory:> select * from mytopic2;
+-----+-------+
| f1  |  f3   |
+-----+-------+
| hi  | world |
| bye | folks |
+-----+-------+
2 rows selected (0.003 seconds)
jdbc:duckdb::memory:> select * from mytopic join mytopic2 using (f1);
+-----+-----+-------+
| f1  | f2  |  f3   |
+-----+-----+-------+
| hi  | 123 | world |
| bye | 456 | folks |
+-----+-----+-------+
2 rows selected (0.002 seconds)

As mentioned, one can write DuckDB tables in Parquet format.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> COPY mytopic to 'mytopic.parquet' (FORMAT 'parquet');
2 rows affected (0.007 seconds)
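The resulting Parquet file can then be read back by DuckDB itself, or by any other Parquet-aware tool, e.g.:

select * from 'mytopic.parquet';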

Additionally, one can persist the DuckDB database to a file, and then use the official DuckDB command line to perform analytical queries.

% kwack -b localhost:9092 -t mytopic -t mytopic2 -r http://localhost:8081 -a none -d /tmp/mydb
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb:/tmp/mydb> !exit

% duckdb /tmp/mydb
v1.0.0 1f98600c2c
Enter ".help" for usage hints.
D .tables
mytopic mytopic2
D select * from mytopic;
┌─────────┬───────┐
│   f1    │  f2   │
│ varchar │ int32 │
├─────────┼───────┤
│ hi      │   123 │
│ bye     │   456 │
└─────────┴───────┘

One can pass a SQL query on the command line, and instead of entering interactive mode, kwack will output the results of the query.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off -q "select * from mytopic"
{"f1":"hi","f2":123,"rowinfo":{"par":0,"off":0}}
{"f1":"bye","f2":456,"rowinfo":{"par":0,"off":1}}

Note that the output is in JSON format. This allows kwack to be used in conjunction with other tools like jq.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off -q "select * from mytopic" | jq .f1
"hi"
"bye"

When using kwack with Confluent Cloud, it may be easier to pass all of the properties including credentials in a file, as follows:

% kwack -F kwack.properties
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:>
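
A sketch of what kwack.properties might contain for Confluent Cloud (the endpoints and credentials are placeholders; the Kafka client and Schema Registry client properties are standard, while the kwack-specific topics key is assumed to mirror the -t flag):

# Kafka cluster connection (standard Kafka client properties)
bootstrap.servers=<BOOTSTRAP_SERVERS>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='<API_KEY>' password='<API_SECRET>';

# Schema Registry connection (standard Schema Registry client properties)
schema.registry.url=<SCHEMA_REGISTRY_URL>
basic.auth.credentials.source=USER_INFO
basic.auth.user.info=<SR_API_KEY>:<SR_API_SECRET>

# kwack-specific settings (topics assumed to mirror the -t flag)
topics=mytopic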

With kwack, performing data science on Kafka data at the command line becomes easier than ever.


6 thoughts on “In-Memory Analytics for Kafka using DuckDB”

  1. Shawn Gordon says:

    This is really cool. I’m trying to get it working, and I’m getting stuck on the schema part of it. I want to do it without using a schema registry. Is that possible? The docs kind of seem to indicate it, but I can’t quite work out how it would work.

    1. rayokota says:

      Thanks! You can pass an external Avro, Protobuf, or JSON schema. For example, to pass an external Protobuf schema, use this command:

      $ bin/kwack -b mybroker -t mytopic -v mytopic=proto:@/path/to/myschema.proto

      1. Shawn Gordon says:

        Thank you very much, that helps a lot. This will be a JSON file, and I’m rather new to this. Do you have a little example of what that would look like if my records look like this?
        {"carId":"car-11","timestamp":1721677574508,"latitude":47.614966056179775,"longitude":-122.3295268764045}

  2. Shawn Gordon says:

    Oh, I meant to mention that I’m putting this in a ‘properties’ file, so I just want to make sure I have that right.

    1. rayokota says:

      To specify in a properties file, you would use:

      value.serdes=mytopic=json:@/path/to/myschema.json

      For the JSON Schema, try

      {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
          "carId": {
            "type": "string"
          },
          "timestamp": {
            "type": "integer"
          },
          "latitude": {
            "type": "number"
          },
          "longitude": {
            "type": "number"
          }
        }
      }
      
