Using Data Contracts with Confluent Schema Registry

The following post originally appeared in the Confluent blog on October 18, 2023.

The Confluent Schema Registry plays a pivotal role in ensuring that producers and consumers in a streaming platform are able to communicate effectively. Ensuring the consistent use of schemas and their versions allows producers and consumers to easily interoperate, even when schemas evolve over time.

As streaming has become more ubiquitous, enterprises find that downstream components such as consumers are often tasked with handling data inconsistencies, incompatible changes, and expensive and complicated transformations in order to be able to process the data effectively. This has led to an effort to shift these responsibilities to the source of the data, such as the producer, an activity often referred to as shift-left. In the context of modern data architectures, such as streaming and data mesh, the effort to shift-left has led to an emphasis on the data contract. As a result, Confluent Schema Registry, both in Confluent Platform Enterprise and Confluent Cloud, now supports the use of data contracts.

A data contract is a formal agreement between an upstream component and a downstream component on the structure and semantics of data that is in motion. The upstream component enforces the data contract, while the downstream component can assume that the data it receives conforms to the data contract. Data contracts are important because they provide transparency over dependencies and data usage in a streaming architecture. They help to ensure the consistency, reliability and quality of the data in event streams, and they provide a single source of truth for understanding the data in motion.

In this article, we’ll walk through an example of enhancing a schema to be a full-fledged data contract. Our example will involve the following actions:

  • Defining an initial schema for the data contract
  • Enhancing the data contract with business metadata
  • Adding data quality rules to the data contract
  • Specifying custom actions for the data contract
  • Adding migration rules to the data contract for a complex schema evolution

Defining an Order Schema

Let’s create a simple Avro schema to represent a customer order.

{
  "name": "Order",
  "namespace": "acme.com",
  "type": "record",
  "fields": [
    {
      "name": "orderId",
      "type": "int"
    },
    {
      "name": "customerId",
      "type": "int"
    },
    { 
      "name": "totalPriceCents",
      "type": "int"
    },
    {
      "name": "state",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": [
          "Pending",
          "Processing",
          "Completed",
          "Canceled",
          "Unknown"
        ],
        "default": "Unknown"  
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Assuming the above schema is in a file named “order.avsc”, upload it to a local Schema Registry as follows:

jq -n --rawfile schema order.avsc '{schema: $schema}' |
  curl http://localhost:8081/subjects/orders-value/versions --json @-
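Schema Registry responds with the ID of the newly registered schema, for example {"id": 1}. If needed, the latest registered version (including its ID) can also be retrieved later:

curl http://localhost:8081/subjects/orders-value/versions/latest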

Let’s try producing and consuming with our new schema using the Avro console producer and consumer, which you can find the documentation for here. First, start the consumer.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092

In a separate terminal, start the producer, and pass the schema ID that was returned during registration as the value of “value.schema.id”.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "state": "Pending", "timestamp": 1693591356 }

When the above record in JSON format is sent via the producer, it will be received by the consumer.

Enhancing the Data Contract with Metadata

As mentioned, in the effort to shift-left the responsibilities for the quality and timeliness of streaming data, data contracts are typically defined by the producers. By examining the data contract, a downstream consumer can determine which person or team is responsible for the data contract and what service level objectives (SLOs) the data contract is attempting to achieve.

In a file named “order_metadata.json”, we declare that this contract is owned by the Orders Team, and that one of the service level objectives is to ensure that orders are available to downstream consumers no later than 10 seconds after the order timestamp. We’ll make use of these metadata properties later in this article.

{
  "metadata": {
    "properties": {
       "owner": "OrdersTeam",
       "owner_email": "orders.team@acme.com",
       "slo_timeliness_secs": "10"
    } 
  }
}

Register the metadata directly to Schema Registry without specifying a schema. When omitting the schema while registering metadata, a new schema version will be created with the schema of the previous version.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_metadata.json
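To confirm that a new version was created, list the subject’s versions; the response should now contain an additional version (for example, [1,2]).

curl http://localhost:8081/subjects/orders-value/versions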

Adding Data Quality Rules to the Data Contract

As shown in the previous section, data contracts can capture business metadata. They can also be used to capture integrity constraints or data quality rules to ensure the quality of the data. In a file named “order_ruleset.json”, let’s add a rule that the price needs to be a positive number. The rule is specified as a Google Common Expression Language (CEL) expression.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0"
      }
    ]
  }
}

Similarly to the metadata, register the rule set directly to Schema Registry without specifying a schema, as a new schema version will be created with the schema (as well as metadata) of the previous version.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_ruleset.json

The above rule will cause all messages with a non-positive price to be rejected. Instead of just rejecting the invalid messages, we may want to capture them in a dead letter queue for further analysis or processing. To do so, set the “onFailure” action to “DLQ”.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0",
        "params": {
          "dlq.topic": "bad_orders"
        },
        "onFailure": "DLQ"
      }
    ]
  }
}

Let’s try out our data quality rule. First start the consumer.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092

In a separate terminal, start the producer. The last two properties passed to the console producer are used by the DLQ action.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id> \
  --property bootstrap.servers=localhost:9092 \
  --property dlq.auto.flush=true

{"orderId": 1, "customerId": 2, "totalPriceCents": -1, "state": "Pending", "timestamp": 1693591356 }

We should see an error of the form:

Expr failed: 'message.totalPriceCents > 0'

If we check the DLQ, we’ll see the errant record.
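For example, assuming the DLQ records are written in serialized Avro form, the bad_orders topic can be inspected with the Avro console consumer:

./bin/kafka-avro-console-consumer \
  --topic bad_orders \
  --bootstrap-server localhost:9092 \
  --from-beginning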

To learn more about CEL, see Understanding CEL in Data Contract Rules.

Specifying Custom Rule Executors and Actions

The rule framework for data contracts is completely customizable. One can specify both custom rule executors and actions for a data contract.

Below is the Java interface for rule executors. The “transform” method is used by rules of both the CONDITION and TRANSFORM kinds. In the former case, it should return a Boolean value; in the latter case, it should return the transformed value.

public interface RuleExecutor extends RuleBase {
  String type();

  Object transform(RuleContext ctx, Object message) throws RuleException;
}
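As an illustration, here is a minimal sketch of a custom condition executor (imports omitted, as in the action example below); the class name, type name, and field access are illustrative rather than part of the Schema Registry API:

public class PositiveOrderIdExecutor implements RuleExecutor {

  @Override
  public String type() {
    return "POSITIVE_ORDER_ID";
  }

  @Override
  public void configure(Map<String, ?> configs) {
    // No configuration is needed for this sketch.
  }

  @Override
  public Object transform(RuleContext ctx, Object message) throws RuleException {
    // For a CONDITION rule, return a Boolean indicating whether the message passes.
    Object orderId = ((GenericData.Record) message).get("orderId");
    return orderId != null && ((Number) orderId).intValue() > 0;
  }
}

A domain rule would then reference this executor by specifying “POSITIVE_ORDER_ID” as its “type”.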

Below is the Java interface for rule actions. If the rule executor fails (which includes the case of a condition that returns false), the resulting exception is passed to the rule action. After performing its work, the rule action should rethrow this exception if it is non-null.

public interface RuleAction extends RuleBase {
  String type();

  void run(RuleContext ctx, Object message, RuleException ex) 
    throws RuleException;
}

Assume that on the consumer side we want to check the timeliness of data to see if it conforms to the declared service level objective. If it doesn’t, we want to send an alert in the form of an email message. Below is a custom rule action to check the timeliness SLO. Note that the “type” of the action is specified as “EMAIL”. Also, the expression “ctx.getParameter(name)” will look for the value of a rule parameter with the given name. If not found, it will then look for the value of a metadata property with the given name.

public class EmailAction implements RuleAction {

  private static final String USERNAME = "username";
  private static final String PASSWORD = "password";

  private String username;
  private String password;

  public String type() {
    return "EMAIL";
  }

  @Override
  public void configure(Map<String, ?> configs) {
    this.username = (String) configs.get(USERNAME);
    this.password = (String) configs.get(PASSWORD);
  }

  public void run(RuleContext ctx, Object message, RuleException ex)
    throws RuleException {
    if (!meetsSloTimeliness(ctx, message)) {
      sendMail(ctx, message);
    }
    if (ex != null) {
      throw ex;
    }
  }

  private boolean meetsSloTimeliness(RuleContext ctx, Object message) 
    throws RuleException {
    try {
      String sloStr = ctx.getParameter("slo_timeliness_secs");
      int slo = Integer.parseInt(sloStr);
      long timestamp = (Long) ((GenericData.Record) message).get("timestamp");
      long now = System.currentTimeMillis();
      return now - timestamp <= slo * 1000L;
    } catch (Exception e) {
      throw new RuleException(e);
    }
  }

  private void sendMail(RuleContext ctx, Object message)
      throws RuleException {
    try {
      String from = ctx.getParameter("mail.smtp.from");
      String to = ctx.getParameter("owner_email");
      String sloTimeliness = ctx.getParameter("slo_timeliness_secs");

      Properties props = new Properties();
      props.put("mail.smtp.host", ctx.getParameter("mail.smtp.host"));
      props.put("mail.smtp.port", ctx.getParameter("mail.smtp.port"));
      props.put("mail.smtp.auth", "true");
      props.put("mail.smtp.starttls.enable", "true");
      Session session = Session.getInstance(props, new Authenticator() {
        @Override
        protected PasswordAuthentication getPasswordAuthentication() {
          return new PasswordAuthentication(username, password);
        }
      });

      Message mail = new MimeMessage(session);
      mail.setFrom(new InternetAddress(from));
      mail.setRecipients(Message.RecipientType.TO, InternetAddress.parse(to));
      mail.setSubject("Mail Subject");

      String msg = "Order '" + ((GenericData.Record) message).get("orderId")
          + "' failed to meet the timeliness SLO of "
          + sloTimeliness + " seconds";

      MimeBodyPart mimeBodyPart = new MimeBodyPart();
      mimeBodyPart.setContent(msg, "text/html; charset=utf-8");
      Multipart multipart = new MimeMultipart();
      multipart.addBodyPart(mimeBodyPart);
      mail.setContent(multipart);

      Transport.send(mail);
    } catch (Exception e) {
      throw new RuleException(e);
    }
  }
}

Now that we’ve implemented a rule action to check the timeliness SLO, let’s add it to the rule set. Note that the “mode” of the rule is “READ” since the timeliness check happens on the consumer. We use a CEL expression of “true”, to ensure that the action is always invoked. Finally, the “onSuccess” action is set to “EMAIL”.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkTotalPrice",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.totalPriceCents > 0",
        "params": {
          "dlq.topic": "bad_orders"
        },
        "onFailure": "DLQ"
      },
      {
        "name": "checkSloTimeliness",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "READ",
        "expr": "true",
        "params": {
          "mail.smtp.host": "smtp.acme.com",
          "mail.smtp.port": "25",
          "mail.smtp.from": "billing.team@acme.com"
        },
        "onSuccess": "EMAIL"
      }
    ]
  }
}

The above code is available in a GitHub repository. After cloning the repository, register the schema with the following command:

mvn schema-registry:register

To run the consumer:

mvn compile exec:java \
  -Dexec.mainClass="io.confluent.data.contracts.ConsumerApp" \
  -Dexec.args="./src/main/resources/data-contracts.properties group-1 client-1 <emailUsername> <emailPassword>"

To generate random Order data, run the producer:

mvn compile exec:java \
  -Dexec.mainClass="io.confluent.data.contracts.ProducerApp" \
  -Dexec.args="./src/main/resources/data-contracts.properties client-1"

You should now see emails being sent for any records that do not meet the timeliness SLO.

Adding Migration Rules to the Data Contract

The consumers may be satisfied with the data contract so far, but perhaps one day the Orders Team decides that they want to change the schema in a backward-incompatible manner. For example, they decide to change the field named “state” to “status”. In Avro, one possibility is to use an alias, but some Avro implementations might not support aliases. So for the sake of this article, we’ll consider changing the name of an Avro field to be a backward-incompatible change (as is the case for Protobuf and JSON Schema).

To support breaking changes within a schema version history, we need a way to partition the history into subsets of versions that are compatible with one another. We can achieve this with the notion of a compatibility group. We choose an arbitrary name for a metadata property, such as “major_version”, and then use that property to specify which data contracts belong to the same compatibility group.

First, we configure Schema Registry to only perform compatibility checks for schemas within a compatibility group.

curl http://localhost:8081/config/orders-value \
  -X PUT --json '{ "compatibilityGroup": "major_version" }'
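You can verify the setting by reading back the subject-level config:

curl http://localhost:8081/config/orders-value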

Next we change the field named “state” to “status”, as specified in a file named “order2.avsc”.

{
  "name": "Order",
  "namespace": "acme.com",
  "type": "record",
  "fields": [
    {
      "name": "orderId",
      "type": "int"
    },
    {
      "name": "customerId",
      "type": "int"
    },
    { 
      "name": "totalPriceCents",
      "type": "int"
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "OrderStatus",
        "symbols": [
          "Pending",
          "Processing",
          "Completed",
          "Canceled",
          "Unknown"
        ],
        "default": "Unknown"  
      }
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

Register the schema, and pass a new metadata property with name “major_version” and value “2”, so that the schema need not be backward compatible with previous versions.

jq -n --rawfile schema order2.avsc '{ schema: $schema, metadata: { properties: { owner: "OrdersTeam", owner_email: "orders.team@acme.com", slo_timeliness_secs: "10", major_version: "2" } } }' |
  curl http://localhost:8081/subjects/orders-value/versions --json @-

If desired, we can now pin a client to the latest version of a specific compatibility group using the following client-side properties:

use.latest.with.metadata=major_version=2
latest.cache.ttl.sec=1800

Above we’ve specified that the client should check for a new latest version every 30 minutes.
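For example, with the Avro console consumer used earlier, the same settings can be passed as properties:

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092 \
  --property use.latest.with.metadata=major_version=2 \
  --property latest.cache.ttl.sec=1800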

Now that our subject supports breaking changes in the version history as partitioned by compatibility groups, we need a way for the consumer to transform messages from the previous compatibility group to the current one. We can achieve this with migration rules using JSONata. Below, the UPGRADE rule allows new consumers to read old messages, while the DOWNGRADE rule allows old consumers to read new messages. If you plan to upgrade all consumers, you can omit the DOWNGRADE rule. In each migration rule, we use the JSONata function “$sift()” to drop the field with the old name, and then use “$merge()” to add a field with the new name.

{
  "ruleSet": {
    "domainRules": [
      ...
    ],
    "migrationRules": [
      {
        "name": "changeStateToStatus",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "UPGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'state'}), {'status': $.'state'}])"
      },
      {
        "name": "changeStatusToState",
        "kind": "TRANSFORM",
        "type": "JSONATA",
        "mode": "DOWNGRADE",
        "expr": "$merge([$sift($, function($v, $k) {$k != 'status'}), {'state': $.'status'}])"
      }
    ]
  }
}

The new rule set, in a file named “order_ruleset2.json”, has both domain rules and migration rules, and can be registered separately from the schema, as before.

curl http://localhost:8081/subjects/orders-value/versions \
  --json @order_ruleset2.json

After updating the metadata and rule set, let’s try out our migration rule. We’ll produce a record using the original schema version with field “state” and see it transformed by the consumer to conform to the latest schema version with field “status”.

First, start the consumer. Note that we specify “use.latest.version=true” to tell the consumer that we want the record to conform to the latest version even if it was produced with an older version. As mentioned, we could alternatively specify “use.latest.with.metadata=major_version=2”. Either setting will cause the migration rule to be invoked.

./bin/kafka-avro-console-consumer \
  --topic orders \
  --bootstrap-server localhost:9092 \
  --property use.latest.version=true

In a separate terminal, start the producer. Note that we should pass the ID of the original schema that has a field named “state” instead of “status”.

./bin/kafka-avro-console-producer \
  --topic orders \
  --broker-list localhost:9092 \
  --property value.schema.id=<id>

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "state": "Pending", "timestamp": 1693591356 }

When the above record is sent to the producer, the following record is received by the consumer, where the JSONata transform has modified the record to have a field named “status” instead of “state”.

{"orderId": 1, "customerId": 2, "totalPriceCents": 12000, "status": "Pending", "timestamp": 1693591356 }

The combination of compatibility groups with migration rules for transforming data across those groups is a powerful combination for evolving schemas in arbitrarily complex ways.

To learn more about JSONata, see Understanding JSONata.

Summary

The use of data contracts helps to shift-left the responsibility of ensuring data quality, interoperability, and compliance to the upstream component, which is the source of the data. By doing so, a data contract can provide a formal agreement between an upstream component and a downstream component for the structure and semantics of the data in motion. Today Confluent Schema Registry supports the use of data contracts to make streaming more reliable and powerful than ever before.

To learn more, see the documentation on Data Contracts for Schema Registry.


Understanding CEL In Data Contract Rules

The Confluent Schema Registry recently added support for tags, metadata, and rules, which together support the concept of a data contract. Data quality rules in a data contract can be expressed using the Google Common Expression Language (CEL), while migration rules can be expressed using JSONata. In this article, I’ll provide some tips on understanding the capabilities of CEL in the context of data contract rules.

The CEL Type System

One of the most important aspects of CEL is that it is a strongly typed language. In CEL, all expressions have a well-defined type, and all operators and functions check that their arguments have the expected types. Consider the following CEL expression:

'hello' == 3  // raises an error
 

One would expect this expression to return false, but instead it raises an error. That’s because the CEL type checker ensures that both arguments to the equality operator have the same type. The CEL type system includes the usual built-in types (int, uint, double, bool, string, bytes, list, map, null_type), but also has two built-in types that deserve further commentary: type and dyn.

Every value in CEL has a type, which is also considered a value. That means that the type of a value can be used in expressions. Below we use the type function to compare the type of the value “hello” with the type of the value “world”.

type('hello') == type('world')  // both sides evaluate to the 'string' type
 

The other type that deserves discussion is dyn. This type is the union of all other types, similar to Object in Java. A value can have its type converted to the dyn type using the dyn function. The dyn function can often be used to prevent the type checker from raising an error. For example, the following expression may raise an error because both the equality and conditional operators require arguments to have the same type.

value == null ? message.name + ' ' + message.lastName : value
 

However, the following expression using the dyn function will not raise an error.

dyn(value) == null ? message.name + ' ' + message.lastName : dyn(value)
 

Guards for Field-Level Rules

When defining a data contract rule, a CEL expression can be used at the message level or at the field level. Below we express a message-level rule of type CEL to check that the ssn field is not empty.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "message.ssn != ''"
      }
    ]
  }
}

Message-level rules with type CEL are passed a variable named message, which represents the message being processed.

We could have instead expressed the above condition as a field-level rule using the CEL_FIELD rule type.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' ; value != ''"
      }
    ]
  }
}

Rules with type CEL_FIELD are executed for every field in a message. Such rules are passed the following variables:

  • value – the field value
  • fullName – the fully-qualified name of the field
  • name – the field name
  • typeName – the name of the field type, one of STRING, BYTES, INT, LONG, FLOAT, DOUBLE, BOOLEAN
  • tags – tags that apply to the field
  • message – the containing message

Note that the expr for a rule of type CEL_FIELD is of the following form, where the guard is an optional CEL expression preceding the CEL expression for the rule body.

<CEL expr for guard> ; <CEL expr for rule body>
 

Guards are useful for preventing the type checker from raising an error. Without a guard, the following CEL expression will raise an error for any field in the message that is not of type string, because the inequality operator requires that both arguments have the same type.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' && value != ''"
      }
    ]
  }
}

One could fix the above expression using the dyn function as shown below, but that is less obvious than using a guard.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'ssn' && dyn(value) != ''"
      }
    ]
  }
}

If we want to apply the rule body to all fields with the same type, we can use a guard that checks the typeName:

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "checkSsn",
        "kind": "CONDITION",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "typeName == 'STRING' ; value != ''"
      }
    ]
  }
}

Checking for Empty or Missing Fields

Confluent Schema Registry supports schemas for Avro, Protobuf, and JSON Schema. When using CEL expressions, checking for an empty or missing field in an object may need to be performed differently for each of the corresponding schema types.

In Protobuf, a missing field is set to the default value for the field type. From the Protobuf documentation:

  • For strings, the default value is the empty string.
  • For bytes, the default value is empty bytes.
  • For bools, the default value is false.
  • For numeric types, the default value is zero.
  • For enums, the default value is the first defined enum value, which must be 0.
  • For message fields, the field is not set. Its exact value is language-dependent.

For Protobuf, a field with type message is the only type of field that might be null, depending on the language.

In Avro, any field can be null as long as it has been declared as optional, which is a union of null and one or more other types.

Similarly, in JSON Schema, any field can be null if it has been declared as optional, which is a oneOf of null and one or more other types. Furthermore, any field can be missing unless it has been declared as required.

As an example, for Protobuf, to check that a string is empty or missing, we use the following expression at the message level.

message.ssn == ''
 

Alternatively for Protobuf, one can use the has macro to determine whether the field is set to its default value.

!has(message.ssn)
 

For Avro, we would use a rule like the following. Note that we need to use the dyn function since in the CEL type system, the null type is distinct from the other types.

message.ssn == '' || dyn(message.ssn) == null
 

For JSON, we would use a rule with the has macro, since fields can be missing in JSON.

!has(message.ssn) || message.ssn == '' || dyn(message.ssn) == null
 

CEL String Literals

CEL supports several kinds of string literals. Quoted string literals can use either single-quotes or double-quotes.

message.name == 'hello' // equivalent to: message.name == "hello"
 

Since CEL expressions are used as JSON values in data contract rules, single quotes are to be preferred.
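Double quotes would otherwise have to be escaped inside the enclosing JSON document that holds the rule, for example:

"expr": "message.name == \"hello\""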

A triple-quoted string is delimited by either three single-quotes or three double-quotes, and may contain newlines.

message.name == '''I'm happy for y'all'''
 

Finally, a string preceded by the r or R character is a raw string, which does not interpret escape sequences. A raw string is useful for representing regular expressions.

message.ssn.matches(r'\\d{3}-\\d{2}-\\d{4}')
 

Transformations using CEL Maps and Messages

So far, all examples using the CEL and CEL_FIELD rule types have used CEL expressions to represent conditions. Both rule types can also use CEL expressions to represent transformations. Below we use a rule of type CEL_FIELD to set an empty status value to the string ‘unknown’.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "transformStatus",
        "kind": "TRANFORM",
        "type": "CEL_FIELD",
        "mode": "WRITE",
        "expr": "name == 'status' ; value == '' ? 'unknown' : value"
      }
    ]
  }
}

To use the CEL rule type as a message-level transformation, return a CEL map. For example, assume that the Order message has two fields and we want to transform the status field as before. The following message-level rule of type CEL will return a CEL map with the desired result.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "transformStatus",
        "kind": "TRANFORM",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "{ 'orderId': message.orderId, 'status': message.status == '' ? 'unknown' : message.status }"
      }
    ]
  }
}

If possible, the resulting CEL map will be automatically converted to an object of the appropriate schema type, either Avro, Protobuf, or JSON.

CEL also has first-class support for Protobuf messages. To return a Protobuf message, the expression M{f1: e1, f2: e2, ..., fN: eN} can be used, where M is the simple or qualified name of the message type. For example, if com.acme.Order is a Protobuf message type, the following rule can be used to return a Protobuf object.

{
  "ruleSet": {
    "domainRules": [
      {
        "name": "transformStatus",
        "kind": "TRANFORM",
        "type": "CEL",
        "mode": "WRITE",
        "expr": "com.acme.Order{ orderId: message.orderId, status: message.status == '' ? 'unknown' : message.status }"
      }
    ]
  }
}

Of course, returning a Protobuf object should only be done if the data contract has a Protobuf schema.


Understanding JSONata

JSONata is a declarative functional language for querying and transforming JSON data. It was inspired by the path semantics of XPath. Whereas other languages that are inspired by XPath also incorporate SQL-like elements, JSONata stays true to its XPath roots and augments the path semantics of XPath with additional features to increase its power.

JSONata is powerful but it can take a while to get accustomed to its terse syntax. In this article I’ll discuss three aspects of JSONata that will help you understand some of the subtleties of the language.

  1. Sequences and Path Operators
  2. Object Constructors
  3. Contexts and Context Variable Binding

Sequences and Path Operators

Since XPath 2.0, every value in XPath is a sequence of zero or more items. An atomic value, such as a Boolean, number, or string, is just a special case of a sequence of length one. JSONata uses the same data model as XPath.

Since JSONata is a functional language, and its main data construct is a sequence, one would expect the well-known map, filter, and reduce functional operators to be available for sequences. This is indeed the case, but these operators often hide in plain sight.

For example, with the JSON below, assume we want to find the number of products in each category that are priced greater than 5.

{
  "products": [
    { "name": "broiler", "category": "kitchen", "price": 100, "cost": 70 },
    { "name": "toaster", "category": "kitchen", "price": 30, "cost": 10 },
    { "name": "blender", "category": "kitchen", "price": 50, "cost": 25 },
    { "name": "socks", "category": "clothes", "price": 5, "cost": 2 },
    { "name": "shirt", "category": "clothes", "price": 10, "cost": 3 }
  ]
}

Here is a JSONata query that will return the desired results.

$.products[price > 5]{category: $count(name)}
{
  "kitchen": 3,
  "clothes": 1
}

You can see this JSONata query in action here.

Let’s break down the above query into four elements:

  1. $
  2. .products
  3. [price > 5]
  4. {category: $count(name)}

The first element, $, is a built-in variable. When at the beginning of a JSONata expression, it represents the entire input document.

The second element, .products, is composed of the map operator (.) along with a function to be applied to each element of the input sequence. In this case the function is the products path expression, and the input sequence is the entire document.

The third element, [price > 5], is the filter operator ([...]) along with a predicate expression to determine which elements of the input sequence should be returned.

The fourth element, {category: $count(name)}, is the reduce operator ({...}) along with an object to hold the groupings, determined by the key expressions, and the aggregated values.

For those familiar with functional programming, being able to parse JSONata path expressions as compositions of the usual functional operators will help you understand the results that are returned.
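For example, replacing the reduce step with another map step that selects the name field returns the matching product names (a sequence, serialized as an array):

$.products[price > 5].name
[
  "broiler",
  "toaster",
  "blender",
  "shirt"
]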

Object Constructors

One of the features of JSONata that might initially be confusing to those first using it are the object constructors. There are two types of object constructors: one produces a single object and the other produces an array of objects. We’ve seen the first one in the previous section:

$.products[price > 5]{category: $count(name)}
 
{
  "kitchen": 3,
  "clothes": 1
}

The second one looks similar:

$.products[price > 5].{category: $count(name)}
 
[
  {
    "kitchen": 1
  },
  {
    "kitchen": 1
  },
  {
    "kitchen": 1
  },
  {
    "clothes": 1
  }
]

As can be seen, one object constructor is of the form pathExpr{...} while the other is of the form pathExpr.{...}. Note the dot (.) between the path expression and the object expression.

Now that we’ve learned to parse these expressions as compositions of functional operators, we can make sense of the resulting output. The first object constructor is simply the reduce operator ({...}), as we’ve discussed previously, while the second object constructor is the map operator (.) followed by a function that returns an object for each element of the input sequence. This explains why the output of the second object constructor is an array.

Contexts and Context Variable Binding

Previously we explained that $ is a built-in variable that represents the input document when at the beginning of a JSONata expression. In general, the variable $ refers to the context item, which is the sequence currently being evaluated.

The context item is implied at the beginning of a path expression, but can also be explicitly referenced. For example, the following two queries are equivalent:

products[price > 5]{category: $count(name)}
$.products[$.price > 5]{$.category: $count($.name)}

After applying the map (.) operator, the context item changes. However, there is one exception, and that is when using a context variable binding. The context variable binding feature is unique to JSONata, and was introduced to support join semantics in a path expression. We show an example below.

Assume our data looks as follows:

{
  "sales": [
    { "product": "broiler", "store number": 1, "quantity": 20  },
    { "product": "toaster", "store number": 2, "quantity": 100 },
    { "product": "toaster", "store number": 2, "quantity": 50 },
    { "product": "toaster", "store number": 3, "quantity": 50 },
    { "product": "blender", "store number": 3, "quantity": 100 },
    { "product": "blender", "store number": 3, "quantity": 150 },
    { "product": "socks", "store number": 1, "quantity": 500 },
    { "product": "socks", "store number": 2, "quantity": 10 },
    { "product": "shirt", "store number": 3, "quantity": 10 }
  ],
  "stores": [
    { "store number": 1, "state": "CA" },
    { "store number": 2, "state": "CA" },
    { "store number": 3, "state": "MA" },
    { "store number": 4, "state": "MA" }
  ]
}

We want to join the sales data with the store data to find out how many products were sold in each state. Here’s the JSONata query to achieve this, which you can also see here.

$.stores@$st.sales@$sl[$sl.`store number` = $st.`store number`]{
    $st.state: $sum($sl.quantity)
}
{
  "CA": 680,
  "MA": 310
}

At the beginning of our JSONata expression, the context item is the entire input document. We use the map operator followed by a path expression with a context variable binding (.stores@$st) to bind the variable $st to the result of mapping the stores path expression to the input document. Since we use a context variable binding, the context item does not change to the result of the stores path expression, but instead remains as the input document. The next map operator followed by a path expression (.sales@$sl) is again applied to the input document as the context item.

After the $st and $sl variables are bound to the result of the stores and sales path expressions, respectively, the filter expression [$sl.`store number` = $st.`store number`] performs the join, and the reduce expression { $st.state: $sum($sl.quantity) } aggregates the results.

As demonstrated, the reason that the context item does not change when using a context variable binding is to more easily support joins between sibling nodes in the JSON document.

Sample Queries

The rest of this article shows various sample queries using JSONata. These examples are adapted from an enumeration of various use cases for a different JSON transformation language that was also inspired by XPath. You can see these examples implemented in the alternative language here. The JSONata examples tend to be more concise.

Join

The JSON below is based on a social media site that allows users to interact with their friends.

[
  {
    "name": "Sarah",
    "age": 13,
    "gender": "female",
    "friends": [
      "Jim",
      "Mary",
      "Jennifer"
    ]
  },
  {
    "name": "Jim",
    "age": 13,
    "gender": "male",
    "friends": [
      "Sarah"
    ]
  }
]

The following query performs a join on Sarah’s friend list to return the object representing each of her friends:

$[name in $$[name = "Sarah"].friends]
{
  "name": "Jim",
  "age": 13,
  "gender": "male",
  "friends": [
    "Sarah"
  ]
}

See https://try.jsonata.org/1A5n67svD.

Grouping Queries for JSON

The JSON below contains data about sales, products, and stores.

{
  "sales": [
    { "product": "broiler", "store number": 1, "quantity": 20  },
    { "product": "toaster", "store number": 2, "quantity": 100 },
    { "product": "toaster", "store number": 2, "quantity": 50 },
    { "product": "toaster", "store number": 3, "quantity": 50 },
    { "product": "blender", "store number": 3, "quantity": 100 },
    { "product": "blender", "store number": 3, "quantity": 150 },
    { "product": "socks", "store number": 1, "quantity": 500 },
    { "product": "socks", "store number": 2, "quantity": 10 },
    { "product": "shirt", "store number": 3, "quantity": 10 }
  ],
  "products": [
    { "name": "broiler", "category": "kitchen", "price": 100, "cost": 70 },
    { "name": "toaster", "category": "kitchen", "price": 30, "cost": 10 },
    { "name": "blender", "category": "kitchen", "price": 50, "cost": 25 },
    { "name": "socks", "category": "clothes", "price": 5, "cost": 2 },
    { "name": "shirt", "category": "clothes", "price": 10, "cost": 3 }
  ],
  "stores": [
    { "store number": 1, "state": "CA" },
    { "store number": 2, "state": "CA" },
    { "store number": 3, "state": "MA" },
    { "store number": 4, "state": "MA" }
  ]
}

We want to group sales by product, across stores.

sales{ product: $sum(quantity)}
{ "broiler": 20, "toaster": 200, "blender":250, "socks": 510, "shirt": 10 }

See https://try.jsonata.org/cGRl7Xi1T.

Now let’s do a more complex grouping query, showing sales by category within each state. The following query groups by state, then by category, then lists individual products and the sales associated with each.

(
    /* First join all the rows */
    $join := stores@$st
        .sales@$sl[$sl.`store number` = $st.`store number`]
        .products@$p[$sl.product = $p.name].{
            "state": $st.state,
            "category": $p.category,
            "product": $sl.product,
            "quantity": $sl.quantity
        };

    /* Next perform the grouping */
    $join{state: {category: {product: $sum(quantity)}}}
)
{
  "CA": {
    "kitchen": {
      "broiler": 20,
      "toaster": 150
    },
    "clothes": {
      "socks": 510
    }
  },
  "MA": {
    "kitchen": {
      "toaster": 50,
      "blender": 250
    },
    "clothes": {
      "shirt": 10
    }
  }
}

See https://try.jsonata.org/-0LDeg4hI.

JSON to JSON Transformations

The following query takes satellite data, and summarizes which satellites are visible. The data for the query is a simplified version of a Stellarium file that contains this information.

{
  "creator": "Satellites plugin version 0.6.4",
  "satellites": {
    "AAU CUBESAT": {
      "tle1": "1 27846U 03031G 10322.04074654  .00000056  00000-0  45693-4 0  8768",
      "visible": false
    },
    "AJISAI (EGS)": {
      "tle1": "1 16908U 86061A 10321.84797408 -.00000083  00000-0  10000-3 0  3696",
      "visible": true
    },
    "AKARI (ASTRO-F)": {
      "tle1": "1 28939U 06005A 10321.96319841  .00000176  00000-0  48808-4 0  4294",
      "visible": true
    }
  }
}

We want to query this data to return a summary. The following is a JSONata query that returns the desired result.

(
    $sats := $each(satellites, function($v, $k) {$merge([{"name": $k}, $v])});
    {
        "visible": [$sats[visible].name],
        "invisible": [$sats[$not(visible)].name]
    }
)
{
  "visible": [
    "AJISAI (EGS)",
    "AKARI (ASTRO-F)"
  ],
  "invisible": [
    "AAU CUBESAT"
  ]
}

See https://try.jsonata.org/45R_q6dVJ.

JSON Updates

Suppose an application receives an order that contains a credit card number, and needs to put the user on probation.

Data for the users:

[
  {
    "name": "Deadbeat Jim",
    "address": "1 E 161st St, Bronx, NY 10451",
    "risk tolerance": "high"
  },
  {
    "name": "Rich Jim",
    "address": "123 Main St, Azusa, CA 91010",
    "risk tolerance": "low"
  }
]

The following query adds "status": "credit card declined" to the user’s record.

[$.$merge([$, name = "Deadbeat Jim" ? {"status": "credit card declined"} : {}])]
 

After the update is finished, the user’s record looks like this:

[
  {
    "name": "Deadbeat Jim",
    "address": "1 E 161st St, Bronx, NY 10451",
    "risk tolerance": "high",
    "status": "credit card declined"
  },
  {
    "name": "Rich Jim",
    "address": "123 Main St, Azusa, CA 91010",
    "risk tolerance": "low"
  }
]

See https://try.jsonata.org/btnJJFsBV.

Data Transformations

Many applications need to modify data before forwarding it to another source. Suppose an application makes videos available using feeds from YouTube. The following data comes from one such feed:

{
  "encoding": "UTF-8",
  "feed": {
    "author": [
      {
        "name": {
          "$t": "YouTube"
        },
        "uri": {
          "$t": "http://www.youtube.com/"
        }
      }
    ],
    "category": [
      {
        "scheme": "http://schemas.google.com/g/2005#kind",
        "term": "http://gdata.youtube.com/schemas/2007#video"
      }
    ],
    "entry": [
      {
        "app$control": {
          "yt$state": {
            "$t": "Syndication of this video was restricted by its owner.",
            "name": "restricted",
            "reasonCode": "limitedSyndication"
          }
        },
        "author": [
          {
            "name": {
              "$t": "beyonceVEVO"
            },
            "uri": {
              "$t": "http://gdata.youtube.com/feeds/api/users/beyoncevevo"
            }
          }
        ]
      }
    ]
  }
}

The following query creates a modified copy of the feed by removing all entries that restrict syndication.

{
    "encoding": encoding,
    "feed": {
        "author": feed.author,
        "category": feed.category,
        "entry": [feed.entry[app$control.yt$state.name != "restricted"]]
    }
}
{
  "encoding": "UTF-8",
  "feed": {
    "author": [
      {
        "name": {
          "$t": "YouTube"
        },
        "uri": {
          "$t": "http://www.youtube.com/"
        }
      }
    ],
    "category": [
      {
        "scheme": "http://schemas.google.com/g/2005#kind",
        "term": "http://gdata.youtube.com/schemas/2007#video"
      }
    ],
    "entry": []
  }
}

See https://try.jsonata.org/iUwXbcPT2.


kgiraffe: GraphQL for Kafka and Schema Registry

While working with Apache Kafka and Confluent Schema Registry, I often spend time switching between tools like

  • kcat (formerly kafkacat)
  • the Kafka Avro console consumer/producer
  • the Kafka JSON Schema console consumer/producer
  • the Kafka Protobuf console consumer/producer
  • the Schema Registry Maven plugin

Recently I decided to combine much of the functionality of the above tools into a single tool called kgiraffe, which is sort of a companion to kcat. While kcat excels at the command line, kgiraffe exposes its functionality with a GraphQL interface. In addition, kgiraffe fully supports topics using Avro, JSON Schema, and Protobuf schemas. kgiraffe provides the following features for managing both topics and schemas out of the box:

  • Topic Management
    • Query topics
    • Publish to topics
    • Subscribe to topics
    • Full support for Avro, JSON Schema, and Protobuf records
  • Schema Management
    • Validate and stage schemas
    • Test schema compatibility
    • Query schemas and subjects
    • Register schemas

Before kgiraffe starts, you indicate which schemas are associated with which topics. Based on the specified schemas, kgiraffe will automatically generate a GraphQL schema that can be used to read, write, and subscribe to your topics. kgiraffe also allows you to validate and stage schemas, as well as test them for compatibility, before you register them to Schema Registry.

After downloading a release, starting kgiraffe is as easy as

$ bin/kgiraffe -b mybroker -t mytopic -r http://schema-registry-url:8081
 

Once kgiraffe is running, browse to http://localhost:8765/kgiraffe to launch the GraphQL Playground.

Example GraphQL queries can be found at the README.

If you find yourself bouncing between tools while working with Kafka and Schema Registry, please give kgiraffe a try and let me know what you think.


Understanding Protobuf Compatibility

In a recent article, I presented rules for evolving JSON Schemas in a backward, forward, and fully compatible manner. The backward compatibility rules for Protocol Buffers (Protobuf) are much simpler, and most of them are outlined in the section “Updating A Message Type” in the Protobuf Language Guide.

The Confluent Schema Registry uses these rules to check for compatibility when evolving a Protobuf schema. However, unlike some other tools, the Confluent Schema Registry does not require that a removed field be marked as reserved when evolving a schema in a backward compatible manner. The reserved keyword can be used to prevent incompatible changes in future versions of a schema. Instead of requiring the reserved keyword, the Confluent Schema Registry achieves compatibility across multiple versions by allowing compatibility checks to be performed transitively. The reserved keyword is most useful for those tools that do not perform transitive compatibility checks. Using it when removing fields is still considered a best practice, however.
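For reference, a schema that removes a field f2 with field number 2 could reserve both the number and the name so that future versions cannot accidentally reuse them (an illustrative snippet):

message SampleMessage {
  reserved 2;
  reserved "f2";
  string f1 = 1;
}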

For the Protobuf oneof construct, there are four additional compatibility issues that are mentioned in the Protobuf Language Guide.  Since the text of the language guide is a bit terse, for the rest of this article I’ll discuss each of these four issues further. First, let me reproduce the text below.

  1. Be careful when adding or removing oneof fields. If checking the value of a oneof returns None/NOT_SET, it could mean that the oneof has not been set or it has been set to a field in a different version of the oneof. There is no way to tell the difference, since there’s no way to know if an unknown field on the wire is a member of the oneof.
  2. Move fields into or out of a oneof: You may lose some of your information (some fields will be cleared) after the message is serialized and parsed. However, you can safely move a single field into a new oneof and may be able to move multiple fields if it is known that only one is ever set.
  3. Delete a oneof field and add it back: This may clear your currently set oneof field after the message is serialized and parsed.
  4. Split or merge oneof: This has similar issues to moving regular fields.

Each of the above issues can cause information to be lost. For that reason, these are considered backward compatibility issues.

Adding or removing oneof fields

Let’s look at an example of when a oneof field is removed.  Let’s say we have the following Protobuf message with fields f1 and f2 in a oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

Later we decide to remove the field f2.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
  }
}

If an old binary with the first version of SampleMessage creates a message and sets f2 in test_oneof, then a new binary with the second version of SampleMessage will see test_oneof as unset. The value of f2 will be contained in the unknown fields of the message, but there’s no way to tell which, if any, of the unknown fields were previously in test_oneof. The information that is lost is whether the oneof was really set or not.

This behavior may be unexpected when comparing it to how Protobuf handles removing a value from an enum. Any unrecognized enum values encountered on the wire are retained by the binary, and a field using the enum will still appear as set (to an unrecognized value) if the binary encounters a value that is no longer enumerated in the enum.

The information loss from removing a oneof field can cascade and cause other information to be lost, depending on the scenario. For example, if the new binary reads the message with f2 set by the old binary and modifies the same message by setting f1, then when the old binary reads the modified message, the old binary may see f2 as still set! In this scenario, f2 can still appear as set because setting f1 in the new binary does not clear the unknown field for f2, since for the new binary, f2 is not associated with test_oneof. When reading from the wire, the old binary may read f2 after f1 (assuming f2 is deserialized after f1) and then clear f1, so the value of f1 will also be lost for the old binary.

Furthermore, when reading the modified message, a binary in a different programming language may read f1 after f2, thus clearing the value of f2. In this case, the value for f2 will be lost. The actual serialization order is implementation-specific and subject to change. Most implementations serialize fields in ascending order of their field numbers, but this is not guaranteed, especially in the presence of unknown fields. Also, inconsistencies in how various programming languages handle unknown fields prevents a canonical serialization order from being defined. The Protobuf Encoding documentation states the following:

  • By default, repeated invocations of serialization methods on the same protocol buffer message instance may not return the same byte output; i.e. the default serialization is not deterministic.
  • Deterministic serialization only guarantees the same byte output for a particular binary. The byte output may change across different versions of the binary.

So even a later version of the binary in the same programming language may see different results.

For these reasons, removing a field from a oneof is considered a backward incompatible change. Likewise, adding a field to a oneof is considered a forward incompatible change (since a forward compatibility check is just a backward compatibility check with the schemas switched).

Moving fields into or out of a oneof

Consider the following Protobuf message with f1 in a oneof and f2 and f3 outside of the oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
  }
  string f2 = 2;
  string f3 = 3;
}

Later we decide to move both f2 and f3 into test_oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
    string f3 = 3;
  }
}

If an old binary with the first version of SampleMessage creates a message and sets f1, f2, and f3, then when a new binary with the second version of SampleMessage reads the message from the wire, it will clear two of the fields (depending on serialization order) and leave only one field, say f3, with its value. Thus the values of the other two fields will be lost.

As mentioned, the order in which fields are serialized is implementation-specific. For the fields in the oneof, only the value of the last field read from the wire is retained. Therefore, an additional problem is that a different field in the oneof may appear as set when using a different implementation.

For these reasons, moving fields into a oneof is considered a backward incompatible change, unless the oneof is new and only a single field is moved into it. Moving fields out of a oneof is also a backward incompatible change, since this has the effect of removing the fields from the oneof.

Deleting a oneof and adding it back

In this scenario there are three versions involved. The first version has f1 and f2 in a oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

In the second version, we remove the field f2 from the oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
  }
}

Later we add the field f2 back to the oneof.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

Suppose an old binary with the first version of SampleMessage creates a message and sets f2 in test_oneof; next, a binary with the second version reads the message and sets f1 in test_oneof; finally, a later binary with the third version reads the modified message. The third binary may see f2 as set and the value of f1 as lost. This is a similar scenario to the one described above involving only two binaries when removing a field from the oneof. Here the modified message is being read by a third, later binary, rather than by the first, original binary in the scenario involving only two binaries.

Splitting or merging a oneof

Consider a Protobuf message with two oneof constructs.

message SampleMessage {
  oneof test_oneof1 {
    string f1 = 1;
  }
  oneof test_oneof2 {
    string f2 = 2;
  }
}

Later we decide to merge the oneof constructs.

message SampleMessage {
  oneof test_oneof {
    string f1 = 1;
    string f2 = 2;
  }
}

If an old binary with the first version of SampleMessage creates a message and sets both f1 and f2, then when a new binary with the second version of SampleMessage reads the message, it will see only one of the fields as set, with the value of the other field being lost. Again, which field is set and which is lost depends on the implementation. This issue is similar to that described above involving moving existing fields into a oneof.

Summary

In Protobuf, evolving schemas with oneof constructs can be problematic for a couple of reasons:

  1. Information may be lost, such as the values of fields that were moved into a oneof, and whether a oneof was set or not. The information loss can cascade and result in further loss, such as when an unknown field reappears in a oneof, causing the previous value of the oneof to be cleared.
  2. A schema change can unfortunately cause multiple fields which have been set to be pulled into the same oneof. If this happens, all of the fields except one will be cleared. Since the order in which fields are serialized to the wire is implementation-specific and subject to change, especially in the presence of unknown fields, the field that is retained may be different between implementations.

For these reasons, Confluent Schema Registry implements two backward compatibility checks for the oneof construct:

  1. Removing a field from a oneof is a backward incompatible change.
  2. Moving existing fields into a oneof is a backward incompatible change, unless it is a single field being moved into a new oneof.

The Enterprise is Made of Events, Not Things

“The world is made of events, not things.” — Carlo Rovelli

“Every company is becoming software.” — Jay Kreps

In The Order of Time, the physicist Carlo Rovelli argues that the theory of relativity compels us to view the world not as made of things or entities, but as events or processes. In the world of technology, Jay Kreps argues that the core processes that a company executes are increasingly being captured in software, and these processes consume and produce the business events that drive the company. From the viewpoint of both Rovelli and Kreps, one can view a company as made of events or processes.

The dichotomy between events and things (with state) has been noted by many. Martin Kleppmann captured it elegantly in his book Designing Data-Intensive Applications as follows:

One technique often used in Domain-Driven Design (DDD) is event sourcing, which derives application state from immutable business events. Event sourcing often involves arbitrary logic to derive the state. A more formal model would use a finite-state machine (FSM) to derive the state.
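To make this concrete, here is a minimal sketch, in plain Java and independent of any framework, of deriving state from an immutable event log with a finite-state machine; the states and events are hypothetical:

import java.util.List;

public class OrderStateMachine {

  enum State { PENDING, PROCESSING, COMPLETED }
  enum Event { PAYMENT_RECEIVED, SHIPPED }

  // A pure transition function: the next state depends only on the
  // current state and the incoming event.
  static State transition(State state, Event event) {
    switch (state) {
      case PENDING:
        return event == Event.PAYMENT_RECEIVED ? State.PROCESSING : state;
      case PROCESSING:
        return event == Event.SHIPPED ? State.COMPLETED : state;
      default:
        return state;
    }
  }

  // Event sourcing with an FSM: replaying the immutable event log
  // derives the current application state.
  static State replay(State initial, List<Event> events) {
    State state = initial;
    for (Event event : events) {
      state = transition(state, event);
    }
    return state;
  }
}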

In a software architecture involving a network of FSMs, each FSM can perform state transitions when receiving events, produce events for other FSMs to consume, and persist some internal data. With FSMs, one can implement several other models, such as the following:

  • Simple CRUD entities.  This is the most common use-case for event sourcing.
  • Function as a Service (FaaS).  In this case, each FSM only has one state, and a single transition to and from that state, during which it performs the function.
  • Actor model.  In the actor model, actors receive and send events, but are otherwise passive when no events occur.
  • Intelligent agents (IA).  Intelligent agents are similar to actors in that they receive and send events, but are generally viewed as continuously active in order to achieve some goal.

In the rest of this article I’ll show how to implement a network of intelligent agents using Kafka Streams and finite-state machines.

The implementation is comprised of two parts:

  1. An FSM implementation that sits atop Kafka Streams, called a KMachine.  A KMachine definition is comprised of a set of states, a set of state transitions, some internal data, and a set of functions that can be attached to state transitions.  The entire KMachine definition can be expressed in YAML, in which the functions are written as JavaScript.
  2. A REST-based web application that can be used to create and manage both KMachine definitions and instances. A KMachine instance is created for each unique key in the input stream.

To demonstrate how KMachine can be used to implement a network of intelligent agents, I’ve borrowed an example from “Programming Game AI By Example,” by Mat Buckland.  In this example, two intelligent agents inhabit a gaming environment that represents a miners’ town in the Wild West.  One agent is a gold miner, and the other agent is the miner’s wife.

As a preview, here is sample output of the interaction between the miner and his wife:

Miner Bob: Walkin' to the goldmine
Miner Bob: Pickin' up a nugget
Elsa: Makin' the bed
Miner Bob: Pickin' up a nugget
Elsa: Makin' the bed
Miner Bob: Pickin' up a nugget
Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold
Miner Bob: Goin' to the bank. Yes siree
Miner Bob: Depositing gold. Total savings now: 3
Miner Bob: Leavin' the bank
Miner Bob: Walkin' to the goldmine
Miner Bob: Pickin' up a nugget
Elsa: Washin' the dishes
Miner Bob: Pickin' up a nugget
Elsa: Moppin' the floor
Miner Bob: Pickin' up a nugget
Miner Bob: Ah'm leavin' the goldmine with mah pockets full o' sweet gold
Miner Bob: Goin' to the bank. Yes siree
Miner Bob: Depositing gold. Total savings now: 6
Miner Bob: WooHoo! Rich enough for now. Back home to mah li'lle lady
Miner Bob: Leavin' the bank
Miner Bob: Walkin' home
Elsa: Hi honey. Let me make you some of mah fine country stew
Miner Bob: ZZZZ... 
Elsa: Putting the stew in the oven
Elsa: Fussin' over food
Miner Bob: ZZZZ... 
Elsa: Fussin' over food
Elsa: Puttin' the stew on the table
Elsa: StewReady! Lets eat
Miner Bob: All mah fatigue has drained away. Time to find more gold!
Elsa: Time to do some more housework!
Miner Bob: Walkin' to the goldmine

Both the miner and his wife are implemented as separate KMachine definitions. Here is the KMachine definition that represents the miner:

name: miner
input: miner
init: goHomeAndSleepTilRested
states:
  - name: enterMineAndDigForNugget
    onEntry: enterMineAction
    onExit: exitMineAction
  - name: visitBankAndDepositGold
    onEntry: enterBankAction
    onExit: exitBankAction
  - name: goHomeAndSleepTilRested
    onEntry: enterHomeAction
    onExit: exitHomeAction
  - name: quenchThirst
    onEntry: enterSaloonAction
    onExit: exitSaloonAction
  - name: eatStew
    onEntry: startEatingAction
    onExit: finishEatingAction
transitions:
  - type: stayInMine
    from: enterMineAndDigForNugget
    to:
    guard:
    onTransition: stayInMineAction
  - type: visitBank
    from: enterMineAndDigForNugget
    to: visitBankAndDepositGold
    guard:
    onTransition:
  - type: quenchThirst
    from: enterMineAndDigForNugget
    to: quenchThirst
    guard:
    onTransition:
  - type: goHome
    from: visitBankAndDepositGold
    to: goHomeAndSleepTilRested
    guard:
    onTransition:
  - type: enterMine
    from: visitBankAndDepositGold
    to: enterMineAndDigForNugget
    guard:
    onTransition:
  - type: enterMine
    from: goHomeAndSleepTilRested
    to: enterMineAndDigForNugget
    guard:
    onTransition:
  - type: enterMine
    from: quenchThirst
    to: enterMineAndDigForNugget
    guard:
    onTransition:
  - type: stayHome
    from: goHomeAndSleepTilRested
    to:
    guard:
    onTransition: stayHomeAction
  - type: stewReady
    from: goHomeAndSleepTilRested
    to: eatStew
    guard:
    onTransition: imComingAction
  - type: finishEating
    from: eatStew
    to: goHomeAndSleepTilRested
    guard:
    onTransition:
data:
  location: shack
  goldCarried: 0
  moneyInBank: 0
  thirst: 0
  fatigue: 0
functions:
  enterMineAction: >-
    (ctx, key, value, data) => {
      if (data.location != 'goldMine') {
        console.log("Miner " + key + ": Walkin' to the goldmine");
        data.location = 'goldMine';
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 0);
    }
  stayInMineAction: >-
    (ctx, key, value, data) => {
      data.goldCarried++;
      data.fatigue++;
      console.log("Miner " + key + ": Pickin' up a nugget");
      if (data.goldCarried >= 3) {
        ctx.sendMessage(ctx.topic(), key, { type: 'visitBank' }, 0);
      } else if (data.thirst >= 5) {
        ctx.sendMessage(ctx.topic(), key, { type: 'quenchThirst' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'stayInMine' }, 1000);
      }
    }
  exitMineAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Ah'm leavin' the goldmine with mah pockets full o' sweet gold");
    }
  enterBankAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Goin' to the bank. Yes siree");
      data.location = 'bank';
      data.moneyInBank += data.goldCarried;
      data.goldCarried = 0;
      console.log("Miner " + key + ": Depositing gold. Total savings now: " + data.moneyInBank);
      if (data.moneyInBank >= 5) {
        console.log("Miner " + key + ": WooHoo! Rich enough for now. Back home to mah li'lle lady");
        ctx.sendMessage(ctx.topic(), key, { type: 'goHome' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
      }
    }
  exitBankAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Leavin' the bank");
    }
  enterHomeAction: >-
    (ctx, key, value, data) => {
      if (data.location != 'shack') {
        console.log("Miner " + key + ": Walkin' home");
        data.location = 'shack';
        if (data.wife) {
          ctx.sendMessage('miners_wife', data.wife, { type: 'hiHoneyImHome' }, 0);
        }
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 0);
    }
  stayHomeAction: >-
    (ctx, key, value, data) => {
      if (value.wife) {
        data.wife = value.wife;
      }
      if (data.fatigue < 5) {
        console.log("Miner " + key + ": All mah fatigue has drained away. Time to find more gold!");
        data.location = 'shack';
        ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
      } else {
        data.fatigue--;
        console.log("Miner " + key + ": ZZZZ... ");
        ctx.sendMessage(ctx.topic(), key, { type: 'stayHome' }, 1000);
      }
    }
  exitHomeAction: >-
    (ctx, key, value, data) => {
    }
  enterSaloonAction: >-
    (ctx, key, value, data) => {
      if (data.moneyInBank >= 2) {
        data.thirst = 0;
        data.moneyInBank -= 2;
        console.log("Miner " + key + ": That's mighty fine sippin liquer");
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'enterMine' }, 0);
    }
  exitSaloonAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Leavin' the saloon, feelin' good");
    }
  imComingAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Okay Hun, ahm a comin'!");
    }
  startEatingAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Smells Reaaal goood Elsa!");
      console.log("Miner " + key + ": Tastes real good too!");
      ctx.sendMessage(ctx.topic(), key, { type: 'finishEating' }, 0);
    }
  finishEatingAction: >-
    (ctx, key, value, data) => {
      console.log("Miner " + key + ": Thankya li'lle lady. Ah better get back to whatever ah wuz doin'");
    }
 

The state transition diagram for the miner, generated using the DOT graph language, is shown below.

Next is the KMachine definition that represents the miner’s wife:

name: minersWife
input: miners_wife
init: doHouseWork
states:
  - name: doHouseWork
    onEntry: startHouseWorkAction
    onExit:
  - name: visitBathroom
    onEntry: enterBathroomAction
    onExit: exitBathroomAction
  - name: cookStew
    onEntry: startCookingAction
    onExit: finishCookingAction
transitions:
  - type: continueHouseWork
    from: doHouseWork
    to:
    guard:
    onTransition: continueHouseWorkAction
  - type: natureCalls
    from: doHouseWork
    to: visitBathroom
    guard:
    onTransition:
  - type: natureCalls
    from: cookStew
    to: visitBathroom
    guard:
    onTransition:
  - type: continuePrevious
    from: visitBathroom
    to: revertToPreviousState
    toType: Function
    guard:
    onTransition:
  - type: hiHoneyImHome
    from: doHouseWork
    to: cookStew
    guard:
    onTransition: hiHoneyAction
  - type: hiHoneyImHome
    from: visitBathroom
    to: cookStew
    guard:
    onTransition: hiHoneyAction
  - type: continueCooking
    from: cookStew
    to:
    guard:
    onTransition: continueCookingAction
  - type: stewReady
    from: cookStew
    to: doHouseWork
    guard:
    onTransition: letsEatAction
data:
  location: shack
  cooking: false
functions:
  startHouseWorkAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Time to do some more housework!");
      ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 0);
    }
  continueHouseWorkAction: >-
    (ctx, key, value, data) => {
      if (value.husband) {
        data.husband = value.husband;
      }
      switch (Math.floor(Math.random() * 3)) {
        case 0:
          console.log(key + ": Moppin' the floor");
          break;
        case 1:
          console.log(key + ": Washin' the dishes");
          break;
        case 2:
          console.log(key + ": Makin' the bed");
          break;
      }
      if (Math.random() < 0.1) {
        ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'continueHouseWork' }, 1000);
      }
    }
  enterBathroomAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Walkin' to the can. Need to powda mah pretty li'lle nose");
      console.log(key + ": Ahhhhhh! Sweet relief!");
      ctx.sendMessage(ctx.topic(), key, { type: 'continuePrevious' }, 0);
    }
  exitBathroomAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Leavin' the Jon");
    }
  revertToPreviousState: >-
    (ctx, key, value, data) => {
      return data.cooking ? 'cookStew' : 'doHouseWork'
    }
  hiHoneyAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Hi honey. Let me make you some of mah fine country stew");
    }
  startCookingAction: >-
    (ctx, key, value, data) => {
      if (!data.cooking) {
        console.log(key + ": Putting the stew in the oven");
        ctx.sendMessage(ctx.topic(), key, { type: 'stewReady' }, 2000);
        data.cooking = true;
      }
      ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 0);
    }
  continueCookingAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Fussin' over food");
      if (Math.random() < 0.1) {
        ctx.sendMessage(ctx.topic(), key, { type: 'natureCalls' }, 0);
      } else {
        ctx.sendMessage(ctx.topic(), key, { type: 'continueCooking' }, 1000);
      }
    }
  finishCookingAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": Puttin' the stew on the table");
    }
  letsEatAction: >-
    (ctx, key, value, data) => {
      console.log(key + ": StewReady! Lets eat");
      if (data.husband) {
        ctx.sendMessage('miner', data.husband, { type: 'stewReady' }, 0);
      }
      data.cooking = false;
    }
 

The state transition diagram for the miner’s wife is shown below.

Each KMachine definition, for both the miner and his wife, is entirely contained in the YAML above. A KMachine definition describes an FSM as follows:

  • name – The id of the definition for the FSM.
  • input – The input topic.
  • init – The initial state of the FSM.
  • states – The states of the FSM.  Each state can have the following:
    • name – The name of the state.
    • onEntry – The function to invoke on entry to the state.
    • onExit – The function to invoke on exit of the state.
  • transitions – The state transitions.  Each transition can have the following:
    • type – The event type that triggers the transition.
    • from – The source state.
    • to – The destination, which can be
      • The name of the destination state.
      • The function to determine the destination state.
      • null, which represents an internal state transition: the state does not change and the onEntry and onExit functions are not invoked, but the guard and onTransition functions are.
    • toType – Either “State” or “Function”.
    • guard – A boolean function to indicate whether the transition should occur.
    • onTransition – The function to invoke on transition.
  • data – A set of key-value pairs that represent the internal data for the FSM.
  • functions – A set of JavaScript functions that can be attached to states and transitions.  Each function takes the following parameters:
    • ctx – A context object that provides the following methods:
      • topic() – The input topic.
      • sendMessage() – Sends a message to a topic.
    • key – The key of the event, as a JSON message.
    • value – The value of the event, as a JSON message.  The value is expected to have a property named “type” to trigger state transitions.
    • data – The local data, which can be mutated.
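
As a minimal illustration of this anatomy, here is a hypothetical definition (the name, topic, states, and function bodies are invented for this sketch) that simply toggles between two states whenever it receives a “flip” event:

# Hypothetical two-state toggle machine, for illustration only
name: toggle
input: toggle
init: idle
states:
  - name: idle
    onEntry: announceIdle
    onExit:
  - name: active
    onEntry: announceActive
    onExit:
transitions:
  - type: flip
    from: idle
    to: active
    guard:
    onTransition:
  - type: flip
    from: active
    to: idle
    guard:
    onTransition:
data:
  flips: 0
functions:
  announceIdle: >-
    (ctx, key, value, data) => {
      console.log(key + ": idle after " + data.flips + " flips");
    }
  announceActive: >-
    (ctx, key, value, data) => {
      data.flips++;
      console.log(key + ": active");
    }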

To see the miner and his wife in action, you’ll first need to start a local instance of Kafka. Then create two topics, one for the miner and one for his wife.

./bin/kafka-topics --create --topic miner --bootstrap-server localhost:9092
./bin/kafka-topics --create --topic miners_wife --bootstrap-server localhost:9092
 

Next, clone the project at https://github.com/rayokota/kmachines and start up the web application.

git clone https://github.com/rayokota/kmachines.git
cd kmachines
mvn clean install -DskipTests
mvn -pl kmachines-rest-app compile quarkus:dev -Dquarkus.http.port=8081
 

In a separate window, create the KMachine definitions for the miner and his wife.

cd kmachines
curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miner_messaging.yml "http://localhost:8081/kmachines"
curl -X POST -H "Content-Type: text/yaml" --data-binary @kmachines-rest-app/src/test/resources/miners_wife_messaging.yml "http://localhost:8081/kmachines"
 

Now produce an event to create a KMachine instance for a miner named "Bob", and another event to create a KMachine instance for his wife named "Elsa". This can be done with kafkacat, for example. Each event is represented as a pair of JSON messages, one for the key and one for the value. The key identifies a unique KMachine instance, while the value is the message used to trigger state transitions, and it must have a "type" property to indicate the event type. In the commands below, a dot (.) separates the key from the value (using the -K option of kafkacat).

echo '"Bob".{ "type": "stayHome", "wife": "Elsa" }' | kafkacat -b localhost:9092 -K . -P -t miner
echo '"Elsa".{ "type": "continueHouseWork", "husband": "Bob" }' | kafkacat -b localhost:9092 -K . -P -t miners_wife
 

You should see the miner and his wife interacting as above. You can query the state of the FSM for the miner or the wife at any time.

curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/miner/state --data '"Bob"'
curl -X POST -H "Content-Type: application/json" http://localhost:8081/kmachines/minersWife/state --data '"Elsa"'
 

To stop the agents, run the following commands.

curl -X DELETE "http://localhost:8081/kmachines/minersWife" 
curl -X DELETE "http://localhost:8081/kmachines/miner" 
 

That’s it!


Understanding JSON Schema Compatibility

Confluent Schema Registry provides a centralized repository for an organization’s schemas and a version history of the schemas as they evolve over time.  The first format supported by Schema Registry was Avro. Avro was developed with schema evolution in mind, and its specification clearly states the rules for backward compatibility, where a schema used to read an Avro record may be different from the schema used to write it.

In addition to Avro, today Schema Registry supports both Protobuf and JSON Schema. JSON Schema does not explicitly define compatibility rules, so in this article I will explain some nuances of how compatibility works for JSON Schema.

Grammar-Based and Rule-Based Schema Languages

In general, schema languages can be either grammar-based or rule-based.1 Grammar-based languages are used to specify the structure of a document instance. Both Avro and Protobuf are grammar-based schema languages. Rule-based languages typically specify a set of boolean constraints that the document must satisfy.

JSON Schema combines aspects of both a grammar-based language and a rule-based one. Its rule-based nature can be seen by its use of conjunction (allOf), disjunction (oneOf), negation (not), and conditional logic (if/then/else). The elements of these boolean operators tend to be grammar-based constraints, such as constraining the type of a property.

Open and Closed Content Models

A JSON Schema can be represented as a JSON object or a boolean. In the case of a boolean, the value true will match any valid JSON document, whereas the value false will match no documents. The value true is a synonym for {}, which is a JSON schema (represented as an empty JSON object) with no constraints. Likewise, the value false is a synonym for { "not": {} }.

By default, a JSON schema provides an open content model. For example, the following JSON schema (Schema 1) constrains the properties “foo” and “bar” to be of type “string”, but allows any additional properties of arbitrary type.

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  }
}

The above schema would accept the following JSON document, containing a property named “zap” that does not appear in the schema:

{ 
  "foo": "hello",
  "bar": "world",
  "zap": 123
}

In order to specify a closed content model, in which additional properties such as “zap” would not be accepted, the schema can be specified with “additionalProperties” as false (Schema 2).

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  },
  "additionalProperties": false
}

Backward, Forward, and Full Compatibility

In terms of schema evolution, there are three types of compatibility2:

  1. Backward compatibility – all documents that conform to the previous version of the schema are also valid according to the new version
  2. Forward compatibility – all documents that conform to the new version are also valid according to the previous version of the schema
  3. Full compatibility – the previous version of the schema and the new version are both backward compatible and forward compatible
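
In Schema Registry, the compatibility level enforced when registering new schema versions can be set globally or per subject. As a sketch, assuming a subject named “sample-json-value”, the following configures that subject to require full compatibility:

# Set the subject's compatibility level to FULL (subject name here is an example)
curl -X PUT http://localhost:8081/config/sample-json-value \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"compatibility": "FULL"}'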

For Schema 1 and Schema 2 above, Schema 1 is backward compatible with Schema 2, which implies that Schema 2 is forward compatible with Schema 1. This is because any document that conforms to Schema 2 will also conform to Schema 1. Since the default value of “additionalProperties” is true, Schema 1 is equivalent to the following schema (Schema 3):

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  },
  "additionalProperties": true
}

Note that the schema true is backward compatible with false. In fact

  1. The schema true (or {}) is backward compatible with all schemas.
  2. The only schema backward compatible with true is true.
  3. The schema false (or { "not": {} }) is forward compatible with all schemas.
  4. The only schema forward compatible with false is false.

Partially Open Content Models

You may want to allow additional unspecified properties, but only of a specific type. In these scenarios, you can use a partially open content model. One way to specify a partially open content model is to specify a schema other than true or false for “additionalProperties”, as in Schema 4 below.

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  },
  "additionalProperties": { "type": "string" }
}

The above schema would accept a document containing a string value for “zap”:

{ 
  "foo": "hello",
  "bar": "world",
  "zap": "champ"
}

but not a document containing an integer value for “zap”:

{ 
  "foo": "hello",
  "bar": "world",
  "zap": 123
}

Later one could explicitly specify “zap” as a property with type “string” (Schema 5):

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" },
    "zap": { "type": "string" }
  },
  "additionalProperties": { "type": "string" }
}

Schema 5 is backward compatible with Schema 4.

One could even accept other types for “zap”, using a oneOf for example (Schema 6).

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" },
    "zap": { 
      "oneOf": [ { "type": "string" }, { "type": "integer" } ] 
    }
  },
  "additionalProperties": { "type": "string" }
}

Schema 6 is also backward compatible with Schema 4.

Another type of partially open content model is one that constrains the additional properties with a regular expression for matching the property name, using a patternProperties construct (Schema 7).

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  },
  "patternProperties": {
    "^s_": { "type": "string" }
  },
  "additionalProperties": false
}

The above schema allows properties other than “foo” and “bar” to appear, as long as the property name starts with “s_” and its value is of type “string”.
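
For example, Schema 7 would accept the following document, whereas a document containing a property named “zap” would be rejected, since “zap” matches neither the declared properties nor the pattern:

{
  "foo": "hello",
  "bar": "world",
  "s_zap": "champ"
}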

Understanding Full Compatibility

When evolving a schema in a backward compatible manner, it’s easy to add properties to a closed content model, or to remove properties from an open content model. In general, there are two rules to follow to evolve a schema in a backward compatible manner:

  1. When adding a property in a backward compatible manner, the schema of the property being added must be backward compatible with the schema of “additionalProperties” in the previous version of the schema.
  2. When removing a property in a backward compatible manner, the schema of “additionalProperties” in the new version of the schema must be backward compatible with the schema of the property being removed.

The rules for forward compatibility are similar.

  1. When adding a property in a forward compatible manner, the schema of the property being added must be forward compatible with the schema of “additionalProperties” in the previous version of the schema.
  2. When removing a property in a forward compatible manner, the schema of “additionalProperties” in the new version of the schema must be forward compatible with the schema of the property being removed.

For example, to add a property to an open content model, such as Schema 3, in a backward compatible manner, one can add it with type true, since true is the only schema that is backward compatible with true, as previously mentioned. Doing so yields Schema 8:

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" },
    "zap": true
  },
  "additionalProperties": true
}

The property “zap” has been added, but it’s been specified with type true, which means that it can match any valid JSON. Schema 3 and Schema 8 are also fully compatible, since they both accept the same set of documents.

This leads to a way to evolve a closed content model, such as Schema 2, in a fully compatible manner, by adding a property whose schema is false (Schema 9).

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" },
    "zap": false
  },
  "additionalProperties": false
}

Admittedly, Schema 9 is not very interesting, because in this case the property “zap” matches nothing.

The rules for full compatibility can now be stated as follows.

  1. When adding a property in a fully compatible manner, the schema of the property being added must be fully compatible with the schema of “additionalProperties” in the previous version of the schema.
  2. When removing a property in a fully compatible manner, the schema of “additionalProperties” in the new version of the schema must be fully compatible with the schema of the property being removed.

Using Partially Open Content Models for Full Compatibility

The previous examples of full compatibility are of limited use, since they only allow new properties to match anything using true, in the case of an open content model, or to match nothing using false, in the case of a closed content model. To achieve full compatibility in a meaningful manner, one can use a partially open content model, such as Schema 4, which I repeat below.

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  },
  "additionalProperties": { "type": "string" }
}

Schema 4 allows one to add and remove properties of type “string” in a fully compatible manner. What if you want to add properties of either type “string” or “integer”? You could specify additionalProperties with a oneOf, as in the following schema (Schema 10):

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  },
  "additionalProperties": { 
    "oneOf": [ { "type": "string" }, { "type": "integer" } ] 
  }
}

But with the above schema, every fully compatible schema that adds a new property would have to specify the type of the property as a oneOf as well (Schema 11):

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" },
    "zap": { 
      "oneOf": [ { "type": "string" }, { "type": "integer" } ] 
    }
  },
  "additionalProperties": { 
    "oneOf": [ { "type": "string" }, { "type": "integer" } ] 
  }
}

An alternative would be to use patternProperties (Schema 12). The rules in the previous section regarding adding and removing properties do not apply when using patternProperties.

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" }
  },
  "patternProperties": {
    "^s_": { "type": "string" },
    "^i_": { "type": "integer" }
  },
  "additionalProperties": false
}

With Schema 12, one can add properties of type “string” that start with “s_”, or properties of type “integer” that start with “i_”, in a fully compatible manner, as shown below with “s_zap” and “i_zap” (Schema 13).

{
  "type": "object",
  "properties": {
    "foo": { "type": "string" },
    "bar": { "type": "string" },
    "s_zap": { "type": "string" },
    "i_zap": { "type": "integer" }
  },
  "patternProperties": {
    "^s_": { "type": "string" },
    "^i_": { "type": "integer" }
  },
  "additionalProperties": false
}

Achieving full compatibility in a meaningful way is possible, but requires some up-front planning, possibly with the use of patternProperties.

Summary

JSON Schema is unique when compared to other schema languages like Avro and Protobuf in that it has aspects of both a rule-based language and a grammar-based language. A better understanding of open, closed, and partially-open content models can help you when evolving schemas in a backward, forward, or fully compatible manner.


My 10 Favorite Computer Science Books

I’ve seen other lists of favorite books over the last few months, so I thought I’d jump into the fray.  If I were stuck on an island (or in my room during a pandemic), here are the ten computer science books that I would want by my side.  These ten books also provide a great overview of the entire field of computer science.

Structure and Interpretation of Computer Programs

I was fortunate enough to take the course for which this book was developed when I was an undergraduate at MIT.   Since then this book has become a classic.  If you can invest the time, the text has the potential to expand your mind and make you a better programmer.

Artificial Intelligence: A Modern Approach

When I was at MIT, the textbook for the Artificial Intelligence class was written by Professor Winston.  While that was a great book, since then the above textbook by Russell and Norvig has become the bible for AI, and for good reason.  Not only is the text comprehensive, but it is extremely clear and easy to read.

The Language of Machines

I met Professor Floyd while working toward a Master’s degree at Stanford.  He taught an advanced class on automata and computability that later led to the above textbook.  The text is unique in that it introduces a unified model of computation that ties together seemingly disparate concepts.  The class was also memorable in that Professor Floyd was one of the kindest, gentlest teachers I have ever known.  Unfortunately he passed away in 2001.

Abstraction and Specification in Program Development

Professor Liskov is famous for the Liskov Substitution Principle, as well as being a Turing Award winner.  Professor Guttag was my undergraduate advisor, and later became head of the Electrical Engineering and Computer Science department at MIT.  While this is probably the least known book on my list, its influence is greater than is recognized. Although the book uses the CLU programming language to convey its ideas, the ideas were carried over to a subsequent textbook, Program Development in Java.   The above textbook also influenced Introduction to Computation and Programming in Python, which is the foundational text for today’s MIT undergraduate program in computer science.

Compilers: Principles, Techniques, and Tools

The above book is often simply referred to as the Dragon Book.  It remains the bible for compiler theory.  Professor Lam taught an advanced compiler course that I took at Stanford.

The Design and Implementation of the FreeBSD OS

The above textbook is not only comprehensive, but easy to read.  It gives an in-depth view of an influential operating system that continues to be relevant today.

Introduction to Algorithms

When I was at MIT, the authors were just starting to develop this text.  From the chapter notes that were provided during class, I could already tell that a great textbook was taking shape.  Of course, this book is now considered the bible of its field.

TCP/IP Illustrated, Volume 1

This is probably the best book from which to learn about modern day networking.  The level of depth is unmatched and the text continues to reward those who return to it again and again.  W. Richard Stevens passed away in 1999, but fortunately his books continue to be revised for today’s readers.

Transaction Processing: Concepts and Techniques

The authors introduced the ACID (atomicity, consistency, isolation, durability) properties of database transactions, and this book is a comprehensive summary of their work.  The database community lost a true giant when Jim Gray was lost at sea in 2007, while on a short trip to scatter his mother’s ashes near San Francisco.

Concrete Mathematics: A Foundation for Computer Science

This text started as an expanded treatment of the “Mathematical Preliminaries” chapter of The Art of Computer Programming.  While I have not yet read The Art of Computer Programming, I was fortunate enough to take the Stanford course that used the above book, taught by Professor Knuth.  The book provides a more leisurely introduction to the mathematical analysis of algorithms, in a manner that is both challenging and fun.

 


Keta: A Metadata Store Backed By Apache Kafka

Recently I added the ability for KCache to be configured with different types of backing caches. KCache, by providing an ordered key-value abstraction for a compacted topic in Kafka, can be used as a foundation for a highly-available service for storing metadata, similar to ZooKeeper, etcd, and Consul.

For such a metadata store, I wanted the following features:

  • Ordered key-value model
  • APIs available through gRPC
  • Transactional support using multiversion concurrency control (MVCC)
  • High availability through leader election and failover
  • Ability to be notified of changes to key-value tuples
  • Ability to expire key-value tuples

It turns out that etcd already has these features, so I decided to use the same gRPC APIs and data model as in etcd v3.  In addition, a comprehensive set of Jepsen tests has been built for etcd, so by using the same APIs, I could make use of the same Jepsen tests.

The resulting system is called Keta1.

Hello, Keta

By adopting the etcd v3 APIs, Keta can be used by any client that supports these APIs. Etcd clients are available in Go, Java, Python, JavaScript, Ruby, C++, Erlang, and .NET.  In addition, Keta can be used with the etcdctl command line client that ships with etcd.

To get started with Keta, download a release, unpack it, and then modify config/keta.properties to point to an existing Kafka broker.  Then run the following:

$ bin/keta-start config/keta.properties
 

Next, download etcd as described here. In a separate terminal, use etcdctl to store and retrieve a key:

$ etcdctl put mykey "this is awesome"
$ etcdctl get mykey
 

The etcd APIs provide a concise way of expressing transactions.

$ etcdctl put user1 bad
$ etcdctl txn --interactive

compares:
value("user1") = "bad"      

success requests (get, put, delete):
del user1  

failure requests (get, put, delete):
put user1 good
 

To expire key-value tuples, use a lease.

$ etcdctl lease grant 300
# lease 2be7547fbc6a5afa granted with TTL(300s)

$ etcdctl put sample value --lease=2be7547fbc6a5afa
$ etcdctl get sample

$ etcdctl lease keep-alive 2be7547fbc6a5afa
$ etcdctl lease revoke 2be7547fbc6a5afa
# or after 300 seconds
$ etcdctl get sample
 

To receive change notifications, use a watch.

$ etcdctl watch stock --prefix

Then, in a separate terminal, enter the following:

$ etcdctl put stock1 10
$ etcdctl put stock2 20

If you prefer a GUI, you can use etcdmanager when working with Keta.

Leader Election using the Magical Rebalance Protocol

Keta achieves high availability by allowing any number of Keta instances to be run as a cluster. One instance is chosen as the leader, and all other instances act as followers. The followers will forward both reads and writes to the leader. If the leader dies, another leader is chosen.

Leader election is accomplished by using the rebalance protocol of Kafka, which is the same protocol that is used to assign topic-partitions to consumers in a consumer group.

Jepsen-Driven Development

As mentioned, one nice aspect of using the etcd APIs is that a set of Jepsen tests are already available. This allowed me to use Jepsen-Driven Development (JDD) when developing Keta, which is like Test-Driven Development (TDD), but on steroids.

Jepsen is an awesome framework written in Clojure for testing (or more like breaking) distributed systems. It comes with an in-depth tutorial for writing new Jepsen tests.

I was able to modify the existing Jepsen tests for etcd by having the tests install and start Keta instead of etcd. The client code in the test, which uses the native Java client for etcd, remained untouched. The modified tests can be found here.

I was able to successfully run three types of etcd tests:

  1. A set test, which uses a compare-and-set transaction to concurrently read a set of integers from a single key and append a value to that set.   This test is designed to measure stale reads.
  2. An append test, which uses transactions to concurrently read and append to lists of unique integers.  This test is designed to verify strict serializability.
  3. A register test, which concurrently performs randomized reads, writes, and compare-and-set operations over single keys.  This test is designed to verify linearizability.

Jepsen has a built-in component called nemesis that can inject faults into the system during test runs. For the etcd tests, nemesis was used to kill the leader and to create network partitions.

One challenge of running Keta with Jepsen is that leader election using the rebalance protocol can take several seconds, whereas leader election in etcd, which uses Raft, only takes a second or less. This means that the number of unsuccessful requests is higher in the tests when using Keta than when using etcd, but this is to be expected.

In any case, Keta passes all of the above tests.2

# Run the Jepsen set test
$ lein run test --concurrency 2n --workload set --nemesis kill,partition
...
Everything looks good! ヽ(‘ー`)ノ

# or sometimes
...
Errors occurred during analysis, but no anomalies found. ಠ~ಠ
 

What’s Not in the Box

Keta only provides a subset of the functionality of etcd and so is not a substitute for it. In particular, it is missing

  • A lock service for clients
  • An election service for clients
  • An immutable version history for key-value tuples
  • Membership reconfiguration

For example, Keta only keeps the latest value for a specific key, and not its prior history as is available with etcd.  However, if you’re interested in a highly-available transactional metadata store backed by Apache Kafka that provides most of the features of etcd, please give Keta a try.


Using KCache with a Persistent Cache

KCache is a library that provides an ordered key-value store (OKVS) abstraction for a compacted topic in Kafka. As an OKVS, it can be used to treat Kafka as a multi-model database, allowing Kafka to represent graphs, documents, and relational data.

Initially KCache stored data from the compacted topic in an in-memory cache. In newer releases, KCache can be configured to use a persistent cache that stores data to disk. This allows KCache to handle larger data sets, and also improves startup times. The persistent cache is itself an embedded OKVS, and can be configured to be one of the following implementations:

  • Berkeley DB JE
  • LMDB
  • MapDB
  • RocksDB

Here is a quick comparison of the different embedded OKVS libraries before I go into more detail.

Embedded OKVS   Data Structure   Language   Transactions   Secondary Indexes   License
BDB JE          B+ tree          Java       Yes            Yes                 Apache
LMDB            B+ tree          C          Yes            No                  BSD-like
MapDB           B+ tree          Java       Yes            No                  Apache
RocksDB         LSM tree         C++        Yes            No                  Apache

Below are additional details on the various libraries.  I also add some historical notes that I find interesting.

Berkeley DB JE

Berkeley DB JE is the Java Edition of the Berkeley DB library.  It is similar to, but not compatible with, the C edition that predates it.  The C edition is simply referred to as Berkeley DB.

Berkeley DB grew out of efforts at the University of California, Berkeley as part of BSD to replace the popular dbm library that existed in AT&T Unix, due to patent issues.  It was first released in 1991.

Berkeley DB JE is the core library in Oracle NoSQL, which extends the capabilities of Berkeley DB JE to a sharded, clustered environment.  Berkeley DB JE supports transactions, and is unique in that it also supports secondary indexes.  It has additional advanced features such as replication and hot backups.  Internally it uses a B+ tree to store data.

LMDB

LMDB, short for Lightning Memory-Mapped Database, is another OKVS that uses the B+ tree data structure.  It was initially designed in 2009 to replace Berkeley DB in the OpenLDAP project.

LMDB supports transactions but not secondary indexes.  LMDB uses copy-on-write semantics, which allows it to operate without a transaction log.

MapDB

MapDB is a pure Java implementation of an OKVS.  It evolved from a project started in 2001 called JDBM, which was meant to be a pure Java implementation of the dbm library in AT&T Unix. MapDB provides several collection APIs, including maps, sets, lists, and queues.

MapDB uses a B+ tree data structure and supports transactions, but not secondary indexes.  MapDB also supports snapshots and incremental backups.

RocksDB

RocksDB was created by Facebook in 2012 as a fork of LevelDB.  LevelDB is a library created by Google in 2011 based on ideas from BigTable, the inspiration for HBase.  Both BigTable and HBase can be viewed as distributed OKVSs.

Unlike the OKVSs mentioned above, RocksDB uses an LSM tree to store data.  It supports different compaction styles for merging SST files.  It adds many features that do not exist in LevelDB, including column families, transactions, backups, and checkpoints.  RocksDB is written in C++.

Selecting a Persistent Cache

When selecting a persistent cache for KCache, the first consideration is whether your application is read-heavy vs write-heavy.  In general, an OKVS based on a B+ tree is faster for reads, while one based on an LSM tree is faster for writes.  There’s a good discussion of the pros and cons of B+ trees and LSM trees in Chapter 3 of Designing Data-Intensive Applications, by Martin Kleppmann.

For further performance comparisons, the LMDB project has some good benchmarks here, although they don’t include Berkeley DB JE.  I’ve ported the LMDB benchmarks for KCache and included Berkeley DB JE, so that you can try the benchmarks for yourself on your platform of choice.
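
Once you have chosen an implementation, the backing cache is selected through KCache configuration. The following is a minimal sketch, assuming KCache exposes configuration properties along these lines (the topic name and data directory are placeholders, and the property names should be checked against the KCache README):

# Assumed KCache settings: select a persistent backing cache and where it stores data
kafkacache.bootstrap.servers=localhost:9092
kafkacache.topic=_sample_cache
kafkacache.backing.cache=rocksdb
kafkacache.data.dir=/tmp/kcache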
