JSONata: The Missing Declarative Language for Kafka Connect

JSONata is an expressive, declarative functional language that is as powerful as older transformation technologies such as XSLT, but with a concise syntax. When working with Confluent Schema Registry, JSONata can be used to define complex schema migration rules that can ensure that schema evolution does not break existing consumers.

Another area where JSONata can be useful is when defining transformations that need to occur in the context of Kafka Connect. Typically such transformations are performed using Simple Message Transforms (SMT). An SMT is typically implemented in Java and performs one small function, such as adding, modifying, or removing data or metadata in a Kafka Connect record. However, if an SMT does not exist for a desired transformation, one usually has to create a new custom SMT.

I’ve created a general-purpose JSONata SMT that can take a JSONata expression, so that instead of creating a new custom SMT, a user can simply provide a custom JSONata expression. Using the JSONata SMT, one can replace the behavior of most existing Kafka Connect SMTs. At the end of this article, I provide JSONata expressions for some of the more common SMTs that currently exist.

When using JSONata to transform a Kafka Connect record, the record is converted to the following structure, as an example:

{
  "topic" : "test",
  "kafkaPartition" : 1,
  "keySchema" : {
    "type" : "STRING"
  },
  "key" : "mykey",
  "valueSchema" : {
    "type" : "STRUCT",
    "fields" : {
      "first" : {
        "name" : "first",
        "index" : 0,
        "schema" : {
          "type" : "STRING"
        }
      },
      "last" : {
        "name" : "last",
        "index" : 1,
        "schema" : {
          "type" : "STRING"
        }
      },
      "email" : {
        "name" : "email",
        "index" : 2,
        "schema" : {
          "type" : "STRING"
        }
      }
    }
  },
  "value" : {
    "first" : "test",
    "last" : "user",
    "email" : "none@none.com"
  },
  "timestamp" : 1234,
  "headers" : [ {
    "key" : "key1",
    "value" : "value1",
    "schema" : {
      "type" : "STRING"
    }
  }, {
    "key" : "key2",
    "value" : "value2",
    "schema" : {
      "type" : "STRING"
    }
  } ]
}

JSONata operates by simply transforming the above structure. After the transformation, the structure is converted back to a Kafka Connect record. I won’t go into the details of understanding JSONata (instead see this blog), but I will provide some tips for writing JSONata expressions to transform Kafka Connect records.

When modifying JSON objects, the two most useful object functions are the following:

  • The $sift function removes properties from an object.
  • The $merge function merges two objects, and can be used to add properties to an existing object.

When modifying JSON arrays, the two most useful array functions are the following:

  • The $filter function removes items from an array.
  • The $append function concatenates two arrays, and can be used to add items to an existing array.

Furthermore, multiple expressions can be specified, which will be evaluated in sequential order, and each expression can be assigned to a variable for use in subsequent expressions. The result of the composite expression will be the last expression in the sequence.

($x := expr1; $y := expr2; $z := expr3)

The behavior of most Kafka Connect SMTs can be expressed using the above four functions for objects and arrays, with occasional use of variables to hold intermediate results. However, the power of JSONata is its ability to express custom transformations that aren’t yet embodied by existing SMTs.

The following sections show how JSONata can be used to replace the most common Kafka Connect SMTs. Click on each link below to see the JSONata transformation in action.

Cast Transform

To cast a field from a number to a string:

$merge([$, {'value': $merge([value, {'ts': $string(value.ts)}])}])

See https://try.jsonata.org/Po8DixmBo.

Drop Transform

To drop the record key and its schema:

$sift($, function($v, $k) {$k != 'keySchema' and $k != 'key'})

See https://try.jsonata.org/9nLK_0Idf.

To drop the record value and its schema:

$sift($, function($v, $k) {$k != 'valueSchema' and $k != 'value'})

See https://try.jsonata.org/FPc1ZXnUj.

Drop Headers Transform

To drop a header with a specific key:

$merge([$, {'headers': $filter($$.headers, function($v, $i, $a) {$v.key != 'key1'})}])

See https://try.jsonata.org/lX9kk_GHD.

Extract Field Transform

To replace the record value with a field extracted from the value:

$merge([$, {'value': value.email, 'valueSchema': {'type': 'STRING'}}])

See https://try.jsonata.org/AXjHLD3PJ.

Extract Topic Transform

To extract a field in the record value and use it as the topic name:

$merge([$, {'topic': value.last}])

See https://try.jsonata.org/ePMcD5LV3.

To extract a value in the header and use it as the topic name:

$merge([$, {'topic': headers[1].value}])

See https://try.jsonata.org/IJDMrs6DN.

Filter Transform

To include or drop records that match a specific condition:

key = 'mykey' ? null : $

See https://try.jsonata.org/y61a9dl20.

Flatten Transform

To flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character:

(
    /* Define the flatten function */
    $fn := function($o, $prefix) { 
        $each($o, function($v, $k) {(
            $name := $join([$prefix,$k], '.');
            $type($v) = 'object' ? $fn($v, $name): {
                $name: $v
            }
        )}) ~> $merge()
    };

    /* Flatten the record value */
    $merge([$$, {'value': $fn(value)}])
)

See https://try.jsonata.org/iGq7znZCi.

Header From Transform

To move a field in the record key into the record’s header:

( 
    /* Add the new header based on a field in the record value */
    $newHeaders := $append(headers, [{'key': 'email', 'value': value.email}]);

    /* Remove the field from the record value */
    $newValue := $sift(value, function($v, $k) {$k != 'email'});

    /* Merge the new headers and new value into the root document */
    $merge([$$, {'headers': $newHeaders}, {'value': $newValue}])
)

See https://try.jsonata.org/gZc0WVdNu.

Hoist Field Transform

To wrap the record value in a struct:

$merge([$, {'value': {'email': value}}])

See https://try.jsonata.org/Uo7uGfjjt.

Insert Field Transform

To insert the topic name as a field in the record value:

$merge([$, {'value': $merge([value, {'topic': topic}])}])

See ttps://try.jsonata.org/-kY4iZVPr.

Insert Header Transform

To insert a header into each record:

$merge([$, {'headers': $append(headers, {'key': 'topic', 'value': topic})}])

See https://try.jsonata.org/alPNXJUca.

Mask Field Transform

To mask a field in the record value:

$merge([$, {'value': $merge([value, {'email': 'xxxx@xxxx.com'}])}])

See https://try.jsonata.org/F1ADrO740.

Regex Router Transform

To update the topic using the configured regular expression and replacement string:

$merge([$, {'topic': $replace(topic, /(\w+)stream_(\w+)/, '$1$2')}])

See https://try.jsonata.org/rjtE_ctNb.

Replace Field Transform

To rename a field within a struct:

$merge([$, {'value': $merge([$sift(value, function($v, $k) {$k != 'email'}), {'emailAddr': $.value.email}])}])

See https://try.jsonata.org/LBxwpGgVe.

Set Schema Metadata Transform

To set the name and version of the schema:

$merge([$, {'valueSchema': $merge([valueSchema, {'name': 'order-value', 'version': 2}])}])

See https://try.jsonata.org/MlprCwePu.

Timestamp Converter Transform

To convert a field from milliseconds to a string date.

$merge([$, {'value': $merge([value, {'ts': $fromMillis(value.ts)}])}])

See https://try.jsonata.org/RC_nwPKcZ.

Timestamp Router Transform

To update the record’s topic as a function of the original topic and a timestamp:

$merge([$, {'topic': topic & '-' & $now('[Y0001][M01]')}])

See https://try.jsonata.org/OWuyidFcQ.

Tombstone Handler Transform

To ignore tombstone records:

value = null ? null : $

See https://try.jsonata.org/c9vDpnh71.

Value To Key Transform

To replace the record key with a new key formed from a subset of fields in the record value:

$merge([$, {'key': {'first': value.first, 'last': value.last}}])

See https://try.jsonata.org/kmTAXMrE6.

JSONata: The Missing Declarative Language for Kafka Connect

In-Memory Analytics for Kafka using DuckDB

Today’s data scientist has a plethora of options when processing data in Apache Kafka. In terms of stream processing, popular options are Kafka Streams, ksqlDB, and Apache Flink. In terms of real-time analytics, one can choose from Apache Druid, Clickhouse, Apache Pinot, and Apache Doris. One aspect of these systems is that they all support distributed processing, and so are capable of scaling to arbitrary amounts of data. However, the distributed nature of these offerings can lead to operational complexity.

Often a data scientist wants to quickly explore the data to get a better understanding of it without setting up a lot of infrastructure. This has led to the emergence of embedded OLAP databases, which support in-process exploratory data analysis. Databases in this category include Apache DataFusion, chDB, and DuckDB. DuckDB in particular has become especially popular, due to its ease of use and rich feature set, which includes:

  • support for querying and writing files in CSV, JSON and Parquet format
  • dynamic column selection, including use of regular expressions
  • PIVOT and UNPIVOT operations
  • windowing functions
  • full-text search
  • lambda functions, function chaining, and list comprehensions in SQL
  • rich type system, including JSON
  • enhancements to SQL to make it even easier to use
  • APIs for Java, go, C, C++, Node.js, Python, Julia, R, and others
  • etc.

The most popular formats for storing data in Kafka are Apache Avro, Protobuf, and JSON, all of which are supported by the Confluent Schema Registry. All of these formats support composing types using records, arrays, and unions. Unions are particularly interesting in the context of Kafka, because they can be used to support multiple types in the same topic. However, while relational databases often support the record type (usually called row type) and the array type, they usually do not support the union type. DuckDB is the only database that I know of that supports the union type, which makes it a natural target for storing and analyzing data which is originally in either Avro, Protobuf, or JSON format.

One way to load Kafka data into a relational database is to use the JDBC sink connector, which is part of the Apache Kafka Connect framework. However, setting up Kafka Connect involves a lot of moving parts, and the JDBC sink connector does not support DuckDB-specific functionality. For these reasons, I’ve created a small utility called kwack, which aims to do one thing well: load Kafka data into DuckDB, while making use of it’s rich type system, including support for unions.

Once Kafka data is loaded into DuckDB, one can make use of the myriad features of DuckDB. For example, you can quickly run SQL queries on the data, or even export DuckDB tables in Apache Parquet format. This means that by using kwack, you have a quick way to convert Kafka data in Apache Avro, Protobuf, and JSON to Apache Parquet files.

One can start kwack by simply specifying the Kafka broker, topic, and Schema Registry URL:

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:>

By default, kwack will use a simple binary deserializer for the key, and for the value kwack will use a deserializer corresponding to the latest subject version in Schema Registry, where the subject is the topic name suffixed with “-value”. The deserializer can be customized to one that does not rely on Schema Registry (short, int, long, float, double, string, or binary) or one that does (latest or a specific schema ID).

For data that has a schema, kwack will map each type in Avro, Protobuf, and JSON Schema to the appropriate type in DuckDB, as follows:

Avro Protobuf JSON Schema DuckDB
boolean boolean boolean BOOLEAN
int int32, sint32, sfixed32 INTEGER
uint32, fixed32 UINTEGER
long int64. sint64, sfixed64 integer BIGINT
uint64, fixed64 UBIGINT
float float FLOAT
double double number DOUBLE
string string string VARCHAR
bytes, fixed bytes BLOB
enum enum enum ENUM
record message object STRUCT
array repeated array LIST
map map MAP
union oneof oneOf,anyOf UNION
decimal confluent.type.Decimal DECIMAL
date google.type.Date DATE
time-millis, time-micros google.type.TimeOfDay TIME
timestamp-millis TIMESTAMP_MS
timestamp-micros TIMESTAMP
timestamp-nanos google.protobuf.Timestamp TIMESTAMP_NS
duration google.protobuf.Duration INTERVAL
uuid UUID

Once kwack has been started, one can enter SQL commands at the prompt. (Note that if your topic name has a hyphen, you’ll need to surround it with quotes in the SQL query.)

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+--------+-----+-----+-----------------------------------------------------------+
| rowkey | f1  | f2  |                          rowinfo                          |
+--------+-----+-----+-----------------------------------------------------------+
| null   | hi  | 123 | {ksi=null, vsi=1, par=0, off=0, ts=1720286713492, hdr={}} |
| null   | bye | 456 | {ksi=null, vsi=1, par=0, off=1, ts=1720286719746, hdr={}} |
+--------+-----+-----+-----------------------------------------------------------+
2 rows selected (0.012 seconds)

By default all data from the topic will be loaded. One can use the -p option to specify a subset of partitions to load, or the -o option to specify an absolute offset, a relative offset from the end, or a timestamp (in ms) from which to start loading data. (Type kwack -h to see all options.)

The table created for the topic will have one column to represent the Kafka record key (rowkey) and one column to represent the Kafka record metadata (rowinfo). The rest of the columns will be the fields of the Kafka record value assuming that it is a record (Avro), message (Protobuf), or object (JSON); otherwise it will be a single column (rowval).

One can omit attributes of rowinfo, or even rowkey, by specifying exactly which attributes to add to the DuckDB table. Below we include only the partition and offset in rowinfo.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+-----+-----+----------------+
| f1  | f2  |    rowinfo     |
+-----+-----+----------------+
| hi  | 123 | {par=0, off=0} |
| bye | 456 | {par=0, off=1} |
+-----+-----+----------------+
2 rows selected (0.01 seconds)

One can also load multiple topics, and perform joins between the resulting DuckDB tables. Below we load two topics and perform a join on a shared column name.

% kwack -b localhost:9092 -t mytopic -t mytopic2 -r http://localhost:8081 -a none
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+-----+-----+
| f1  | f2  |
+-----+-----+
| hi  | 123 |
| bye | 456 |
+-----+-----+
2 rows selected (0.002 seconds)
jdbc:duckdb::memory:> select * from mytopic2;
+-----+-------+
| f1  |  f3   |
+-----+-------+
| hi  | world |
| bye | folks |
+-----+-------+
2 rows selected (0.003 seconds)
jdbc:duckdb::memory:> select * from mytopic join mytopic2 using (f1);
>
+-----+-----+-------+
| f1  | f2  |  f3   |
+-----+-----+-------+
| hi  | 123 | world |
| bye | 456 | folks |
+-----+-----+-------+
2 rows selected (0.002 seconds)

As mentioned, one can write DuckDB tables in Parquet format.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> COPY mytopic to 'mytopic.parquet' (FORMAT 'parquet');
2 rows affected (0.007 seconds)

Additionally, one can persist the DuckDB database to a file, and then use the official DuckDB command line to perform analytical queries.

% kwack -b localhost:9092 -t mytopic -t mytopic2 -r http://localhost:8081 -a none -d /tmp/mydb
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb:/tmp/mydb> !exit

% duckdb /tmp/mydb
v1.0.0 1f98600c2c
Enter ".help" for usage hints.
D .tables
mytopic mytopic2
D select * from mytopic;
┌─────────┬───────┐
│   f1    │  f2   │
│ varchar │ int32 │
├─────────┼───────┤
│ hi      │   123 │
│ bye     │   456 │
└─────────┴───────┘

One can pass a SQL query on the command line, and instead of entering interactive mode, kwack will output the results of the query.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off -q "select * from mytopic"
{"f1":"hi","f2":123,"rowinfo":{"par":0,"off":0}}
{"f1":"bye","f2":456,"rowinfo":{"par":0,"off":1}}

Note that the output is in JSON format. This allows kwack to be used in conjunction with other tools like jq.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off -q "select * from mytopic" | jq .f1
"hi"
"bye"

When using kwack with Confluent Cloud, it may be easier to pass all of the properties including credentials in a file, as follows:

% kwack -F kwack.properties
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:>

With kwack, using Kafka to perform data science at the command line becomes easier than ever.

In-Memory Analytics for Kafka using DuckDB