JSONata is an expressive, declarative functional language that is as powerful as older transformation technologies such as XSLT, but with a concise syntax. When working with Confluent Schema Registry, JSONata can be used to define complex schema migration rules that ensure schema evolution does not break existing consumers.
JSONata is also useful for defining transformations in Kafka Connect. Such transformations are typically performed using Single Message Transforms (SMTs). An SMT is usually implemented in Java and performs one small function, such as adding, modifying, or removing data or metadata in a Kafka Connect record. However, if no existing SMT performs the desired transformation, one usually has to write a new custom SMT.
I’ve created a general-purpose JSONata SMT that accepts a JSONata expression, so that instead of writing a new custom SMT, a user can simply provide a JSONata expression. The JSONata SMT can replace the behavior of most existing Kafka Connect SMTs. At the end of this article, I provide JSONata expressions for some of the more common SMTs that currently exist.
When using JSONata to transform a Kafka Connect record, the record is first converted to a structure like the following example:
{
  "topic" : "test",
  "kafkaPartition" : 1,
  "keySchema" : {
    "type" : "STRING"
  },
  "key" : "mykey",
  "valueSchema" : {
    "type" : "STRUCT",
    "fields" : {
      "first" : {
        "name" : "first",
        "index" : 0,
        "schema" : {
          "type" : "STRING"
        }
      },
      "last" : {
        "name" : "last",
        "index" : 1,
        "schema" : {
          "type" : "STRING"
        }
      },
      "email" : {
        "name" : "email",
        "index" : 2,
        "schema" : {
          "type" : "STRING"
        }
      }
    }
  },
  "value" : {
    "first" : "test",
    "last" : "user",
    "email" : "none@none.com"
  },
  "timestamp" : 1234,
  "headers" : [ {
    "key" : "key1",
    "value" : "value1",
    "schema" : {
      "type" : "STRING"
    }
  }, {
    "key" : "key2",
    "value" : "value2",
    "schema" : {
      "type" : "STRING"
    }
  } ]
}
JSONata operates by simply transforming the above structure. After the transformation, the structure is converted back to a Kafka Connect record. I won’t go into the details of JSONata itself (see this blog instead), but I will provide some tips for writing JSONata expressions that transform Kafka Connect records.
When modifying JSON objects, the two most useful object functions are the following:
- The $sift function removes properties from an object.
- The $merge function merges an array of objects into a single object, and can be used to add or overwrite properties of an existing object.
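As a minimal sketch against the example record above, the following expression uses both functions: $sift removes the email property from the value, and the inner $merge adds a property named source (an illustrative name and value, not part of the original examples). The outer $merge then replaces the value in the record.

```jsonata
/* $sift drops 'email' from the value; the inner $merge adds an illustrative
   'source' property; the outer $merge puts the new value back into the record */
$merge([$, {'value': $merge([$sift(value, function($v, $k) {$k != 'email'}), {'source': 'jsonata'}])}])
```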
When modifying JSON arrays, the two most useful array functions are the following:
- The $filter function removes items from an array.
- The $append function concatenates two arrays, and can be used to add items to an existing array.
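A small sketch combining the two array functions on the headers of the example record: $filter keeps every header except key1, and $append adds a new header. The origin key and jsonata value are hypothetical, and the schema entry simply mirrors the layout of the existing headers.

```jsonata
/* Keep all headers except 'key1', then append a hypothetical 'origin' header */
$merge([$, {'headers': $append(
  $filter(headers, function($v) {$v.key != 'key1'}),
  [{'key': 'origin', 'value': 'jsonata', 'schema': {'type': 'STRING'}}]
)}])
```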
Furthermore, multiple expressions can be combined in a parenthesized block, separated by semicolons. The expressions are evaluated in order, and the result of each can be assigned to a variable for use in subsequent expressions. The block as a whole evaluates to the result of the last expression:
($x := expr1; $y := expr2; $z := expr3)
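A concrete instance of this pattern, sketched against the example record: the first expression stores an intermediate result in a variable, and the last expression, which becomes the result of the whole block, uses it. The fullName field is a hypothetical name chosen for illustration.

```jsonata
(
  /* Intermediate result: concatenate two fields of the record value */
  $fullName := value.first & ' ' & value.last;
  /* Last expression in the block, so its result is the block's result */
  $merge([$$, {'value': $merge([value, {'fullName': $fullName}])}])
)
```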
The behavior of most Kafka Connect SMTs can be expressed using these four object and array functions, with occasional use of variables to hold intermediate results. However, the real power of JSONata is its ability to express custom transformations that no existing SMT provides.
The following sections show how JSONata can be used to replace the most common Kafka Connect SMTs. Click on each link below to see the JSONata transformation in action.
Cast Transform
To cast a field from a number to a string:
$merge([$, {'value': $merge([value, {'ts': $string(value.ts)}])}])
See https://try.jsonata.org/Po8DixmBo.
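The opposite cast, from a string to a number, can be sketched with $number instead of $string. This assumes value.ts holds a numeric string such as '1234'; $number raises an error when given a string that cannot be parsed as a number.

```jsonata
/* Cast the string field 'ts' in the record value to a number */
$merge([$, {'value': $merge([value, {'ts': $number(value.ts)}])}])
```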
Drop Transform
To drop the record key and its schema:
$sift($, function($v, $k) {$k != 'keySchema' and $k != 'key'})
See https://try.jsonata.org/9nLK_0Idf.
To drop the record value and its schema:
$sift($, function($v, $k) {$k != 'valueSchema' and $k != 'value'})
See https://try.jsonata.org/FPc1ZXnUj.
Drop Headers Transform
To drop a header with a specific key:
$merge([$, {'headers': $filter($$.headers, function($v, $i, $a) {$v.key != 'key1'})}])
See https://try.jsonata.org/lX9kk_GHD.
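To drop several headers at once, the in operator can test each header key against a list (the list below is illustrative). This sketch also wraps the $filter call in [ ], because JSONata returns a single matching item on its own rather than inside an array; the brackets keep headers an array even when only one header survives.

```jsonata
/* Drop every header whose key appears in the list; [ ] keeps the result an array */
$merge([$, {'headers': [$filter(headers, function($v) {$not($v.key in ['key1', 'key2'])})]}])
```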
Extract Field Transform
To replace the record value with a field extracted from the value:
$merge([$, {'value': value.email, 'valueSchema': {'type': 'STRING'}}])
See https://try.jsonata.org/AXjHLD3PJ.
Extract Topic Transform
To extract a field in the record value and use it as the topic name:
$merge([$, {'topic': value.last}])
See https://try.jsonata.org/ePMcD5LV3.
To extract a header value and use it as the topic name (headers[1] is the second header, since array indexes start at 0):
$merge([$, {'topic': headers[1].value}])
See https://try.jsonata.org/IJDMrs6DN.
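Selecting a header by position depends on header order. A sketch that selects the header by its key instead, assuming exactly one header has that key (if several match, the result would be an array rather than a single string):

```jsonata
/* Use the value of the header whose key is 'key2' as the topic name */
$merge([$, {'topic': headers[key = 'key2'].value}])
```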
Filter Transform
To drop records that match a specific condition (when the expression evaluates to null, the record is dropped; otherwise the record is returned unchanged):
key = 'mykey' ? null : $
See https://try.jsonata.org/y61a9dl20.
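The same conditional can be inverted to include only the matching records and drop everything else, again relying on a null result to drop a record:

```jsonata
/* Keep records whose key is 'mykey'; every other record evaluates to null */
key = 'mykey' ? $ : null
```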
Flatten Transform
To flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character:
(
  /* Delimiter used to join nested field names */
  $delim := '.';
  /* Define the flatten function */
  $fn := function($o, $prefix) {
    $each($o, function($v, $k) {(
      $name := $join([$prefix, $k], $delim);
      $type($v) = 'object' ? $fn($v, $name) : {
        $name: $v
      }
    )}) ~> $merge()
  };
  /* Flatten the record value */
  $merge([$$, {'value': $fn(value)}])
)
See https://try.jsonata.org/iGq7znZCi.
Header From Transform
To move a field in the record value into the record’s headers:
(
/* Add the new header based on a field in the record value */
$newHeaders := $append(headers, [{'key': 'email', 'value': value.email}]);
/* Remove the field from the record value */
$newValue := $sift(value, function($v, $k) {$k != 'email'});
/* Merge the new headers and new value into the root document */
$merge([$$, {'headers': $newHeaders}, {'value': $newValue}])
)
See https://try.jsonata.org/gZc0WVdNu.
Hoist Field Transform
To wrap the record value in a struct:
$merge([$, {'value': {'email': value}}])
See https://try.jsonata.org/Uo7uGfjjt.
Insert Field Transform
To insert the topic name as a field in the record value:
$merge([$, {'value': $merge([value, {'topic': topic}])}])
See https://try.jsonata.org/-kY4iZVPr.
Insert Header Transform
To insert a header into each record:
$merge([$, {'headers': $append(headers, {'key': 'topic', 'value': topic})}])
See https://try.jsonata.org/alPNXJUca.
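The headers in the example record each carry a schema entry. If the inserted header should have the same shape, a schema can be added next to the key and value. Whether the SMT requires this entry is an assumption here, not something stated above.

```jsonata
/* Append a 'topic' header that also carries a STRING schema, like the existing headers */
$merge([$, {'headers': $append(headers, {'key': 'topic', 'value': topic, 'schema': {'type': 'STRING'}})}])
```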
Mask Field Transform
To mask a field in the record value:
$merge([$, {'value': $merge([value, {'email': 'xxxx@xxxx.com'}])}])
See https://try.jsonata.org/F1ADrO740.
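To mask only the part of the address before the @ while keeping the domain, $replace with a regular expression can be used instead of a constant. This is a sketch; note that a value with no @ at all would be replaced entirely by 'xxxx'.

```jsonata
/* Replace everything up to the first '@' with 'xxxx', keeping the domain */
$merge([$, {'value': $merge([value, {'email': $replace(value.email, /^[^@]+/, 'xxxx')}])}])
```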
Regex Router Transform
To update the topic using a regular expression and replacement string:
$merge([$, {'topic': $replace(topic, /(\w+)stream_(\w+)/, '$1$2')}])
See https://try.jsonata.org/rjtE_ctNb.
Replace Field Transform
To rename a field within a struct:
$merge([$, {'value': $merge([$sift(value, function($v, $k) {$k != 'email'}), {'emailAddr': $.value.email}])}])
See https://try.jsonata.org/LBxwpGgVe.
Set Schema Metadata Transform
To set the name and version of the schema:
$merge([$, {'valueSchema': $merge([valueSchema, {'name': 'order-value', 'version': 2}])}])
See https://try.jsonata.org/MlprCwePu.
Timestamp Converter Transform
To convert a field from milliseconds since the epoch to an ISO 8601 date string:
$merge([$, {'value': $merge([value, {'ts': $fromMillis(value.ts)}])}])
See https://try.jsonata.org/RC_nwPKcZ.
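$fromMillis also accepts an optional picture string that controls the output format. For example, to keep only the date part as year-month-day (UTC is used when no timezone is given):

```jsonata
/* Format the epoch-millisecond field 'ts' as a YYYY-MM-DD date string */
$merge([$, {'value': $merge([value, {'ts': $fromMillis(value.ts, '[Y0001]-[M01]-[D01]')}])}])
```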
Timestamp Router Transform
To update the record’s topic as a function of the original topic and the current time:
$merge([$, {'topic': topic & '-' & $now('[Y0001][M01]')}])
See https://try.jsonata.org/OWuyidFcQ.
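Because $now returns the time at which the expression is evaluated, the example above routes records by processing time. To route by the record’s own timestamp field instead, which is closer to what the built-in Timestamp Router does, $fromMillis can format it with the same picture string. This sketch assumes timestamp holds epoch milliseconds, as in the example record.

```jsonata
/* Append the record's own timestamp, formatted as year and month, to the topic */
$merge([$, {'topic': topic & '-' & $fromMillis(timestamp, '[Y0001][M01]')}])
```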
Tombstone Handler Transform
To drop tombstone records (records whose value is null):
value = null ? null : $
See https://try.jsonata.org/c9vDpnh71.
Value To Key Transform
To replace the record key with a new key formed from a subset of fields in the record value:
$merge([$, {'key': {'first': value.first, 'last': value.last}}])
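The expression above turns the key from a string into a struct but leaves keySchema as STRING. By analogy with the Extract Field example, which replaces valueSchema along with the value, a sketch that also sets a matching key schema might look like the following. The schema layout is modeled on valueSchema in the example record; whether the SMT needs the key schema updated explicitly is an assumption.

```jsonata
/* Build a struct key from two value fields and a key schema that describes it */
$merge([$, {
  'key': {'first': value.first, 'last': value.last},
  'keySchema': {
    'type': 'STRUCT',
    'fields': {
      'first': {'name': 'first', 'index': 0, 'schema': {'type': 'STRING'}},
      'last': {'name': 'last', 'index': 1, 'schema': {'type': 'STRING'}}
    }
  }
}])
```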