JSON Schema Compatibility and the Robustness Principle

In a previous article, I explained how to evolve JSON Schemas, based on whether backward, forward, or full (both backward and forward) compatibility is desired. In JSON Schema, the value of additionalProperties determines whether the JSON content model is open (if the value is true), closed (if the value is false), or partially open (if the value is any other value besides true or false). The discussion in that article led to the following points.

  • For backward compatibility, one can add optional properties to a closed content model or remove optional properties from an open content model.
  • For forward compatibility, one can add optional properties to an open content model or remove optional properties from a closed content model.

Because backward and forward compatibility impose opposite requirements on the content model when adding or removing optional properties, full compatibility can be difficult to achieve with JSON Schema.
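
As a reminder, the following minimal sketches (with an illustrative property) show a closed content model and an open content model. Omitting additionalProperties is equivalent to setting it to true.

{
  "type": "object",
  "properties": {
    "name": { "type": "string" }
  },
  "additionalProperties": false
}

{
  "type": "object",
  "properties": {
    "name": { "type": "string" }
  },
  "additionalProperties": true
}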

Schema formats such as Avro and Protobuf allow for optional fields to be added and removed in a fully compatible manner.  These formats achieve full compatibility when adding and removing optional fields by ensuring the following:

  • When a payload is produced using a schema, only the fields in the schema will appear in the payload.
  • When a payload is consumed using a schema, any fields in the payload that are not in the schema are ignored.

The above behavior is an example of the Robustness Principle.  The Robustness Principle, also known as Postel’s Law, is often stated as

Be conservative in what you send, be liberal in what you accept.

An alternative formulation is

Be tolerant of inputs and strict on outputs.

Using the Robustness Principle

Applications that use JSON Schema are not required to follow the Robustness Principle. However, in practice, many applications do.

For example, the Robustness Principle is often used in REST applications. If a REST service does not receive all of the request properties, it will use default values for the missing properties. If a REST client receives response properties it does not expect, it will ignore the extra properties.

Using the Robustness Principle with JSON Schema ensures that the producer will not add properties to the payload other than those specified in the schema; the producer treats the schema as a closed content model. Likewise, the consumer will tolerate payload properties that do not appear in the schema by simply ignoring them, rather than raising an error; the consumer treats the schema as an open content model.

If you are sure that your application follows the Robustness Principle, then you can ask Schema Registry to use the lenient policy when performing compatibility checks with JSON Schema. The lenient policy allows optional properties to be added and removed in a fully compatible manner, because it assumes your application is following the Robustness Principle. If you do not specify the lenient policy, the strict policy, which is the default, is used.

Limitations of Full Compatibility

The different schema formats can express both product types and sum types. Product types take the form of records (Avro), messages (Protobuf), and objects (JSON). A product type is composed of components, which take the form of fields (Avro and Protobuf) and properties (JSON). Sum types take the form of unions (Avro) and oneofs (Protobuf and JSON). A sum type is composed of variants. While product types are more common, sum types are also important. For example, sum types are needed when representing multiple types within the same Kafka topic.

If you are using the lenient policy with JSON Schema, you may still want to prefer backward compatibility over full compatibility. While both backward compatibility and full compatibility allow you to evolve product types by adding and removing optional components, full compatibility does not allow you to evolve sum types. The most common way to evolve a sum type is by adding a new variant. This is a backward compatible change, but not a fully compatible one.
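
For example, suppose a topic's value schema contains a field whose type is a union of event records (the record names here are hypothetical). Adding a variant to the union is backward compatible, since consumers using the new schema can still read data written with the old one, but it is not forward compatible, since consumers using the old schema cannot resolve the new variant.

Version 1:

{"name": "event", "type": ["OrderCreated", "OrderShipped"]}

Version 2:

{"name": "event", "type": ["OrderCreated", "OrderShipped", "OrderCancelled"]}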


Using Data Contracts with the Rust Schema Registry Client

The Rust Schema Registry Client is a fully asynchronous Rust client library for interacting with the Confluent Schema Registry. It allows you to enforce a Data Contract between a producer and a consumer. For example, you can

  • Serialize and deserialize Kafka records using Avro, Protobuf, and JSON Schema
  • Specify data quality rules using Google Common Expression Language (CEL) expressions
  • Specify schema migration rules using JSONata expressions
  • Enforce client-side field-level encryption (CSFLE) rules using AWS KMS, Azure Key Vault, Google Cloud KMS, or HashiCorp Vault

This library can be used with rust-rdkafka but does not depend on it.

Below we’ll show some examples of how to use this library with Avro, Protobuf, and JSON Schema formats.

Avro

The Avro support relies on the Apache Avro Rust library.  There are two ways of handling Avro data in Rust:

  • as Avro-specialized data types based on an Avro schema
  • as generic Rust serde-compatible types deriving Serialize and Deserialize

On the producer side, first we create an Avro serializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

// We pass the schema and use auto-registration.
let schema_str = r#"
 {
    "namespace": "confluent.io.examples.serialization.avro",
    "name": "User",
    "type": "record",
    "fields": [
        {"name": "name", "type": "string", "confluent:tags": [ "PII" ]},
        {"name": "favorite_number", "type": "long"},
        {"name": "favorite_color", "type": "string"}
    ]
}
"#;

let schema = Schema {
    schema_type: Some("AVRO".to_string()),
    references: None,
    metadata: None,
    rule_set: None,
    schema: schema_str.to_string(),
};

let ser_conf = SerializerConfig::new(true, None, true, false, HashMap::new());

let ser = AvroSerializer::new(
    &client, Some(&schema), None, ser_conf
)
.expect("Failed to create serializer");
 

If using the Avro-specific data types, we create an Avro record and serialize using the serialize() method.

let fields = vec![
    ("name".to_string(), Value::String("John Doe".to_string())),
    ("favorite_number".to_string(), Value::Int(7)),
    ("favorite_color".to_string(), Value::String("blue".to_string())),
];

let value = Value::Record(fields);

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Avro,
    headers: None,
};

let bytes: Vec<u8> = ser
    .serialize(&ser_ctx, value)
    .await
    .expect("Failed to serialize");
 

If using the serde-compatible types, we create an object and serialize it using the serialize_ser() method.
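
For reference, a User type matching the schema above might be defined as a plain serde struct along these lines (a sketch; your application's actual type may differ):

use serde::{Deserialize, Serialize};

// Field names and types mirror the Avro schema shown earlier.
#[derive(Serialize, Deserialize)]
struct User {
    name: String,
    favorite_number: i64,
    favorite_color: String,
}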

// The User type must implement Serialize and Deserialize
let obj = User {
    name: "John Doe".to_string(),,
    favorite_number: 7,
    favorite_color: "blue".to_string(),
};

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Avro,
    headers: None,
};

let bytes: Vec<u8> = ser
    .serialize_ser(&ser_ctx, obj)
    .await
    .expect("Failed to serialize");
 

On the consumer side, we create an Avro deserializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let deser_conf = DeserializerConfig::new(None, false, HashMap::new());
    
let deser = AvroDeserializer::new(
    &client, None, deser_conf
)
.expect("Failed to create deserializer");
 

Assuming we have a Kafka message, we deserialize it as follows.

let m: BorrowedMessage = ...;

let ser_ctx = SerializationContext {
    topic: m.topic().to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Avro,
    headers: None,
};

let payload = deser
    .deserialize(&ser_ctx, &m.payload().unwrap_or(b""))
    .await
    .unwrap();

info!("received: '{:?}', payload.value);
 

If you want to deserialize the message into a serde-compatible type, you can add the following lines:

let obj = from_value::<User>(&payload.value).unwrap();

info!("received: '{:?}', obj);
 

Protobuf

The Protobuf support relies on the prost and prost-reflect libraries.

On the producer side, first we create a Protobuf serializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let ser_conf = SerializerConfig::new(true, None, true, false, HashMap::new());
    
let ser = ProtobufSerializer::new(
    &client, None, ser_conf
)
.expect("Failed to create serializer");
 

If your Protobuf object implements the ReflectMessage trait from prost-reflect, then serializing it is straightforward.

// In this example, the Author type implements ReflectMessage
let obj = Author {
    name: "Franz Kafka".to_string(),
    works: vec!["Metamorphosis".to_string(), "The Trial".to_string()],
};

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Protobuf,
    headers: None,
};
                
let bytes: Vec<u8> = ser
    .serialize(&ser_ctx, &obj)
    .await
    .expect("Failed to serialize");
 

However, if your Protobuf object only implements the Message trait from prost, then you must pass both the fully-qualified name of the message type and the relevant FileDescriptorSet.

// In this example, the Author type only implements Message
let obj = Author {
    name: "Franz Kafka".to_string(),
    works: vec!["Metamorphosis".to_string(), "The Trial".to_string()],
};

let fds: FileDescriptorSet = ...;

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Protobuf,
    headers: None,
};
                
let bytes: Vec<u8> = ser
    .serialize_with_file_desc_set(&ser_ctx, &obj, "test.Author", fds)
    .await
    .expect("Failed to serialize");
 

On the consumer side, we create a Protobuf deserializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let deser_conf = DeserializerConfig::new(None, false, HashMap::new());

let deser = ProtobufDeserializer::new(
    &client, None, deser_conf
)
.expect("Failed to create deserializer");
 

Assuming we have a Kafka message, we deserialize it as follows.

let m: BorrowedMessage = ...;

let ser_ctx = SerializationContext {
    topic: m.topic().to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Protobuf,
    headers: None,
};

let obj: Author = deser
    .deserialize(&ser_ctx, &m.payload().unwrap_or(b""))
    .await
    .unwrap();
                
info!("received: '{:?}', obj);
 

JSON

When serializing JSON, JSON Schema validation can be performed using a JSON Schema validation library.

On the producer side, first we create a JSON serializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

// We pass the schema and use auto-registration.
let schema_str = r#"
{
  "type": "object",
  "properties": {
    "name": {
       "type": "string",
       "confluent:tags": [ "PII" ]
    },
    "favorite_number": { "type": "number" },
    "favorite_color": { "type": "string" }
  }
}
"#;

let schema = Schema {
    schema_type: Some("JSON".to_string()),
    references: None,
    metadata: None,
    rule_set: None,
    schema: schema_str.to_string(),
};

let ser_conf = SerializerConfig::new(
    true, None, true, /* validate */ false, HashMap::new()
);

let ser = JsonSerializer::new(
    &client, Some(&schema), None, ser_conf
)
.expect("Failed to create serializer");
 

We create a JSON value and serialize it as follows.

let value = serde_json::json!({
    "name": "John Doe",
    "favorite_number": 7,
    "favorite_color": "blue"
});

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Json,
    headers: None,
};
                
let bytes: Vec<u8> = ser
    .serialize(&ser_ctx, value)
    .await
    .expect("Failed to serialize");
 

On the consumer side, we create a JSON deserializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let deser_conf = DeserializerConfig::new(
    None, /* validate */ false, HashMap::new()
);
    
let deser = JsonDeserializer::new(
    &client, None, deser_conf
)
.expect("Failed to create deserializer");
 

Assuming we have a Kafka message, we deserialize it as follows.

let m: BorrowedMessage = ...;

let ser_ctx = SerializationContext {
    topic: m.topic().to_string(),
    serde_type: SerdeType::Value,
    serde_format: SerdeFormat::Json,
    headers: None,
};
                
let value = deser
    .deserialize(&ser_ctx, &m.payload().unwrap_or(b""))
    .await
    .unwrap();

info!("received: '{:?}', value);
 

See the examples for more information, including details on how to use the Rust Schema Registry Client with rust-rdkafka.


JSONata: The Missing Declarative Language for Kafka Connect

JSONata is an expressive, declarative functional language that is as powerful as older transformation technologies such as XSLT, but with a concise syntax. When working with Confluent Schema Registry, JSONata can be used to define complex schema migration rules that can ensure that schema evolution does not break existing consumers.

Another area where JSONata can be useful is when defining transformations that need to occur in the context of Kafka Connect. Typically such transformations are performed using Simple Message Transforms (SMT). An SMT is typically implemented in Java and performs one small function, such as adding, modifying, or removing data or metadata in a Kafka Connect record. However, if an SMT does not exist for a desired transformation, one usually has to create a new custom SMT.

I’ve created a general-purpose JSONata SMT that can take a JSONata expression, so that instead of creating a new custom SMT, a user can simply provide a custom JSONata expression. Using the JSONata SMT, one can replace the behavior of most existing Kafka Connect SMTs. At the end of this article, I provide JSONata expressions for some of the more common SMTs that currently exist.
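
To give a feel for how this is wired up, here is a hypothetical connector configuration using the JSONata SMT; the transform class name and the expr property name are placeholders for illustration, so consult the SMT's documentation for the actual names. The expression shown is the Insert Field example from later in this article.

transforms=jsonata
transforms.jsonata.type=com.example.connect.JsonataTransform
transforms.jsonata.expr=$merge([$, {'value': $merge([value, {'topic': topic}])}])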

When using JSONata to transform a Kafka Connect record, the record is converted to the following structure, as an example:

{
  "topic" : "test",
  "kafkaPartition" : 1,
  "keySchema" : {
    "type" : "STRING"
  },
  "key" : "mykey",
  "valueSchema" : {
    "type" : "STRUCT",
    "fields" : {
      "first" : {
        "name" : "first",
        "index" : 0,
        "schema" : {
          "type" : "STRING"
        }
      },
      "last" : {
        "name" : "last",
        "index" : 1,
        "schema" : {
          "type" : "STRING"
        }
      },
      "email" : {
        "name" : "email",
        "index" : 2,
        "schema" : {
          "type" : "STRING"
        }
      }
    }
  },
  "value" : {
    "first" : "test",
    "last" : "user",
    "email" : "none@none.com"
  },
  "timestamp" : 1234,
  "headers" : [ {
    "key" : "key1",
    "value" : "value1",
    "schema" : {
      "type" : "STRING"
    }
  }, {
    "key" : "key2",
    "value" : "value2",
    "schema" : {
      "type" : "STRING"
    }
  } ]
}

JSONata operates by simply transforming the above structure. After the transformation, the structure is converted back to a Kafka Connect record. I won’t go into the details of understanding JSONata (instead see this blog), but I will provide some tips for writing JSONata expressions to transform Kafka Connect records.

When modifying JSON objects, the two most useful object functions are the following:

  • The $sift function removes properties from an object.
  • The $merge function merges two objects, and can be used to add properties to an existing object.

When modifying JSON arrays, the two most useful array functions are the following:

  • The $filter function removes items from an array.
  • The $append function concatenates two arrays, and can be used to add items to an existing array.

Furthermore, multiple expressions can be specified, which will be evaluated in sequential order, and each expression can be assigned to a variable for use in subsequent expressions. The result of the composite expression will be the last expression in the sequence.

($x := expr1; $y := expr2; $z := expr3)

The behavior of most Kafka Connect SMTs can be expressed using the above four functions for objects and arrays, with occasional use of variables to hold intermediate results. However, the power of JSONata is its ability to express custom transformations that aren’t yet embodied by existing SMTs.

The following sections show how JSONata can be used to replace the most common Kafka Connect SMTs. Click on each link below to see the JSONata transformation in action.

Cast Transform

To cast a field from a number to a string:

$merge([$, {'value': $merge([value, {'ts': $string(value.ts)}])}])

See https://try.jsonata.org/Po8DixmBo.

Drop Transform

To drop the record key and its schema:

$sift($, function($v, $k) {$k != 'keySchema' and $k != 'key'})

See https://try.jsonata.org/9nLK_0Idf.

To drop the record value and its schema:

$sift($, function($v, $k) {$k != 'valueSchema' and $k != 'value'})

See https://try.jsonata.org/FPc1ZXnUj.

Drop Headers Transform

To drop a header with a specific key:

$merge([$, {'headers': $filter($$.headers, function($v, $i, $a) {$v.key != 'key1'})}])

See https://try.jsonata.org/lX9kk_GHD.

Extract Field Transform

To replace the record value with a field extracted from the value:

$merge([$, {'value': value.email, 'valueSchema': {'type': 'STRING'}}])

See https://try.jsonata.org/AXjHLD3PJ.

Extract Topic Transform

To extract a field in the record value and use it as the topic name:

$merge([$, {'topic': value.last}])

See https://try.jsonata.org/ePMcD5LV3.

To extract a value in the header and use it as the topic name:

$merge([$, {'topic': headers[1].value}])

See https://try.jsonata.org/IJDMrs6DN.

Filter Transform

To include or drop records that match a specific condition:

key = 'mykey' ? null : $

See https://try.jsonata.org/y61a9dl20.

Flatten Transform

To flatten a nested data structure, generating names for each field by concatenating the field names at each level with a configurable delimiter character:

(
    /* Define the flatten function */
    $fn := function($o, $prefix) { 
        $each($o, function($v, $k) {(
            $name := $join([$prefix,$k], '.');
            $type($v) = 'object' ? $fn($v, $name): {
                $name: $v
            }
        )}) ~> $merge()
    };

    /* Flatten the record value */
    $merge([$$, {'value': $fn(value)}])
)

See https://try.jsonata.org/iGq7znZCi.

Header From Transform

To move a field in the record key into the record’s header:

( 
    /* Add the new header based on a field in the record value */
    $newHeaders := $append(headers, [{'key': 'email', 'value': value.email}]);

    /* Remove the field from the record value */
    $newValue := $sift(value, function($v, $k) {$k != 'email'});

    /* Merge the new headers and new value into the root document */
    $merge([$$, {'headers': $newHeaders}, {'value': $newValue}])
)

See https://try.jsonata.org/gZc0WVdNu.

Hoist Field Transform

To wrap the record value in a struct:

$merge([$, {'value': {'email': value}}])

See https://try.jsonata.org/Uo7uGfjjt.

Insert Field Transform

To insert the topic name as a field in the record value:

$merge([$, {'value': $merge([value, {'topic': topic}])}])

See https://try.jsonata.org/-kY4iZVPr.

Insert Header Transform

To insert a header into each record:

$merge([$, {'headers': $append(headers, {'key': 'topic', 'value': topic})}])

See https://try.jsonata.org/alPNXJUca.

Mask Field Transform

To mask a field in the record value:

$merge([$, {'value': $merge([value, {'email': 'xxxx@xxxx.com'}])}])

See https://try.jsonata.org/F1ADrO740.

Regex Router Transform

To update the topic using the configured regular expression and replacement string:

$merge([$, {'topic': $replace(topic, /(\w+)stream_(\w+)/, '$1$2')}])

See https://try.jsonata.org/rjtE_ctNb.

Replace Field Transform

To rename a field within a struct:

$merge([$, {'value': $merge([$sift(value, function($v, $k) {$k != 'email'}), {'emailAddr': $.value.email}])}])

See https://try.jsonata.org/LBxwpGgVe.

Set Schema Metadata Transform

To set the name and version of the schema:

$merge([$, {'valueSchema': $merge([valueSchema, {'name': 'order-value', 'version': 2}])}])

See https://try.jsonata.org/MlprCwePu.

Timestamp Converter Transform

To convert a field from milliseconds to a string date.

$merge([$, {'value': $merge([value, {'ts': $fromMillis(value.ts)}])}])

See https://try.jsonata.org/RC_nwPKcZ.

Timestamp Router Transform

To update the record’s topic as a function of the original topic and a timestamp:

$merge([$, {'topic': topic & '-' & $now('[Y0001][M01]')}])

See https://try.jsonata.org/OWuyidFcQ.

Tombstone Handler Transform

To ignore tombstone records:

value = null ? null : $

See https://try.jsonata.org/c9vDpnh71.

Value To Key Transform

To replace the record key with a new key formed from a subset of fields in the record value:

$merge([$, {'key': {'first': value.first, 'last': value.last}}])

See https://try.jsonata.org/kmTAXMrE6.


In-Memory Analytics for Kafka using DuckDB

Today’s data scientist has a plethora of options when processing data in Apache Kafka. In terms of stream processing, popular options are Kafka Streams, ksqlDB, and Apache Flink. In terms of real-time analytics, one can choose from Apache Druid, ClickHouse, Apache Pinot, and Apache Doris. These systems all support distributed processing, and so are capable of scaling to arbitrary amounts of data. However, their distributed nature can lead to operational complexity.

Often a data scientist wants to quickly explore the data to get a better understanding of it without setting up a lot of infrastructure. This has led to the emergence of embedded OLAP databases, which support in-process exploratory data analysis. Databases in this category include Apache DataFusion, chDB, and DuckDB. DuckDB in particular has become especially popular, due to its ease of use and rich feature set, which includes:

  • support for querying and writing files in CSV, JSON and Parquet format
  • dynamic column selection, including use of regular expressions
  • PIVOT and UNPIVOT operations
  • windowing functions
  • full-text search
  • lambda functions, function chaining, and list comprehensions in SQL
  • rich type system, including JSON
  • enhancements to SQL to make it even easier to use
  • APIs for Java, Go, C, C++, Node.js, Python, Julia, R, and others
  • etc.

The most popular formats for storing data in Kafka are Apache Avro, Protobuf, and JSON, all of which are supported by the Confluent Schema Registry. All of these formats support composing types using records, arrays, and unions. Unions are particularly interesting in the context of Kafka, because they can be used to support multiple types in the same topic. However, while relational databases often support the record type (usually called row type) and the array type, they usually do not support the union type. DuckDB is the only database that I know of that supports the union type, which makes it a natural target for storing and analyzing data which is originally in either Avro, Protobuf, or JSON format.
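
As a quick illustration of why this matters, once union-typed data has been loaded, DuckDB's union functions can be used to inspect the variants. The table name events and column name payload below are assumptions.

-- count records per union variant
select union_tag(payload) as variant, count(*) as cnt
from events
group by variant;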

One way to load Kafka data into a relational database is to use the JDBC sink connector, which runs in the Kafka Connect framework. However, setting up Kafka Connect involves a lot of moving parts, and the JDBC sink connector does not support DuckDB-specific functionality. For these reasons, I’ve created a small utility called kwack, which aims to do one thing well: load Kafka data into DuckDB, while making use of its rich type system, including support for unions.

Once Kafka data is loaded into DuckDB, one can make use of the myriad features of DuckDB. For example, you can quickly run SQL queries on the data, or even export DuckDB tables in Apache Parquet format. This means that by using kwack, you have a quick way to convert Kafka data in Apache Avro, Protobuf, and JSON to Apache Parquet files.

One can start kwack by simply specifying the Kafka broker, topic, and Schema Registry URL:

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:>

By default, kwack will use a simple binary deserializer for the key, and for the value kwack will use a deserializer corresponding to the latest subject version in Schema Registry, where the subject is the topic name suffixed with “-value”. The deserializer can be customized to one that does not rely on Schema Registry (short, int, long, float, double, string, or binary) or one that does (latest or a specific schema ID).

For data that has a schema, kwack will map each type in Avro, Protobuf, and JSON Schema to the appropriate type in DuckDB, as follows:

Avro                     | Protobuf                  | JSON Schema  | DuckDB
-------------------------+---------------------------+--------------+--------------
boolean                  | boolean                   | boolean      | BOOLEAN
int                      | int32, sint32, sfixed32   |              | INTEGER
                         | uint32, fixed32           |              | UINTEGER
long                     | int64, sint64, sfixed64   | integer      | BIGINT
                         | uint64, fixed64           |              | UBIGINT
float                    | float                     |              | FLOAT
double                   | double                    | number       | DOUBLE
string                   | string                    | string       | VARCHAR
bytes, fixed             | bytes                     |              | BLOB
enum                     | enum                      | enum         | ENUM
record                   | message                   | object       | STRUCT
array                    | repeated                  | array        | LIST
map                      | map                       |              | MAP
union                    | oneof                     | oneOf, anyOf | UNION
decimal                  | confluent.type.Decimal    |              | DECIMAL
date                     | google.type.Date          |              | DATE
time-millis, time-micros | google.type.TimeOfDay     |              | TIME
timestamp-millis         |                           |              | TIMESTAMP_MS
timestamp-micros         |                           |              | TIMESTAMP
timestamp-nanos          | google.protobuf.Timestamp |              | TIMESTAMP_NS
duration                 | google.protobuf.Duration  |              | INTERVAL
uuid                     |                           |              | UUID

Once kwack has been started, one can enter SQL commands at the prompt. (Note that if your topic name has a hyphen, you’ll need to surround it with quotes in the SQL query.)
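
For example, a hyphenated topic would be queried as:

select * from "my-topic";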

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+--------+-----+-----+-----------------------------------------------------------+
| rowkey | f1  | f2  |                          rowinfo                          |
+--------+-----+-----+-----------------------------------------------------------+
| null   | hi  | 123 | {ksi=null, vsi=1, par=0, off=0, ts=1720286713492, hdr={}} |
| null   | bye | 456 | {ksi=null, vsi=1, par=0, off=1, ts=1720286719746, hdr={}} |
+--------+-----+-----+-----------------------------------------------------------+
2 rows selected (0.012 seconds)

By default all data from the topic will be loaded. One can use the -p option to specify a subset of partitions to load, or the -o option to specify an absolute offset, a relative offset from the end, or a timestamp (in ms) from which to start loading data. (Type kwack -h to see all options.)

The table created for the topic will have one column to represent the Kafka record key (rowkey) and one column to represent the Kafka record metadata (rowinfo). The rest of the columns will be the fields of the Kafka record value assuming that it is a record (Avro), message (Protobuf), or object (JSON); otherwise it will be a single column (rowval).
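
Since rowinfo is a DuckDB STRUCT, its attributes can be selected individually with dot notation; for example, using the mytopic table from the first session above (a sketch):

select rowkey, rowinfo.par, rowinfo.ts from mytopic;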

One can omit attributes of rowinfo, or even rowkey, by specifying exactly which attributes to add to the DuckDB table. Below we include only the partition and offset in rowinfo.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+-----+-----+----------------+
| f1  | f2  |    rowinfo     |
+-----+-----+----------------+
| hi  | 123 | {par=0, off=0} |
| bye | 456 | {par=0, off=1} |
+-----+-----+----------------+
2 rows selected (0.01 seconds)

One can also load multiple topics, and perform joins between the resulting DuckDB tables. Below we load two topics and perform a join on a shared column name.

% kwack -b localhost:9092 -t mytopic -t mytopic2 -r http://localhost:8081 -a none
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> select * from mytopic;
+-----+-----+
| f1  | f2  |
+-----+-----+
| hi  | 123 |
| bye | 456 |
+-----+-----+
2 rows selected (0.002 seconds)
jdbc:duckdb::memory:> select * from mytopic2;
+-----+-------+
| f1  |  f3   |
+-----+-------+
| hi  | world |
| bye | folks |
+-----+-------+
2 rows selected (0.003 seconds)
jdbc:duckdb::memory:> select * from mytopic join mytopic2 using (f1);
+-----+-----+-------+
| f1  | f2  |  f3   |
+-----+-----+-------+
| hi  | 123 | world |
| bye | 456 | folks |
+-----+-----+-------+
2 rows selected (0.002 seconds)

As mentioned, one can write DuckDB tables in Parquet format.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:> COPY mytopic to 'mytopic.parquet' (FORMAT 'parquet');
2 rows affected (0.007 seconds)

Additionally, one can persist the DuckDB database to a file, and then use the official DuckDB command line to perform analytical queries.

% kwack -b localhost:9092 -t mytopic -t mytopic2 -r http://localhost:8081 -a none -d /tmp/mydb
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb:/tmp/mydb> !exit

% duckdb /tmp/mydb
v1.0.0 1f98600c2c
Enter ".help" for usage hints.
D .tables
mytopic mytopic2
D select * from mytopic;
┌─────────┬───────┐
│   f1    │  f2   │
│ varchar │ int32 │
├─────────┼───────┤
│ hi      │   123 │
│ bye     │   456 │
└─────────┴───────┘

One can pass a SQL query on the command line, and instead of entering interactive mode, kwack will output the results of the query.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off -q "select * from mytopic"
{"f1":"hi","f2":123,"rowinfo":{"par":0,"off":0}}
{"f1":"bye","f2":456,"rowinfo":{"par":0,"off":1}}

Note that the output is in JSON format. This allows kwack to be used in conjunction with other tools like jq.

% kwack -b localhost:9092 -t mytopic -r http://localhost:8081 -a par -a off -q "select * from mytopic" | jq .f1
"hi"
"bye"

When using kwack with Confluent Cloud, it may be easier to pass all of the properties including credentials in a file, as follows:

% kwack -F kwack.properties
Welcome to kwack!
Enter "!help" for usage hints.

      ___(.)>
~~~~~~\___)~~~~~~

jdbc:duckdb::memory:>

With kwack, using Kafka to perform data science at the command line becomes easier than ever.


Using Data Contracts with Confluent Schema Registry

The following post originally appeared in the Confluent blog on October 18, 2023.

The Confluent Schema Registry plays a pivotal role in ensuring that producers and consumers in a streaming platform are able to communicate effectively. Ensuring the consistent use of schemas and their versions allows producers and consumers to easily interoperate, even when schemas evolve over time.

As streaming has become more ubiquitous, enterprises find that downstream components such as consumers are often tasked with handling data inconsistencies, incompatible changes, and expensive and complicated transformations in order to be able to process the data effectively. This has led to an effort to shift these responsibilities to the source of the data, such as the producer, an activity often referred to as shift-left. In the context of modern data architectures, such as streaming and data mesh, the effort to shift-left has led to an emphasis on the data contract. As a result, Confluent Schema Registry, both in Confluent Platform Enterprise and Confluent Cloud, now supports the use of data contracts.

A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that is in motion. The upstream component enforces the data contract, while the downstream component can assume that the data it receives conforms to the data contract. Data contracts are important because they provide transparency over dependencies and data usage in a streaming architecture. They help to ensure the consistency, reliability and quality of the data in event streams, and they provide a single source of truth for understanding the data in motion.

In this article, we’ll walk through an example of enhancing a schema to be a full-fledged data contract. Our example will involve the following actions:

  • Defining an initial schema for the data contract
  • Enhancing the data contract with business metadata
  • Adding data quality rules to the data contract
  • Specifying custom actions for the data contract
  • Adding migration rules to the data contract for a complex schema evolution

Defining an Order Schema

Let’s create a simple Avro schema to represent a customer order.

{
  "name": "Order",
  "namespace": "acme.com",
  "type": "record",
  "fields": [
    {
      "name": "orderId",
      "type": "int"
    },
    {
      "name": "customerId",
      "type": "int"
    },
    { 
      "name": "totalPriceCents",
      "type": "int"
    },
    {
      "name": "state",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": [
          "Pending",
          "Processing",
          "Completed",
          "Canceled",
          "Unknown"
        ],
        "default": "Unknown"  
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Assuming the above schema is in a file named “order.avsc”, upload it to a local Schema Registry as follows:

jq -n --rawfile schema order.avsc '{schema: $schema}' |
  curl http://localhost:8081/subjects/orders-value/versions --json @-

Let’s try producing and consuming with our new schema using the Avro console producer and consumer, which you can find the documentation for here. First, start the consumer.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092

In a separate terminal, start the producer, and pass the schema ID that was returned during registration as the value of “value.schema.id”.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "state": "Pending", "timestamp": 1693591356 }

When the above record in JSON format is sent via the producer, it will be received by the consumer.

Enhancing the Data Contract with Metadata

As mentioned, in the effort to shift-left the responsibilities for the quality and timeliness of streaming data, data contracts are typically defined by the producers. By examining the data contract, a downstream consumer can determine which person or team is responsible for the data contract and what service level objectives (SLOs) the data contract is attempting to achieve.

In a file named “order_metadata.json”, we declare that this contract is owned by the Orders Team, and that one of the service level objectives is to ensure that orders are available to downstream consumers no later than 10 seconds after the order timestamp. We’ll make use of these metadata properties later in this article.

{
  "metadata": {
    "properties": {
       "owner": "OrdersTeam",
       "owner_email": "orders.team@acme.com",
       "slo_timeliness_secs": "10"
    } 
  }
}

Register the metadata directly to Schema Registry without specifying a schema. When omitting the schema while registering metadata, a new schema version will be created with the schema of the previous version.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_metadata.json

Adding Data Quality Rules to the Data Contract

As shown in the previous section, data contracts can capture business metadata. They can also be used to capture integrity constraints or data quality rules to ensure the quality of the data. In a file named “order_ruleset.json”, let’s add a rule that the price needs to be a positive number. The rule is specified as a Google Common Expression Language (CEL) expression.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0"
      }
    ]
  }
}

Similarly to the metadata, register the rule set directly to Schema Registry without specifying a schema, as a new schema version will be created with the schema (as well as metadata) of the previous version.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_ruleset.json

The above rule will cause all messages with a non-positive price to be rejected. Instead of just rejecting the invalid messages, we may want to capture them in a dead letter queue for further analysis or processing. To do so, set the “onFailure” action to “DLQ”.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0",
        "params": {
          "dlq.topic": "bad_orders"
        },
        "onFailure": "DLQ"
      }
    ]
  }
}

Let’s try out our data quality rule. First start the consumer.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092

In a separate terminal, start the producer. The last two properties passed to the console producer are used by the DLQ action.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id> \
  --property bootstrap.servers=localhost:9092 \
  --property dlq.auto.flush=true

{"orderId": 1, "customerId": 2, "totalPriceCents": -1, "state": "Pending", "timestamp": 1693591356 }

We should see an error of the form:

Expr failed: 'message.totalPriceCents > 0'

If we check the DLQ, we’ll see the errant record.

To learn more about CEL, see Understanding CEL in Data Contract Rules.

Specifying Custom Rule Executors and Actions

The rule framework for data contracts is completely customizable. One can specify both custom rule executors and actions for a data contract.

Below is the Java interface for rule executors. The “transform” method is used by rules of both kinds, CONDITION and TRANSFORM. In the former case, it should return a Boolean value; in the latter case, it should return the transformed value.

public interface RuleExecutor extends RuleBase {
  String type();

  Object transform(RuleContext ctx, Object message) throws RuleException;
}

Below is the Java interface for rule actions. An exception is passed to the rule action if the rule executor failed, including the case of a condition that returned false. After performing its work, the rule action should rethrow this exception if it is non-null.

public interface RuleAction extends RuleBase {
  String type();

  void run(RuleContext ctx, Object message, RuleException ex) 
    throws RuleException;
}

Assume that on the consumer side we want to check the timeliness of data to see if it conforms to the declared service level objective. If it doesn’t, we want to send an alert in the form of an email message. Below is a custom rule action to check the timeliness SLO. Note that the “type” of the action is specified as “EMAIL”. Also, the expression “ctx.getParameter(name)” will look for the value of a rule parameter with the given name. If not found, it will then look for the value of a metadata property with the given name.

public class EmailAction implements RuleAction {

  private static final String USERNAME = "username";
  private static final String PASSWORD = "password";

  private String username;
  private String password;

  public String type() {
    return "EMAIL";
  }

  @Override
  public void configure(Map<String, ?> configs) {
    this.username = (String) configs.get(USERNAME);
    this.password = (String) configs.get(PASSWORD);
  }

  public void run(RuleContext ctx, Object message, RuleException ex)
    throws RuleException {
    if (!meetsSloTimeliness(ctx, message)) {
      sendMail(ctx, message);
    }
    if (ex != null) {
      throw ex;
    }
  }

  private boolean meetsSloTimeliness(RuleContext ctx, Object message) 
    throws RuleException {
    try {
      String sloStr = ctx.getParameter("slo_timeliness_secs");
      int slo = Integer.parseInt(sloStr);
      long timestamp = (Long) ((GenericData.Record) message).get("timestamp");
      long now = System.currentTimeMillis();
      return now - timestamp <= slo * 1000L;
    } catch (Exception e) {
      throw new RuleException(e);
    }
  }

  private void sendMail(RuleContext ctx, Object message)
      throws RuleException {
    try {
      String from = ctx.getParameter("mail.smtp.from");
      String to = ctx.getParameter("owner_email");
      String sloTimeliness = ctx.getParameter("slo_timeliness_secs");

      Properties props = new Properties();
      props.put("mail.smtp.host", ctx.getParameter("mail.smtp.host"));
      props.put("mail.smtp.port", ctx.getParameter("mail.smtp.port"));
      props.put("mail.smtp.auth", "true");
      props.put("mail.smtp.starttls.enable", "true");
      Session session = Session.getInstance(props, new Authenticator() {
        @Override
        protected PasswordAuthentication getPasswordAuthentication() {
          return new PasswordAuthentication(username, password);
        }
      });

      Message mail = new MimeMessage(session);
      mail.setFrom(new InternetAddress(from));
      mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
      mail.setSubject("Mail Subject");

      String msg = "Order '" + ((GenericData.Record) message).get("orderId")
          + "' failed to meet the timeliness SLO of "
          + sloTimeliness + " seconds";

      MimeBodyPart mimeBodyPart = new MimeBodyPart();
      mimeBodyPart.setContent(msg, "text/html; charset=utf-8");
      Multipart multipart = new MimeMultipart();
      multipart.addBodyPart(mimeBodyPart);
      mail.setContent(multipart);

      Transport.send(mail);
    } catch (Exception e) {
      throw new RuleException(e);
    }
  }
}

Now that we’ve implemented a rule action to check the timeliness SLO, let’s add it to the rule set. Note that the “mode” of the rule is “READ” since the timeliness check happens on the consumer. We use a CEL expression of “true”, to ensure that the action is always invoked. Finally, the “onSuccess” action is set to “EMAIL”.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0",
        "params": {
          "dlq.topic": "bad_orders"
        },
        "onFailure": "DLQ"
      },
      {
        "name": "checkSloTimeliness",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "READ",
        "expr": "true",
        "params": {
          "mail.smtp.host": "smtp.acme.com",
          "mail.smtp.port": "25",
          "mail.smtp.from": "billing.team@acme.com"
        },
        "onSuccess": "EMAIL"
      }
    ]
  }
}

The above code is available in a GitHub repository. After cloning the repository, register the schema with the following command:

mvn schema-registry:register

To run the consumer:

mvn compile exec:java \
  -Dexec.mainClass="io.confluent.data.contracts.ConsumerApp" \
  -Dexec.args="./src/main/resources/data-contracts.properties group-1 client-1 <emailUsername> <emailPassword>"

To generate random Order data, run the producer:

mvn compile exec:java \
  -Dexec.mainClass="io.confluent.data.contracts.ProducerApp" \
  -Dexec.args="./src/main/resources/data-contracts.properties client-1"

You should now see emails being sent for any records that do not meet the timeliness SLO.

Adding Migration Rules to the Data Contract

The consumers may be satisfied with the data contract so far, but perhaps one day the Orders Team decides that they want to change the schema in a backward-incompatible manner. For example, they decide to change the field named “state” to “status”. In Avro, one possibility is to use an alias, but some Avro implementations might not support aliases. So for the sake of this article, we’ll consider changing the name of an Avro field to be a backward-incompatible change (as is the case for Protobuf and JSON Schema).
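
For reference, the alias approach would look something like the following field declaration (a sketch), which we will not rely on here:

{
  "name": "status",
  "aliases": ["state"],
  "type": {
    "type": "enum",
    "name": "OrderStatus",
    "symbols": ["Pending", "Processing", "Completed", "Canceled", "Unknown"],
    "default": "Unknown"
  }
}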

To support breaking changes within a schema version history, we need a way to partition the history into subsets of versions that are compatible with one another. We can achieve this with the notion of a compatibility group. We choose an arbitrary name for a metadata property, such as “major_version”, and then use that property to specify which data contracts belong to the same compatibility group.

First, we configure Schema Registry to only perform compatibility checks for schemas within a compatibility group.

curl http://localhost:8081/config/orders-value \
  -X PUT --json '{ "compatibilityGroup": "major_version" }'

Next we change the field named “state” to “status”, as specified in a file named “order2.avsc”.

{
  "name": "Order",
  "namespace": "acme.com",
  "type": "record",
  "fields": [
    {
      "name": "orderId",
      "type": "int"
    },
    {
      "name": "customerId",
      "type": "int"
    },
    { 
      "name": "totalPriceCents",
      "type": "int"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": [
          "Pending",
          "Processing",
          "Completed",
          "Canceled",
          "Unknown"
        ],
        "default": "Unknown"  
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Register the schema, and pass a new metadata property with name “major_version” and value “2”, so that the schema need not be backward compatible with previous versions.

jq -n --rawfile schema order2.avsc '{ schema: $schema, metadata: { properties: { owner: "OrdersTeam", owner_email: "orders.team@acme.com", slo_timeliness_secs: "10", major_version: "2" } } }' |
  curl http://localhost:8081/subjects/orders-value/versions --json @-

If desired, we can now pin a client to the latest version of a specific compatibility group using the following client-side properties:

auto.register.schemas=false
use.latest.with.metadata=major_version=2
latest.cache.ttl.sec=1800

Above we’ve specified that the client should check for a new latest version every 30 minutes.

Now that our subject supports breaking changes in the version history as partitioned by compatibility groups, we need a way for the consumer to transform messages from the previous compatibility group to the current one. We can achieve this with migration rules using JSONata. Below, the UPGRADE rule allows new consumers to read old messages, while the DOWNGRADE rule allows old consumers to read new messages. If you plan to upgrade all consumers, you can omit the DOWNGRADE rule. In each migration rule, we use the JSONata “$sift()” function to remove the field with the old name, and then use “$merge()” to add a field with the new name.

{
  "ruleSet": {
    "domainRules": [
      ...
    ],
    "migrationRules": [
      {
        "name": "changeStateToStatus",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "UPGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'state'}), {'status': $.'state'}])"
      },
      {
        "name": "changeStatusToState",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "DOWNGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'status'}), {'state': $.'status'}])"
      }
    ]
  }
}

The new rule set, in a file named “order_ruleset2.json”, has both domain rules and migration rules, and can be registered separately from the schema, as before.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_ruleset2.json

After updating the metadata and rule set, let’s try out our migration rule. We’ll produce a record using the original schema version with field “state” and see it transformed by the consumer to conform to the latest schema version with field “status”.

First, start the consumer. Note that we specify “use.latest.version=true” to tell the consumer that we want the record to conform to the latest version even if it was produced with an older version. As mentioned, we could alternatively specify “use.latest.with.metadata=major_version=2”. Either setting will cause the migration rule to be invoked.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092 \
  --property auto.register.schemas=false \
  --property use.latest.version=true

In a separate terminal, start the producer. Note that we should pass the ID of the original schema that has a field named “state” instead of “status”.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "state": "Pending", "timestamp": 1693591356 }

When the above record is sent to the producer, the following record is received by the consumer, where the JSONata transform has modified the record to have a field named “status” instead of “state”.

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "status": "Pending", "timestamp": 1693591356 }

The combination of compatibility groups and migration rules for transforming data across those groups provides a powerful way to evolve schemas in arbitrarily complex ways.

To learn more about JSONata, see Understanding JSONata.

Summary

The use of data contracts helps to shift-left the responsibility of ensuring data quality, interoperability, and compliance to the upstream component, which is the source of the data. By doing so, a data contract can provide a formal agreement between an upstream component and a downstream component for the structure and semantics of the data in motion. Today Confluent Schema Registry supports the use of data contracts to make streaming more reliable and powerful than ever before.

To learn more, see the documentation on Data Contracts for Schema Registry.


Understanding CEL in Data Contract Rules

The Confluent Schema Registry recently added support for tags, metadata, and rules, which together support the concept of a data contract. Data quality rules in a data contract can be expressed using the Google Common Expression Language (CEL), while migration rules can be expressed using JSONata. In this article, I’ll provide some tips on understanding the capabilities of CEL in the context of data contract rules.

The CEL Type System

One of the most important aspects of CEL is that it is a strongly typed language. In CEL, all expressions have a well-defined type, and all operators and functions check that their arguments have the expected types. Consider the following CEL expression:

'hello' == 3  // raises an error
 

One would expect this expression to return false, but instead it raises an error. That’s because the CEL type checker ensures that both arguments to the equality operator have the same type. The CEL type system includes the usual built-in types (int, uint, double, bool, string, bytes, list, map, null_type), but also has two built-in types that deserve further commentary: type and dyn.

Every value in CEL has a type, which is also considered a value. That means that the type of a value can be used in expressions. Below we use the type function to compare the type of the value “hello” with the type of the value “world”.

type('hello') == type('world')  // both sides evaluate to the 'string' type
 

The other type that deserves discussion is dyn. This type is the union of all other types, similar to Object in Java. A value can have its type converted to the dyn type using the dyn function. The dyn function can often be used to prevent the type checker from raising an error. For example, the following expression may raise an error because both the equality and conditional operators require arguments to have the same type.

value == null ? message.name + ' ' + message.lastName : value
 

However, the following expression using the dyn function will not raise an error.

dyn(value) == null ? message.name + ' ' + message.lastName : dyn(value)
 

Guards for Field-Level Rules

When defining a data contract rule, a CEL expression can be used at the message level or at the field level. Below we express a message-level rule of type CEL to check that the ssn field is not empty.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn != ''"
      }
    ]
  }
}

Message-level rules with type CEL are passed a variable named message, which represents the message being processed.

We could have instead expressed the above condition as a field-level rule using the CEL_FIELD rule type.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' ; value != ''"
      }
    ]
  }
}

Rules with type CEL_FIELD are executed for every field in a message. Such rules are passed the following variables:

  • value – the field value
  • fullName – the fully-qualified name of the field
  • name – the field name
  • typeName – the name of the field type, one of STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN
  • tags – tags that apply to the field
  • message – the containing message

Note that the expr for a rule of type CEL_FIELD is of the following form, where the guard is an optional CEL expression preceding the CEL expression for the rule body.

<CEL expr for guard> ; <CEL expr for rule body>
 

Guards are useful for preventing the type checker from raising an error. Without a guard, the following CEL expression will raise an error for any field in the message that is not of type string, because the inequality operator requires that both arguments have the same type.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' && value != ''"
      }
    ]
  }
}

One could fix the above expression using the dyn function as shown below, but that is less obvious than using a guard.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' && dyn(value) != ''"
      }
    ]
  }
}

If we want to apply the rule body to all fields with the same type, we can use a guard that checks the typeName:

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "typeName == 'STRING' ; value != ''"
      }
    ]
  }
}

Checking for Empty or Missing Fields

Confluent Schema Registry supports schemas for Avro, Protobuf, and JSON Schema. When using CEL expressions, checking for an empty or missing field in an object may need to be performed differently for each of the corresponding schema types.

In Protobuf, a missing field is set to the default value for the field type. From the Protobuf documentation:

  • For strings, the default value is the empty string.
  • For bytes, the default value is empty bytes.
  • For bools, the default value is false.
  • For numeric types, the default value is zero.
  • For enums, the default value is the first defined enum value, which must be 0.
  • For message fields, the field is not set. Its exact value is language-dependent.

For Protobuf, a field with type message is the only type of field that might be null, depending on the language.

In Avro, any field can be null as long as it has been declared as optional, which is a union of null and one or more other types.
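
For illustration, here is a minimal sketch of an Avro record schema (the Customer type and ssn field are hypothetical) in which ssn is declared as optional by using a union with null:

{
  "type": "record",
  "name": "Customer",
  "fields": [
    { "name": "ssn", "type": ["null", "string"], "default": null }
  ]
}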

Similarly, in JSON Schema, any field can be null if it has been declared as optional, which is a oneOf of null and one or more other types. Furthermore, any field can be missing unless it has been declared as required.
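
A corresponding JSON Schema sketch (again with a hypothetical ssn property) declares the property as nullable with oneOf, and simply omits it from the required list so that it may also be missing:

{
  "type": "object",
  "properties": {
    "ssn": {
      "oneOf": [
        { "type": "null" },
        { "type": "string" }
      ]
    }
  }
}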

As an example, for Protobuf, to check that a string is empty or missing, we use the following expression at the message level.

message.ssn == ''
 

Alternatively for Protobuf, one can use the has macro to determine whether the field is set to its default value.

!has(message.ssn)
 

For Avro, we would use a rule like the following. Note that we need to use the dyn function since in the CEL type system, the null type is distinct from the other types.

message.ssn == '' || dyn(message.ssn) == null
 

For JSON, we would use a rule with the has macro, since fields can be missing in JSON.

!has(message.ssn) || message.ssn == '' || dyn(message.ssn) == null
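
For completeness, the JSON check above could be negated and embedded in a message-level CONDITION rule that requires a non-empty ssn. This is only a sketch, and the rule name is arbitrary.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "!(!has(message.ssn) || message.ssn == '' || dyn(message.ssn) == null)"
      }
    ]
  }
}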
 

CEL String Literals

CEL supports several kinds of string literals. Quoted string literals can use either single-quotes or double-quotes.

message.name == 'hello' // equivalent to: message.name == "hello"
 

Since CEL expressions are embedded as JSON string values in data contract rules, single quotes are preferred, as they do not need to be escaped within the surrounding JSON.
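
For example, the single-quoted literal below can be dropped directly into the JSON rule definition; with double quotes, the expr value would instead need escaped quotes (message.name == \"hello\"). This is only an illustrative sketch, and the rule name checkName is arbitrary.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkName",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.name == 'hello'"
      }
    ]
  }
}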

A triple-quoted string is delimited by either three single-quotes or three double-quotes, and may contain newlines.

message.name == '''I'm happy for y'all'''
 

Finally, a string preceded by the r or R character is a raw string, which does not interpret escape sequences. A raw string is useful for representing regular expressions.

message.ssn.matches(r'\\d{3}-\\d{2}-\\d{4}')
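
As a sketch, such a raw-string regular expression could appear in a condition rule as shown below; each backslash is escaped once for the surrounding JSON, so the CEL expression itself sees r'\d{3}-\d{2}-\d{4}'. The rule name checkSsnFormat is arbitrary.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsnFormat",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn.matches(r'\\d{3}-\\d{2}-\\d{4}')"
      }
    ]
  }
}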
 

Transformations using CEL Maps and Messages

So far, all examples using the CEL and CEL_FIELD rule types have used CEL expressions to represent conditions. Both rule types can also use CEL expressions to represent transformations. Below we use a rule of type CEL_FIELD to set an empty status value to the string ‘unknown’.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "transformStatus",
        "kind": "TRANSFORM",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'status' ; value == '' ? 'unknown' : value"
      }
    ]
  }
}

To use the CEL rule type as a message-level transformation, return a CEL map. For example, assume that the Order message has two fields and we want to transform the status field as before. The following message-level rule of type CEL will return a CEL map with the desired result.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "transformStatus",
        "kind": "TRANSFORM",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "{ 'orderId': message.orderId, 'status': message.status == '' ? 'unknown' : message.status }"
      }
    ]
  }
}

If possible, the resulting CEL map will be automatically converted to an object of the appropriate schema type, either Avro, Protobuf, or JSON.
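
For instance, under the rule above, a hypothetical Order payload with an empty status (first line below) would be transformed into the payload on the second line:

{ "orderId": "A123", "status": "" }
{ "orderId": "A123", "status": "unknown" }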

CEL also has first-class support for Protobuf messages. To return a Protobuf message, the expression M{f1: e1, f2: e2, ..., fN: eN} can be used, where M is the simple or qualified name of the message type. For example, if com.acme.Order is a Protobuf message type, the following rule can be used to return a Protobuf object.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "transformStatus",
        "kind": "TRANSFORM",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "com.acme.Order{ orderId: message.orderId, status: message.status == '' ? 'unknown' : message.status }"
      }
    ]
  }
}

Of course, returning a Protobuf object should only be done if the data contract has a Protobuf schema.

Understanding CEL In Data Contract Rules

Understanding JSONata

JSONata is a declarative functional language for querying and transforming JSON data. It was inspired by the path semantics of XPath. Whereas other languages that are inspired by XPath also incorporate SQL-like elements1, JSONata stays true to its XPath roots and augments the path semantics of XPath with additional features to increase its power.

JSONata is powerful but it can take a while to get accustomed to its terse syntax. In this article I’ll discuss three aspects of JSONata that will help you understand some of the subtleties of the language.

  1. Sequences and Path Operators
  2. Object Constructors
  3. Contexts and Context Variable Binding

Sequences and Path Operators

Since XPath 2.0, every value in XPath is a sequence of zero or more items. An atomic value, such as a Boolean, number, or string, is just a special case of a sequence of length one. JSONata uses the same data model as XPath.

Since JSONata is a functional language, and its main data construct is a sequence, one would expect the well-known map, filter, and reduce functional operators to be available for sequences. This is indeed the case, but these operators often hide in plain sight.

For example, with the JSON below, assume we want to find the number of products in each category that are priced greater than 5.

{
  "products": [
    { "name": "broiler", "category": "kitchen", "price": 100, "cost": 70 },
    { "name": "toaster", "category": "kitchen", "price": 30, "cost": 10 },
    { "name": "blender", "category": "kitchen", "price": 50, "cost": 25 },
    { "name": "socks", "category": "clothes", "price": 5, "cost": 2 },
    { "name": "shirt", "category": "clothes", "price": 10, "cost": 3 }
  ]
}

Here is a JSONata query that will return the desired results.

$.products[price > 5]{category: $count(name)}
{
  "kitchen": 3,
  "clothes": 1
}

You can see this JSONata query in action here.

Let’s break down the above query into four elements:

  1. $
  2. .products
  3. [price > 5]
  4. {category: $count(name)}

The first element, $, is a built-in variable. When at the beginning of a JSONata expression, it represents the entire input document.

The second element, .products, is composed of the map operator (.) along with a function to be applied to each element of the input sequence. In this case the function is the products path expression, and the input sequence is the entire document.

The third element, [price > 5], is the filter operator ([...]) along with a predicate expression to determine which elements of the input sequence should be returned.

The fourth element, {category: $count(name)}, is the reduce operator ({...}) along with an object to hold the groupings, determined by the key expressions, and the aggregated values.

If you are familiar with functional programming, being able to parse JSONata path expressions as compositions of the usual functional operators will help you understand the results that are returned.

Object Constructors

One feature of JSONata that can initially confuse new users is the object constructor. There are two types of object constructors: one produces a single object, and the other produces an array of objects. We've seen the first one in the previous section:

$.products[price > 5]{category: $count(name)}
 
{
  "kitchen": 3,
  "clothes": 1
}

The second one looks similar:

$.products[price > 5].{category: $count(name)}
 
[
  {
    "kitchen": 1
  },
  {
    "kitchen": 1
  },
  {
    "kitchen": 1
  },
  {
    "clothes": 1
  }
]

As can be seen, one object constructor is of the form pathExpr{...} while the other is of the form pathExpr.{...}. Note the dot (.) between the path expression and the object expression.

Now that we’ve learned to parse these expressions as compositions of functional operators, we can make sense of the resulting output. The first object constructor is simply the reduce operator ({...}), as we’ve discussed previously, while the second object constructor is the map operator (.) followed by a function that returns an object for each element of the input sequence. This explains why the output of the second object constructor is an array.

Contexts and Context Variable Binding

Previously we explained that $ is a built-in variable that represents the input document when at the beginning of a JSONata expression. In general, the variable $ refers to the context item, which is the sequence currently being evaluated.

The context item is implied at the beginning of a path expression, but can also be explicitly referenced. For example, the following two queries are equivalent:

products[price > 5]{category: $count(name)}
$.products[$.price > 5]{$.category: $count($.name)}

After applying the map (.) operator, the context item changes. However, there is one exception, and that is when using a context variable binding. The context variable binding feature is unique to JSONata, and was introduced to support join semantics in a path expression. We show an example below.

Assume our data looks as follows:

{
  "sales": [
    { "product": "broiler", "store number": 1, "quantity": 20  },
    { "product": "toaster", "store number": 2, "quantity": 100 },
    { "product": "toaster", "store number": 2, "quantity": 50 },
    { "product": "toaster", "store number": 3, "quantity": 50 },
    { "product": "blender", "store number": 3, "quantity": 100 },
    { "product": "blender", "store number": 3, "quantity": 150 },
    { "product": "socks", "store number": 1, "quantity": 500 },
    { "product": "socks", "store number": 2, "quantity": 10 },
    { "product": "shirt", "store number": 3, "quantity": 10 }
  ],
  "stores": [
    { "store number": 1, "state": "CA" },
    { "store number": 2, "state": "CA" },
    { "store number": 3, "state": "MA" },
    { "store number": 4, "state": "MA" }
  ]
}

We want to join the sales data with the store data to find out how many products were sold in each state. Here’s the JSONata query to achieve this, which you can also see here.

$.stores@$st.sales@$sl[$sl.`store number` = $st.`store number`]{
    $st.state: $sum($sl.quantity)
}
{
  "CA": 680,
  "MA": 310
}

At the beginning of our JSONata expression, the context item is the entire input document. We use the map operator followed by a path expression with a context variable binding (.stores@$st) to bind the variable $st to the result of mapping the stores path expression to the input document. Since we use a context variable binding, the context item does not change to the result of the stores path expression, but instead remains as the input document. The next map operator followed by a path expression (.sales@$sl) is again applied to the input document as the context item.

After the $st and $sl variables are bound to the result of the stores and sales path expressions, respectively, the filter expression [$sl.`store number` = $st.`store number`] performs the join, and the reduce expression { $st.state: $sum($sl.quantity) } aggregates the results.

As demonstrated, the reason that the context item does not change when using a context variable binding is to more easily support joins between sibling nodes in the JSON document.

Sample Queries

The rest of this article shows various sample queries using JSONata. These examples are adapted from an enumeration of various use cases for a different JSON transformation language that was also inspired by XPath. You can see these examples implemented in the alternative language here. The JSONata examples tend to be more concise.

Join

The JSON below is based on a social media site that allows users to interact with their friends.

[
  {
    "name": "Sarah",
    "age": 13,
    "gender": "female",
    "friends": [
      "Jim",
      "Mary",
      "Jennifer"
    ]
  },
  {
    "name": "Jim",
    "age": 13,
    "gender": "male",
    "friends": [
      "Sarah"
    ]
  }
]

The following query performs a join on Sarah’s friend list to return the object representing each of her friends:

$[name in $$[name = "Sarah"].friends]
{
  "name": "Jim",
  "age": 13,
  "gender": "male",
  "friends": [
    "Sarah"
  ]
}

See https://try.jsonata.org/1A5n67svD.

Grouping Queries for JSON

The JSON below contains data about sales, products, and stores.

{
  "sales": [
    { "product": "broiler", "store number": 1, "quantity": 20  },
    { "product": "toaster", "store number": 2, "quantity": 100 },
    { "product": "toaster", "store number": 2, "quantity": 50 },
    { "product": "toaster", "store number": 3, "quantity": 50 },
    { "product": "blender", "store number": 3, "quantity": 100 },
    { "product": "blender", "store number": 3, "quantity": 150 },
    { "product": "socks", "store number": 1, "quantity": 500 },
    { "product": "socks", "store number": 2, "quantity": 10 },
    { "product": "shirt", "store number": 3, "quantity": 10 }
  ],
  "products": [
    { "name": "broiler", "category": "kitchen", "price": 100, "cost": 70 },
    { "name": "toaster", "category": "kitchen", "price": 30, "cost": 10 },
    { "name": "blender", "category": "kitchen", "price": 50, "cost": 25 },
    { "name": "socks", "category": "clothes", "price": 5, "cost": 2 },
    { "name": "shirt", "category": "clothes", "price": 10, "cost": 3 }
  ],
  "stores": [
    { "store number": 1, "state": "CA" },
    { "store number": 2, "state": "CA" },
    { "store number": 3, "state": "MA" },
    { "store number": 4, "state": "MA" }
  ]
}

We want to group sales by product, across stores.

sales{ product: [$sum(quantity)]}
{ "broiler": 20, "toaster": 200, "blender":250, "socks": 510, "shirt": 10 }

See https://try.jsonata.org/cGRl7Xi1T.

Now let’s do a more complex grouping query, showing sales by category within each state. The following query groups by state, then by category, then lists individual products and the sales associated with each.

(
    /* First join all the rows */
    $join := stores@$st
        .sales@$sl[$sl.`store number` = $st.`store number`]
        .products@$p[$sl.product = $p.name].{
            "state": $st.state,
            "category": $p.category,
            "product": $sl.product,
            "quantity": $sl.quantity
        };

    /* Next perform the grouping */
    $join{state: {category: {product: $sum(quantity)}}}
)
{
  "CA": {
    "kitchen": {
      "broiler": 20,
      "toaster": 150
    },
    "clothes": {
      "socks": 510
    }
  },
  "MA": {
    "kitchen": {
      "toaster": 50,
      "blender": 250
    },
    "clothes": {
      "shirt": 10
    }
  }
}

See https://try.jsonata.org/-0LDeg4hI.
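
To make the two-step query easier to follow, the intermediate $join sequence evaluates to roughly the following rows before the grouping is applied (the exact ordering may vary by evaluation order):

[
  { "state": "CA", "category": "kitchen", "product": "broiler", "quantity": 20 },
  { "state": "CA", "category": "clothes", "product": "socks", "quantity": 500 },
  { "state": "CA", "category": "kitchen", "product": "toaster", "quantity": 100 },
  { "state": "CA", "category": "kitchen", "product": "toaster", "quantity": 50 },
  { "state": "CA", "category": "clothes", "product": "socks", "quantity": 10 },
  { "state": "MA", "category": "kitchen", "product": "toaster", "quantity": 50 },
  { "state": "MA", "category": "kitchen", "product": "blender", "quantity": 100 },
  { "state": "MA", "category": "kitchen", "product": "blender", "quantity": 150 },
  { "state": "MA", "category": "clothes", "product": "shirt", "quantity": 10 }
]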

JSON to JSON Transformations

The following query takes satellite data, and summarizes which satellites are visible. The data for the query is a simplified version of a Stellarium file that contains this information.

{
  "creator": "Satellites plugin version 0.6.4",
  "satellites": {
    "AAU CUBESAT": {
      "tle1": "1 27846U 03031G 10322.04074654  .00000056  00000-0  45693-4 0  8768",
      "visible": false
    },
    "AJISAI (EGS)": {
      "tle1": "1 16908U 86061A 10321.84797408 -.00000083  00000-0  10000-3 0  3696",
      "visible": true
    },
    "AKARI (ASTRO-F)": {
      "tle1": "1 28939U 06005A 10321.96319841  .00000176  00000-0  48808-4 0  4294",
      "visible": true
    }
  }
}

We want to query this data to return a summary. The following is a JSONata query that returns the desired result.

(
    $sats := $each(satellites, function($v, $k) {$merge([{"name": $k}, $v])});
    {
        "visible": [$sats[visible].name],
        "invisible": [$sats[$not(visible)].name]
    }
)
{
  "visible": [
    "AJISAI (EGS)",
    "AKARI (ASTRO-F)"
  ],
  "invisible": [
    "AAU CUBESAT"
  ]
}

See https://try.jsonata.org/45R_q6dVJ.

JSON Updates

Suppose an application receives an order that contains a credit card number, and needs to put the user on probation.

Data for the users:

[
  {
    "name": "Deadbeat Jim",
    "address": "1 E 161st St, Bronx, NY 10451",
    "risk tolerance": "high"
  },
  {
    "name": "Rich Jim",
    "address": "123 Main St, Azusa, CA 91010",
    "risk tolerance": "low"
  }
]

The following query adds "status": "credit card declined" to the user’s record.

[$.$merge([$, name = "Deadbeat Jim" ? {"status": "credit card declined"} : {}])]
 

After the update is finished, the user’s record looks like this:

[
  {
    "name": "Deadbeat Jim",
    "address": "1 E 161st St, Bronx, NY 10451",
    "risk tolerance": "high",
    "status": "credit card declined"
  },
  {
    "name": "Rich Jim",
    "address": "123 Main St, Azusa, CA 91010",
    "risk tolerance": "low"
  }
]

See https://try.jsonata.org/btnJJFsBV.

Data Transformations

Many applications need to modify data before forwarding it to another source. Suppose an application makes videos available using feeds from YouTube. The following data comes from one such feed:

{
  "encoding": "UTF-8",
  "feed": {
    "author": [
      {
        "name": {
          "$t": "YouTube"
        },
        "uri": {
          "$t": "http://www.youtube.com/"
        }
      }
    ],
    "category": [
      {
        "scheme": "http://schemas.google.com/g/2005#kind",
        "term": "http://gdata.youtube.com/schemas/2007#video"
      }
    ],
    "entry": [
      {
        "app$control": {
          "yt$state": {
            "$t": "Syndication of this video was restricted by its owner.",
            "name": "restricted",
            "reasonCode": "limitedSyndication"
          }
        },
        "author": [
          {
            "name": {
              "$t": "beyonceVEVO"
            },
            "uri": {
              "$t": "http://gdata.youtube.com/feeds/api/users/beyoncevevo"
            }
          }
        ]
      }
    ]
  }
}

The following query creates a modified copy of the feed by removing all entries that restrict syndication.

{
    "encoding": encoding,
    "feed": {
        "author": feed.author,
        "category": feed.category,
        "entry": [feed.entry[app$control.yt$state.name != "restricted"]]
    }
}
{
  "encoding": "UTF-8",
  "feed": {
    "author": [
      {
        "name": {
          "$t": "YouTube"
        },
        "uri": {
          "$t": "http://www.youtube.com/"
        }
      }
    ],
    "category": [
      {
        "scheme": "http://schemas.google.com/g/2005#kind",
        "term": "http://gdata.youtube.com/schemas/2007#video"
      }
    ],
    "entry": []
  }
}

See https://try.jsonata.org/iUwXbcPT2.

Understanding JSONata

kgiraffe: GraphQL for Kafka and Schema Registry

While working with Apache Kafka and Confluent Schema Registry, I often spend time switching between tools like

  • kcat (formerly kafkacat)
  • the Kafka Avro console consumer/producer
  • the Kafka JSON Schema console consumer/producer
  • the Kafka Protobuf console consumer/producer
  • the Schema Registry Maven plugin

Recently I decided to combine much of the functionality of the above tools into a single tool called kgiraffe, which is sort of a companion to kcat. While kcat excels at the command line, kgiraffe exposes its functionality with a GraphQL interface. In addition, kgiraffe fully supports topics using Avro, JSON Schema, and Protobuf schemas. kgiraffe provides the following features for managing both topics and schemas out of the box:

  • Topic Management
    • Query topics
    • Publish to topics
    • Subscribe to topics
    • Full support for Avro, JSON Schema, and Protobuf records
  • Schema Management
    • Validate and stage schemas
    • Test schema compatibility
    • Query schemas and subjects
    • Register schemas

Before kgiraffe starts, you indicate which schemas are associated with which topics. Based on the specified schemas, kgiraffe will automatically generate a GraphQL schema that can be used to read, write, and subscribe to your topics. kgiraffe also allows you to validate and stage schemas, as well as test them for compatibility, before you register them to Schema Registry.

After downloading a release, starting kgiraffe is as easy as

$ bin/kgiraffe -b mybroker -t mytopic -r http://schema-registry-url:8081
 

Once kgiraffe is running, browse to http://localhost:8765/kgiraffe to launch the GraphQL Playground.

Example GraphQL queries can be found in the README.

If you find yourself bouncing between tools while working with Kafka and Schema Registry, please give kgiraffe a try and let me know what you think.

kgiraffe: GraphQL for Kafka and Schema Registry

Understanding Protobuf Compatibility

In a recent article, I presented rules for evolving JSON Schemas in a backward, forward, and fully compatible manner. The backward compatibility rules for Protocol Buffers (Protobuf) are much simpler, and most of them are outlined in the section “Updating A Message Type” in the Protobuf Language Guide.

The Confluent Schema Registry uses these rules to check for compatibility when evolving a Protobuf schema. Unlike some other tools, it does not require that a removed field be marked as reserved when evolving a schema in a backward compatible manner. The reserved keyword prevents incompatible changes in future versions of a schema, and it is most useful for tools that do not perform transitive compatibility checks. The Confluent Schema Registry instead achieves compatibility across multiple versions by allowing compatibility checks to be performed transitively. Nevertheless, marking removed fields as reserved is still considered a best practice.

For the Protobuf oneof construct, there are four additional compatibility issues that are mentioned in the Protobuf Language Guide.  Since the text of the language guide is a bit terse, for the rest of this article I’ll discuss each of these four issues further. First, let me reproduce the text below.

  1. Be careful when adding or removing oneof fields. If checking the value of a oneof returns None/NOT_SET, it could mean that the oneof has not been set or it has been set to a field in a different version of the oneof. There is no way to tell the difference, since there’s no way to know if an unknown field on the wire is a member of the oneof.
  2. Move fields into or out of a oneof: You may lose some of your information (some fields will be cleared) after the message is serialized and parsed. However, you can safely move a single field into a new oneof and may be able to move multiple fields if it is known that only one is ever set.
  3. Delete a oneof field and add it back: This may clear your currently set oneof field after the message is serialized and parsed.
  4. Split or merge oneof: This has similar issues to moving regular fields.

Each of the above issues can cause information to be lost. For that reason, these are considered backward compatibility issues.

Adding or removing oneof fields

Let’s look at an example of when a oneof field is removed.  Let’s say we have the following Protobuf message with fields f1 and f2 in a oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

Later we decide to remove the field f2.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
  }
}

If an old binary with the first version of SampleMessage creates a message and sets f2 in test_oneof, then a new binary with the second version of SampleMessage will see test_oneof as unset. The value of f2 will be contained in the unknown fields of the message, but there’s no way to tell which, if any, of the unknown fields were previously in test_oneof. The information that is lost is whether the oneof was really set or not.

This behavior may be unexpected when comparing it to how Protobuf handles removing a value from an enum. Any unrecognized enum values encountered on the wire are retained by the binary, and a field using the enum will still appear as set (to an unrecognized value) if the binary encounters a value that is no longer enumerated in the enum.

The information loss from removing a oneof field can cascade and cause other information to be lost, depending on the scenario. For example, suppose the new binary reads the message with f2 set by the old binary and modifies it by setting f1. Setting f1 does not clear the unknown field for f2, since for the new binary f2 is no longer associated with test_oneof. When the old binary then reads the modified message, it may deserialize f2 after f1; because both fields belong to test_oneof in the old schema, reading f2 clears f1, so the old binary may see f2 as still set and the value of f1 is lost.

Furthermore, when reading the modified message, a binary in a different programming language may read f1 after f2, thus clearing the value of f2. In this case, the value for f2 will be lost. The actual serialization order is implementation-specific and subject to change. Most implementations serialize fields in ascending order of their field numbers, but this is not guaranteed, especially in the presence of unknown fields. Also, inconsistencies in how various programming languages handle unknown fields prevent a canonical serialization order from being defined. The Protobuf Encoding documentation states the following:

  • By default, repeated invocations of serialization methods on the same protocol buffer message instance may not return the same byte output; i.e. the default serialization is not deterministic.
  • Deterministic serialization only guarantees the same byte output for a particular binary. The byte output may change across different versions of the binary.

So even a later version of the binary in the same programming language may see different results.

For these reasons, removing a field from a oneof is considered a backward incompatible change. Likewise, adding a field to a oneof is considered a forward incompatible change (since a forward compatibility check is just a backward compatibility check with the schemas switched).

Moving fields into or out of a oneof

Consider the following Protobuf message with f1 in a oneof and f2 and f3 outside of the oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
  }
  string f2 = 2;
  string f3 = 3;
}

Later we decide to move both f2 and f3 into test_oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
    string f3 = 3;
  }
}

If an old binary with the first version of SampleMessage creates a message and sets f1, f2, and f3, then when a new binary with the second version of SampleMessage reads the message from the wire, it will clear two of the fields (depending on serialization order) and leave only one field, say f3, with its value. Thus the values of the other two fields will be lost.

As mentioned, the order in which fields are serialized is implementation-specific. For the fields in the oneof, only the value of the last field read from the wire is retained. Therefore, an additional problem is that a different field in the oneof may appear as set when using a different implementation.

For these reasons, moving fields into a oneof is considered a backward incompatible change, unless the oneof is new and only a single field is moved into it. Moving fields out of a oneof is also a backward incompatible change, since this has the effect of removing the fields from the oneof.

Deleting a oneof and adding it back

In this scenario there are three versions involved. The first version has f1 and f2 in a oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

In the second version, we remove the field f2 from the oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
  }
}

Later we add the field f2 back to the oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

Suppose an old binary with the first version of SampleMessage creates a message and sets f2 in test_oneof. Next, a binary with the second version reads the message and sets f1 in test_oneof. Finally, a binary with the third version reads the modified message: it may see f2 as set, with the value of f1 lost. This is similar to the two-binary scenario described above for removing a field from a oneof, except that here the modified message is read by a third, later binary rather than by the original binary.

Splitting or merging a oneof

Consider a Protobuf message with two oneof constructs.

message SampleMessage {
  oneof test_oneof1 {
    string f1 = 1;
  }
  oneof test_oneof2 {
    string f2 = 2;
  }
}

Later we decide to merge the oneof constructs.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

If an old binary with the first version of SampleMessage creates a message and sets both f1 and f2, then when a new binary with the second version of SampleMessage reads the message, it will see only one of the fields as set, with the value of the other field being lost. Again, which field is set and which is lost depends on the implementation. This issue is similar to that described above involving moving existing fields into a oneof.

Summary

In Protobuf, evolving schemas with oneof constructs can be problematic for a couple of reasons:

  1. Information may be lost, such as the values of fields that were moved into a oneof, and whether a oneof was set or not. The information loss can cascade and result in further loss, such as when an unknown field reappears in a oneof, causing the previous value of the oneof to be cleared.
  2. A schema change can unfortunately cause multiple fields which have been set to be pulled into the same oneof. If this happens, all of the fields except one will be cleared. Since the order in which fields are serialized to the wire is implementation-specific and subject to change, especially in the presence of unknown fields, the field that is retained may be different between implementations.

For these reasons, Confluent Schema Registry implements two backward compatibility checks for the oneof construct:

  1. Removing a field from a oneof is a backward incompatible change.
  2. Moving existing fields into a oneof is a backward incompatible change, unless it is a single field being moved into a new oneof.

Understanding Protobuf Compatibility

The Enterprise is Made of Events, Not Things

“The world is made of events, not things.” — Carlo Rovelli

“Every company is becoming software.” — Jay Kreps

In The Order of Time, the physicist Carlo Rovelli argues that the theory of relativity compels us to view the world not as made of things or entities, but as events or processes. In the world of technology, Jay Kreps argues that the core processes that a company executes are increasingly being captured in software, and these processes consume and produce the business events that drive the company. From the viewpoint of both Rovelli and Kreps, one can view a company as made of events or processes.

The dichotomy between events and things (with state) has been noted by many. Martin Kleppmann captured it elegantly in his book Designing Data-Intensive Applications.

One technique often used in Domain-Driven Design (DDD) is event sourcing, which derives application state from immutable business events. Event sourcing often involves arbitrary logic to derive that state; a more formal model would instead use a finite-state machine (FSM).

In a software architecture involving a network of FSMs, each FSM can perform state transitions when receiving events, produce events for other FSMs to consume, and persist some internal data. With FSMs, one can implement several other models, such as the following:

  • Simple CRUD entities.  This is the most common use-case for event sourcing.
  • Function as a Service (FaaS).  In this case, each FSM only has one state, and a single transition to and from that state, during which it performs the function.
  • Actor model.  In the actor model, actors receive and send events, but are otherwise passive when no events occur.
  • Intelligent agents (IA).  Intelligent agents are similar to actors in that they receive and send events, but are generally viewed as continuously active in order to achieve some goal.

In the rest of this article I’ll show how to implement a network of intelligent agents using Kafka Streams and finite-state machines.

The implementation is comprised of two parts:

  1. An FSM implementation that sits atop Kafka Streams, called a KMachine.  A KMachine definition is comprised of a set of states, a set of state transitions, some internal data, and a set of functions that can be attached to state transitions.  The entire KMachine definition can be expressed in YAML, in which the functions are written as JavaScript.
  2. A REST-based web application that can be used to create and manage both KMachine definitions and instances. A KMachine instance is created for each unique key in the input stream.

To demonstrate how KMachine can be used to implement a network of intelligent agents, I’ve borrowed an example from “Programming Game AI By Example,” by Mat Buckland.  In this example, two intelligent agents inhabit a gaming environment that represents a miners’ town in the Wild West.  One agent is a gold miner, and the other agent is the miner’s wife.

As a preview, here is sample output of the interaction between the miner and his wife:

Miner Bob: Walkin' to the goldmine
Miner Bob: Pickin' up a nugget
Elsa: Makin' the bed
Miner Bob: Pickin' up a nugget
Elsa: Makin' the bed
Miner Bob: Pickin' up a nugget
Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold
Miner Bob: Goin' to the bank. Yes siree
Miner Bob: Depositing gold. Total savings now: 3
Miner Bob: Leavin' the bank
Miner Bob: Walkin' to the goldmine
Miner Bob: Pickin' up a nugget
Elsa: Washin' the dishes
Miner Bob: Pickin' up a nugget
Elsa: Moppin' the floor
Miner Bob: Pickin' up a nugget
Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold
Miner Bob: Goin' to the bank. Yes siree
Miner Bob: Depositing gold. Total savings now: 6
Miner Bob: WooHoo! Rich enough for now. Back home to mah li'lle lady
Miner Bob: Leavin' the bank
Miner Bob: Walkin' home
Elsa: Hi honey. Let me make you some of mah fine country stew
Miner Bob: ZZZZ... 
Elsa: Putting the stew in the oven
Elsa: Fussin' over food
Miner Bob: ZZZZ... 
Elsa: Fussin' over food
Elsa: Puttin' the stew on the table
Elsa: StewReady! Lets eat
Miner Bob: All mah fatigue has drained away. Time to find more gold!
Elsa: Time to do some more housework!
Miner Bob: Walkin' to the goldmine

Both the miner and his wife are implemented as separate KMachine definitions. Here is the KMachine definition that represents the miner:

name: miner
input: miner
init: goHomeAndSleepTilRested
states:
  - name: enterMineAndDigForNugget
    onEntry: enterMineAction
    onExit: exitMineAction
  - name: visitBankAndDepositGold
    onEntry: enterBankAction
    onExit: exitBankAction
  - name: goHomeAndSleepTilRested
    onEntry: enterHomeAction
    onExit: exitHomeAction
  - name: quenchThirst
    onEntry: enterSaloonAction
    onExit: exitSaloonAction
  - name: eatStew
    onEntry: startEatingAction
    onExit: finishEatingAction
transitions:
  - type: stayInMine
    from: enterMineAndDigForNugget
    to:
    guard:
    onTransition: stayInMineAction
  - type: visitBank
    from: enterMineAndDigForNugget
    to: visitBankAndDepositGold
    guard:
    onTransition:
  - type: quenchThirst
    from: enterMineAndDigForNugget
    to: quenchThirst
    guard:
    onTransition:
  - type: goHome
    from: visitBankAndDepositGold
    to: goHomeAndSleepTilRested
    guard:
    onTransition:
  - type: enterMine
    from: visitBankAndDepositGold
    to: enterMineAndDigForNugget
    guard:
    onTransition:
  - type: enterMine
    from: goHomeAndSleepTilRested
    to: enterMineAndDigForNugget
    guard:
    onTransition:
  - type: enterMine
    from: quenchThirst
    to: enterMineAndDigForNugget
    guard:
    onTransition:
  - type: stayHome
    from: goHomeAndSleepTilRested
    to:
    guard:
    onTransition: stayHomeAction
  - type: stewReady
    from: goHomeAndSleepTilRested
    to: eatStew
    guard:
    onTransition: imComingAction
  - type: finishEating
    from: eatStew
    to: goHomeAndSleepTilRested
    guard:
    onTransition:
data:
  location: shack
  goldCarried: 0
  moneyInBank: 0
  thirst: 0
  fatigue: 0
functions:
  enterMineAction: >-
    (ctx, key, value, data) => {
      if (data.location != 'goldMine') {
        console.log("Miner " + key + ": Walkin' to the goldmine");
        data.location = 'goldMine';
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 0);
    }
  stayInMineAction: >-
    (ctx, key, value, data) => {
      data.goldCarried++;
      data.fatigue++;
      console.log("Miner " + key + ": Pickin' up a nugget");
      if (data.goldCarried >= 3) {
        ctx.sendMessage(ctx.topic(), key, { type: 'visitBank' }, 0);
      } else if (data.thirst >= 5) {
        ctx.sendMessage(ctx.topic(), key, { type: 'quenchThirst' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 1000);
      }
    }
  exitMineAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Ah'm leavin' the goldmine with mah pockets full o' sweet gold");
    }
  enterBankAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Goin' to the bank. Yes siree");
      data.location = 'bank';
      data.moneyInBank += data.goldCarried;
      data.goldCarried = 0;
      console.log("Miner " + key + ": Depositing gold. Total savings now: " + data.moneyInBank);
      if (data.moneyInBank >= 5) {
        console.log("Miner " + key + ": WooHoo! Rich enough for now. Back home to mah li'lle lady");
        ctx.sendMessage(ctx.topic(), key, { type: 'goHome' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
      }
    }
  exitBankAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Leavin' the bank");
    }
  enterHomeAction: >-
    (ctx, key, value, data) => {
      if (data.location != 'shack') {
        console.log("Miner " + key + ": Walkin' home");
        data.location = 'shack';
        if (data.wife) {
          ctx.sendMessage('miners_wife', data.wife, { type: 'hiHoneyImHome' }, 0);
        }
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 0);
    }
  stayHomeAction: >-
    (ctx, key, value, data) => {
      if (value.wife) {
        data.wife = value.wife;
      }
      if (data.fatigue < 5) {
        console.log("Miner " + key + ": All mah fatigue has drained away. Time to find more gold!");
        data.location = 'shack';
        ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
      } else {
        data.fatigue--;
        console.log("Miner " + key + ": ZZZZ... ");
        ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 1000);
      }
    }
  exitHomeAction: >-
    (ctx, key, value, data) => {
    }
  enterSaloonAction: >-
    (ctx, key, value, data) => {
      if (data.moneyInBank >= 2) {
        data.thirst = 0;
        data.moneyInBank -= 2;
        console.log("Miner " + key + ": That's mighty fine sippin liquer");
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
    }
  exitSaloonAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Leavin' the saloon, feelin' good");
    }
  imComingAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Okay Hun, ahm a comin'!");
    }
  startEatingAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Smells Reaaal goood Elsa!");
      console.log("Miner " + key + ": Tastes real good too!");
      ctx.sendMessage(ctx.topic(), key, { type: 'finishEating' }, 0);
    }
  finishEatingAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Thankya li'lle lady. Ah better get back to whatever ah wuz doin'");
    }
 

The state transition diagram for the miner, generated using the DOT graph language, is shown below.

Next is the KMachine definition that represents the miner’s wife:

name: minersWife
input: miners_wife
init: doHouseWork
states:
  - name: doHouseWork
    onEntry: startHouseWorkAction
    onExit:
  - name: visitBathroom
    onEntry: enterBathroomAction
    onExit: exitBathroomAction
  - name: cookStew
    onEntry: startCookingAction
    onExit: finishCookingAction
transitions:
  - type: continueHouseWork
    from: doHouseWork
    to:
    guard:
    onTransition: continueHouseWorkAction
  - type: natureCalls
    from: doHouseWork
    to: visitBathroom
    guard:
    onTransition:
  - type: natureCalls
    from: cookStew
    to: visitBathroom
    guard:
    onTransition:
  - type: continuePrevious
    from: visitBathroom
    to: revertToPreviousState
    toType: Function
    guard:
    onTransition:
  - type: hiHoneyImHome
    from: doHouseWork
    to: cookStew
    guard:
    onTransition: hiHoneyAction
  - type: hiHoneyImHome
    from: visitBathroom
    to: cookStew
    guard:
    onTransition: hiHoneyAction
  - type: continueCooking
    from: cookStew
    to:
    guard:
    onTransition: continueCookingAction
  - type: stewReady
    from: cookStew
    to: doHouseWork
    guard:
    onTransition: letsEatAction
data:
  location: shack
  cooking: false
functions:
  startHouseWorkAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Time to do some more housework!");
      ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 0);
    }
  continueHouseWorkAction: >-
    (ctx, key, value, data) => {
      if (value.husband) {
        data.husband = value.husband;
      }
      switch (Math.floor(Math.random() * 3)) {
        case 0:
          console.log(key + ": Moppin' the floor");
          break;
        case 1:
          console.log(key + ": Washin' the dishes");
          break;
        case 2:
          console.log(key + ": Makin' the bed");
          break;
      }
      if (Math.random() < 0.1) {
        ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 1000);
      }
    }
  enterBathroomAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Walkin' to the can. Need to powda mah pretty li'lle nose");
      console.log(key + ": Ahhhhhh! Sweet relief!");
      ctx.sendMessage(ctx.topic(), key, { type: 'continuePrevious' }, 0);
    }
  exitBathroomAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Leavin' the Jon");
    }
  revertToPreviousState: >-
    (ctx, key, value, data) => {
      return data.cooking ? 'cookStew' : 'doHouseWork'
    }
  hiHoneyAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Hi honey. Let me make you some of mah fine country stew");
    }
  startCookingAction: >-
    (ctx, key, value, data) => {
      if (!data.cooking) {
        console.log(key + ": Putting the stew in the oven");
        ctx.sendMessage(ctx.topic(), key, { type: 'stewReady' }, 2000);
        data.cooking = true;
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 0);
    }
  continueCookingAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Fussin' over food");
      if (Math.random() < 0.1) {
        ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 1000);
      }
    }
  finishCookingAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Puttin' the stew on the table");
    }
  letsEatAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": StewReady! Lets eat");
      if (data.husband) {
        ctx.sendMessage('miner', data.husband, { type: 'stewReady' }, 0);
      }
      data.cooking = false;
    }
 

The state transition diagram for the miner’s wife is shown below.

Each KMachine definition, for both the miner and his wife, is entirely contained in the YAML above. A KMachine definition describes an FSM as follows:

  • name – The id of the definition for the FSM.
  • input – The input topic.
  • init – The initial state of the FSM.
  • states – The states of the FSM.  Each state can have the following:
    • name – The name of the state.
    • onEntry – The function to invoke on entry to the state.
    • onExit – The function to invoke on exit of the state.
  • transitions – The state transitions.  Each transition can have the following:
    • type – The event type that triggers the transition.
    • from – The source state.
    • to – The destination, which can be
      • The name of the destination state.
      • The function to determine the destination state.
      • null, which represents an internal state transition: the state does not change and the onEntry and onExit functions are not invoked, but the guard and onTransition functions are invoked.
    • toType – Either “State” or “Function”.
    • guard – A boolean function to indicate whether the transition should occur.
    • onTransition – The function to invoke on transition.
  • data – A set of key-value pairs that represent the internal data for the FSM.
  • functions – A set of JavaScript functions that can be attached to states and transitions.  Each function takes the following parameters:
    • ctx – A context object that provides the following methods:
      • topic() – The input topic.
      • sendMessage() – Sends a message to a topic.
    • key – The key of the event, as a JSON message.
    • value – The value of the event, as a JSON message.  The value is expected to have a property named “type” to trigger state transitions.
    • data – The local data, which can be mutated.

To see the miner and his wife in action, you’ll first need to start a local instance of Kafka. Then create two topics, one for the miner and one for his wife.

./bin/kafka-topics --create --topic miner --bootstrap-server localhost:9092
./bin/kafka-topics --create --topic miners_wife --bootstrap-server localhost:9092
 

Next, clone the project at https://github.com/rayokota/kmachines and start up the web application.

git clone https://github.com/rayokota/kmachines.git
cd kmachines
mvn clean install -DskipTests
mvn -pl kmachines-rest-app compile quarkus:dev -Dquarkus.http.port=8081
 

In a separate window, create the KMachine definitions for the miner and his wife.

cd kmachines
curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miner_messaging.yml "http://localhost:8081/kmachines"
curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miners_wife_messaging.yml "http://localhost:8081/kmachines"
 

Now produce an event to create a KMachine instance for a miner named “Bob”, and another event to create a KMachine instance for his wife named “Elsa”. This can be done with kafkacat, for example. Events are represented as a pair of JSON messages for the key and value. The key corresponds to a unique KMachine instance, while the value is the message used to trigger state transitions, which must have a “type” property to indicate the event type. In the command below, a dot (.) is used to separate the key from the value (using the -K option of kafkacat).

echo '"Bob".{ "type": "stayHome", "wife": "Elsa" }' | kafkacat -b localhost:9092 -K . -P -t miner
echo '"Elsa".{ "type": "continueHouseWork", "husband": "Bob" }' | kafkacat -b localhost:9092 -K . -P -t miners_wife
 

You should see the miner and his wife interacting as above. You can query the state of the FSM for the miner or the wife at any time.

curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/miner/state --data '"Bob"'
curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/minersWife/state --data '"Elsa"'
 

To stop the agents, run the following commands.

curl -X DELETE "http://localhost:8081/kmachines/minersWife" 
curl -X DELETE "http://localhost:8081/kmachines/miner" 
 

That’s it!

The Enterprise is Made of Events, Not Things