JSONata: The Missing Declarative Language for Kafka Connect

JSONata is an expressive, declarative functional language that is as powerful as older transformation technologies such as XSLT, but with a concise syntax. When working with Confluent Schema Registry, JSONata can be used to define complex schema migration rules that ensure schema evolution does not break existing consumers.

Another area where JSONata can be useful is in defining transformations that occur in the context of Kafka Connect. Such transformations are usually performed using Single Message Transforms (SMTs). An SMT is typically implemented in Java and performs one small function, such as adding, modifying, or removing data or metadata in a Kafka Connect record. However, if no SMT exists for a desired transformation, one usually has to write a new custom SMT.

I’ve created a general-purpose JSONata SMT that can take a JSONata expression, so that instead of creating a new custom SMT, a user can simply provide a custom JSONata expression. Using the JSONata SMT, one can replace the behavior of most existing Kafka Connect SMTs. At the end of this article, I provide JSONata expressions for some of the more common SMTs that currently exist.

When JSONata is used to transform a Kafka Connect record, the record is first converted to a JSON structure such as the following:

{
  "topic" : "test",
  "kafkaPartition" : 1,
  "keySchema" : {
    "type" : "STRING"
  },
  "key" : "mykey",
  "valueSchema" : {
    "type" : "STRUCT",
    "fields" : {
      "first" : {
        "name" : "first",
        "index" : 0,
        "schema" : {
          "type" : "STRING"
        }
      },
      "last" : {
        "name" : "last",
        "index" : 1,
        "schema" : {
          "type" : "STRING"
        }
      },
      "email" : {
        "name" : "email",
        "index" : 2,
        "schema" : {
          "type" : "STRING"
        }
      }
    }
  },
  "value" : {
    "first" : "test",
    "last" : "user",
    "email" : "none@none.com"
  },
  "timestamp" : 1234,
  "headers" : [ {
    "key" : "key1",
    "value" : "value1",
    "schema" : {
      "type" : "STRING"
    }
  }, {
    "key" : "key2",
    "value" : "value2",
    "schema" : {
      "type" : "STRING"
    }
  } ]
}

JSONata operates by transforming the above structure, and the result is then converted back to a Kafka Connect record. I won’t go into the details of the JSONata language here (see this blog instead), but I will provide some tips for writing JSONata expressions that transform Kafka Connect records.

When modifying JSON objects, the two most useful object functions are the following:

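  • The $sift function returns only the properties of an object that satisfy a predicate, so it can be used to remove properties from an object.
  • The $merge function performs a shallow merge of an array of objects (for duplicate keys, the last value wins), so it can be used to add or replace properties in an existing object.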
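To make the semantics of these two functions concrete, here is a minimal Python sketch that mimics them on a plain dict shaped like the record structure above. The helper names sift and merge are hypothetical. The sketch illustrates what the JSONata functions do and is not how the JSONata SMT is implemented.

```python
from typing import Any, Callable, Dict, List


def sift(obj: Dict[str, Any], pred: Callable[[Any, str], bool]) -> Dict[str, Any]:
    """Return a new dict holding only the properties for which pred(value, key) is true (like $sift)."""
    return {k: v for k, v in obj.items() if pred(v, k)}


def merge(objs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Shallow-merge a list of dicts into a new dict; for duplicate keys the last one wins (like $merge)."""
    result: Dict[str, Any] = {}
    for obj in objs:
        result.update(obj)
    return result


record = {
    "topic": "test",
    "key": "mykey",
    "value": {"first": "test", "last": "user", "email": "none@none.com"},
}

# Analogue of: $sift($, function($v, $k) {$k != 'key'})
without_key = sift(record, lambda v, k: k != "key")

# Analogue of: $merge([$, {'topic': 'renamed'}])
renamed = merge([record, {"topic": "renamed"}])

print(without_key)
print(renamed["topic"], renamed["key"])
```

Both helpers build new dicts rather than mutating record, so the original structure stays intact.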

When modifying JSON arrays, the two most useful array functions are the following:

  • The $filter function returns only the items of an array that satisfy a predicate, so it can be used to remove items from an array.
  • The $append function concatenates two arrays, and can be used to add items to an existing array.
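A similar Python sketch for the two array functions, applied to a headers list like the one in the record above. The names filter_items and append are hypothetical helpers. append treats a non-list argument as a single-item list, which mirrors how $append treats an argument that is not an array.

```python
from typing import Any, Callable, List


def filter_items(arr: List[Any], pred: Callable[[Any, int, List[Any]], bool]) -> List[Any]:
    """Keep only the items for which pred(item, index, array) is true (like $filter)."""
    return [item for i, item in enumerate(arr) if pred(item, i, arr)]


def as_list(x: Any) -> List[Any]:
    """Wrap a non-list value in a one-item list; lists pass through unchanged."""
    return x if isinstance(x, list) else [x]


def append(a: Any, b: Any) -> List[Any]:
    """Concatenate two values as lists (like $append)."""
    return as_list(a) + as_list(b)


headers = [
    {"key": "key1", "value": "value1"},
    {"key": "key2", "value": "value2"},
]

# Analogue of: $filter(headers, function($v, $i, $a) {$v.key != 'key1'})
kept = filter_items(headers, lambda v, i, a: v["key"] != "key1")

# Analogue of: $append(headers, {'key': 'key3', 'value': 'value3'})
extended = append(headers, {"key": "key3", "value": "value3"})

print(kept)
print([h["key"] for h in extended])
```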

Furthermore, multiple expressions can be combined into a block. The expressions are enclosed in parentheses, separated by semicolons, and evaluated in sequential order, and each result can be assigned to a variable for use in subsequent expressions. The value of the block is the result of the last expression in the sequence.

($x := expr1; $y := expr2; $z := expr3)

The behavior of most Kafka Connect SMTs can be expressed using the above four functions for objects and arrays, with occasional use of variables to hold intermediate results. However, the real power of JSONata is its ability to express custom transformations that existing SMTs don’t provide.

The following sections show how JSONata can replace the most common Kafka Connect SMTs. Follow the link after each expression to see the transformation in action.

Cast Transform

To cast a field from a number to a string:

$merge([$, {'value': $merge([value, {'ts': $string(value.ts)}])}])

See https://try.jsonata.org/Po8DixmBo.
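The cast expression nests one $merge inside another because $merge is shallow: merging {'value': {...}} directly into the record would replace the whole value. The Python sketch below (hypothetical function name, same record layout as above) shows the nested pattern next to a single shallow merge that loses data. Here str() stands in for $string and is assumed to agree with it for integer values like the one in this example.

```python
from typing import Any, Dict


def cast_ts_to_string(record: Dict[str, Any]) -> Dict[str, Any]:
    # Inner merge: a copy of the value with 'ts' replaced by its string form
    new_value = {**record["value"], "ts": str(record["value"]["ts"])}
    # Outer merge: a copy of the record with 'value' replaced
    return {**record, "value": new_value}


record = {"topic": "test", "value": {"email": "none@none.com", "ts": 1234}}

cast = cast_ts_to_string(record)

# A single shallow merge would discard every other field of the value
shallow = {**record, "value": {"ts": str(record["value"]["ts"])}}

print(cast["value"])
print(shallow["value"])
```

The same nested-merge pattern appears in the Insert Field, Mask Field, and Timestamp Converter expressions below.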

Drop Transform

To drop the record key and its schema:

$sift($, function($v, $k) {$k != 'keySchema' and $k != 'key'})

See https://try.jsonata.org/9nLK_0Idf.

To drop the record value and its schema:

$sift($, function($v, $k) {$k != 'valueSchema' and $k != 'value'})

See https://try.jsonata.org/FPc1ZXnUj.

Drop Headers Transform

To drop a header with a specific key:

$merge([$, {'headers': $filter($$.headers, function($v, $i, $a) {$v.key != 'key1'})}])

See https://try.jsonata.org/lX9kk_GHD.

Extract Field Transform

To replace the record value with a field extracted from the value:

$merge([$, {'value': value.email, 'valueSchema': {'type': 'STRING'}}])

See https://try.jsonata.org/AXjHLD3PJ.

Extract Topic Transform

To extract a field in the record value and use it as the topic name:

$merge([$, {'topic': value.last}])

See https://try.jsonata.org/ePMcD5LV3.

To extract a value from a header and use it as the topic name (here the second header, since JSONata array indexes are zero-based):

$merge([$, {'topic': headers[1].value}])

See https://try.jsonata.org/IJDMrs6DN.

Filter Transform

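To include or drop records that match a specific condition. The expression below drops records whose key is 'mykey' by evaluating to null, and passes every other record through unchanged: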

key = 'mykey' ? null : $

See https://try.jsonata.org/y61a9dl20.
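In Kafka Connect, a transformation that returns null for a record causes that record to be dropped. The Python sketch below models this with a hypothetical driver, run_transform, in which a None result removes the record from the output; drop_mykey mirrors the expression above. Both names are illustrative only.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

Record = Dict[str, Any]


def drop_mykey(record: Record) -> Optional[Record]:
    # Analogue of: key = 'mykey' ? null : $
    return None if record.get("key") == "mykey" else record


def run_transform(
    records: Iterable[Record], transform: Callable[[Record], Optional[Record]]
) -> List[Record]:
    # Hypothetical driver: a None result means the record is filtered out
    output: List[Record] = []
    for record in records:
        result = transform(record)
        if result is not None:
            output.append(result)
    return output


records = [{"key": "mykey", "value": 1}, {"key": "other", "value": 2}]
kept = run_transform(records, drop_mykey)
print(kept)
```

The Tombstone Handler expression later in this article follows the same pattern, with the condition value = null.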

Flatten Transform

To flatten a nested data structure, generating names for each field by concatenating the field names at each level with a delimiter character (a period in this expression):

(
    /* Define the flatten function */
    $fn := function($o, $prefix) { 
        $each($o, function($v, $k) {(
            $name := $join([$prefix,$k], '.');
            $type($v) = 'object' ? $fn($v, $name): {
                $name: $v
            }
        )}) ~> $merge()
    };

    /* Flatten the record value */
    $merge([$$, {'value': $fn(value)}])
)

See https://try.jsonata.org/iGq7znZCi.
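For comparison, here is the same recursive idea in Python. The function walks the object, recurses into nested objects using the accumulated name as the prefix, and collects the leaf values under their joined names. The function name flatten and the sep parameter are illustrative. As in the JSONata expression, arrays and other non-object values are kept as-is rather than flattened.

```python
from typing import Any, Dict, Optional


def flatten(obj: Dict[str, Any], prefix: Optional[str] = None, sep: str = ".") -> Dict[str, Any]:
    """Flatten nested dicts, joining the field names at each level with sep."""
    flat: Dict[str, Any] = {}
    for key, val in obj.items():
        # Mirrors $join([$prefix, $k], '.'), where an undefined prefix is skipped
        name = key if prefix is None else f"{prefix}{sep}{key}"
        if isinstance(val, dict):
            flat.update(flatten(val, name, sep))  # recurse into nested objects
        else:
            flat[name] = val  # leaf value: keep it under the joined name
    return flat


value = {"name": {"first": "test", "last": "user"}, "email": "none@none.com", "tags": ["a", "b"]}
print(flatten(value))
```

Making the delimiter a parameter is what the Flatten SMT’s configurable delimiter corresponds to; in the JSONata expression it is the '.' argument to $join.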

Header From Transform

To move a field in the record value into the record’s headers:

( 
    /* Add the new header based on a field in the record value */
    $newHeaders := $append(headers, [{'key': 'email', 'value': value.email}]);

    /* Remove the field from the record value */
    $newValue := $sift(value, function($v, $k) {$k != 'email'});

    /* Merge the new headers and new value into the root document */
    $merge([$$, {'headers': $newHeaders}, {'value': $newValue}])
)

See https://try.jsonata.org/gZc0WVdNu.
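The block above computes two intermediate results and then merges them into the record. The Python sketch below follows the same three steps. header_from and its field parameter are hypothetical names. Like the JSONata expression, the new header carries no schema entry.

```python
from typing import Any, Dict


def header_from(record: Dict[str, Any], field: str) -> Dict[str, Any]:
    value = record["value"]
    # Step 1: add a header built from the value field ($append)
    new_headers = record["headers"] + [{"key": field, "value": value[field]}]
    # Step 2: remove the field from the value ($sift)
    new_value = {k: v for k, v in value.items() if k != field}
    # Step 3: merge both into a copy of the record ($merge)
    return {**record, "headers": new_headers, "value": new_value}


record = {
    "topic": "test",
    "value": {"first": "test", "last": "user", "email": "none@none.com"},
    "headers": [{"key": "key1", "value": "value1"}],
}
moved = header_from(record, "email")
print(moved)
```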

Hoist Field Transform

To wrap the record value in a struct:

$merge([$, {'value': {'email': value}}])

See https://try.jsonata.org/Uo7uGfjjt.

Insert Field Transform

To insert the topic name as a field in the record value:

$merge([$, {'value': $merge([value, {'topic': topic}])}])

See https://try.jsonata.org/-kY4iZVPr.

Insert Header Transform

To insert a header into each record:

$merge([$, {'headers': $append(headers, {'key': 'topic', 'value': topic})}])

See https://try.jsonata.org/alPNXJUca.

Mask Field Transform

To mask a field in the record value:

$merge([$, {'value': $merge([value, {'email': 'xxxx@xxxx.com'}])}])

See https://try.jsonata.org/F1ADrO740.

Regex Router Transform

To update the topic using a regular expression and a replacement string:

$merge([$, {'topic': $replace(topic, /(\w+)stream_(\w+)/, '$1$2')}])

See https://try.jsonata.org/rjtE_ctNb.
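In Python the same rewrite can be done with re.sub. One translation is needed: JSONata’s $1 and $2 in the replacement string correspond to \1 and \2 in Python. The function name route_topic is hypothetical, and the sample topic names are illustrative.

```python
import re
from typing import Any, Dict

TOPIC_PATTERN = re.compile(r"(\w+)stream_(\w+)")


def route_topic(record: Dict[str, Any]) -> Dict[str, Any]:
    # Analogue of: $replace(topic, /(\w+)stream_(\w+)/, '$1$2')
    return {**record, "topic": TOPIC_PATTERN.sub(r"\1\2", record["topic"])}


print(route_topic({"topic": "mystream_orders"})["topic"])
print(route_topic({"topic": "orders"})["topic"])
```

Topics that do not match the pattern are left unchanged.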

Replace Field Transform

To rename a field within a struct:

$merge([$, {'value': $merge([$sift(value, function($v, $k) {$k != 'email'}), {'emailAddr': $.value.email}])}])

See https://try.jsonata.org/LBxwpGgVe.
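A Python version of the rename, using the same dict-shaped record as the sketches above and a hypothetical function name. The function drops the old key and then adds the new key with the old value, which corresponds to the $sift followed by $merge in the expression.

```python
from typing import Any, Dict


def rename_field(record: Dict[str, Any], old: str, new: str) -> Dict[str, Any]:
    value = record["value"]
    # Analogue of $sift(value, function($v, $k) {$k != old})
    renamed = {k: v for k, v in value.items() if k != old}
    # Analogue of merging {new: value.old} into the sifted value
    renamed[new] = value[old]
    return {**record, "value": renamed}


record = {"topic": "test", "value": {"first": "test", "email": "none@none.com"}}
result = rename_field(record, "email", "emailAddr")
print(result["value"])
```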

Set Schema Metadata Transform

To set the name and version of the schema:

$merge([$, {'valueSchema': $merge([valueSchema, {'name': 'order-value', 'version': 2}])}])

See https://try.jsonata.org/MlprCwePu.

Timestamp Converter Transform

To convert a field from milliseconds since the epoch to an ISO 8601 date string:

$merge([$, {'value': $merge([value, {'ts': $fromMillis(value.ts)}])}])

See https://try.jsonata.org/RC_nwPKcZ.
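Without a picture string, $fromMillis produces an ISO 8601 timestamp string in UTC with millisecond precision. The Python sketch below builds a string of the same shape, adding a timedelta to the epoch so the milliseconds are handled with exact integer arithmetic rather than float division. The function names are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def from_millis(ms: int) -> str:
    """Format milliseconds since the epoch as an ISO 8601 UTC string."""
    dt = EPOCH + timedelta(milliseconds=ms)  # exact integer arithmetic, no float division
    return dt.strftime("%Y-%m-%dT%H:%M:%S.") + f"{dt.microsecond // 1000:03d}Z"


def convert_ts(record: Dict[str, Any]) -> Dict[str, Any]:
    # Analogue of: $merge([$, {'value': $merge([value, {'ts': $fromMillis(value.ts)}])}])
    return {**record, "value": {**record["value"], "ts": from_millis(record["value"]["ts"])}}


print(from_millis(1234))
```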

Timestamp Router Transform

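To update the record’s topic as a function of the original topic and a timestamp. This expression uses the current time via $now; to use the record’s own timestamp instead, as the Timestamp Router SMT does, $fromMillis(timestamp, '[Y0001][M01]') can take the place of $now('[Y0001][M01]'):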

$merge([$, {'topic': topic & '-' & $now('[Y0001][M01]')}])

See https://try.jsonata.org/OWuyidFcQ.
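In the Python sketch below, the time is passed in explicitly so that the routing is deterministic, and record_time shows one way to derive it from the record’s own timestamp field. Both function names are hypothetical, and '%Y%m' is assumed to correspond to the [Y0001][M01] picture string (a four-digit year followed by a two-digit month).

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict


def route_by_month(record: Dict[str, Any], when: datetime) -> Dict[str, Any]:
    # Analogue of: topic & '-' & <when formatted with [Y0001][M01]>
    return {**record, "topic": f"{record['topic']}-{when.strftime('%Y%m')}"}


def record_time(record: Dict[str, Any]) -> datetime:
    # Interpret the record's 'timestamp' field as milliseconds since the epoch (UTC)
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(milliseconds=record["timestamp"])


record = {"topic": "test", "timestamp": 1234}
now_routed = route_by_month(record, datetime.now(timezone.utc))  # like $now
ts_routed = route_by_month(record, record_time(record))  # uses the record's timestamp
print(now_routed["topic"], ts_routed["topic"])
```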

Tombstone Handler Transform

To drop tombstone records (records whose value is null):

value = null ? null : $

See https://try.jsonata.org/c9vDpnh71.

Value To Key Transform

To replace the record key with a new key formed from a subset of fields in the record value:

$merge([$, {'key': {'first': value.first, 'last': value.last}}])

See https://try.jsonata.org/kmTAXMrE6.
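A Python sketch of the same idea, with the field names passed in as a parameter (value_to_key is a hypothetical name). Like the JSONata expression, it replaces only the key and leaves keySchema untouched.

```python
from typing import Any, Dict, List


def value_to_key(record: Dict[str, Any], fields: List[str]) -> Dict[str, Any]:
    # Analogue of: $merge([$, {'key': {'first': value.first, 'last': value.last}}])
    return {**record, "key": {f: record["value"][f] for f in fields}}


record = {
    "key": "mykey",
    "keySchema": {"type": "STRING"},
    "value": {"first": "test", "last": "user", "email": "none@none.com"},
}
rekeyed = value_to_key(record, ["first", "last"])
print(rekeyed["key"])
```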
