Using Data Contracts with the Rust Schema Registry Client

The Rust Schema Registry Client is a fully asynchronous Rust client library for interacting with the Confluent Schema Registry. It allows you to enforce a Data Contract between a producer and a consumer. For example, you can

  • Serialize and deserialize Kafka records using Avro, Protobuf, and JSON Schema
  • Specify data quality rules using Google Common Expression Language (CEL) expressions
  • Specify schema migration rules using JSONata expressions
  • Enforce client-side field-level encryption (CSFLE) rules using AWS KMS, Azure Key Vault, Google Cloud KMS, or HashiCorp Vault

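For example, a data quality rule is attached to a schema as part of its rule set. The JSON below is a sketch of a CEL condition in the rule-set format used by Confluent data contracts; the rule name and expression are illustrative and reference the User schema shown later:

```json
{
  "ruleSet": {
    "domainRules": [
      {
        "name": "validateName",
        "kind": "CONDITION",
        "mode": "WRITE",
        "type": "CEL",
        "expr": "message.name != ''"
      }
    ]
  }
}
```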
This library can be used with rust-rdkafka but does not depend on it.
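For instance, the bytes produced by a serializer can be handed directly to a rust-rdkafka producer. A minimal sketch, assuming a local broker; the bootstrap address and topic are placeholders:

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

async fn produce(topic: &str, bytes: Vec<u8>) {
    // The bootstrap server address is a placeholder.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .create()
        .expect("Failed to create producer");

    // Send the bytes produced by the schema registry serializer.
    producer
        .send(
            FutureRecord::<(), _>::to(topic).payload(&bytes),
            Duration::from_secs(0),
        )
        .await
        .expect("Failed to deliver message");
}
```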

Below we’ll show some examples of how to use this library with Avro, Protobuf, and JSON Schema formats.

Avro

The Avro support relies on the Apache Avro Rust library. There are two ways of handling Avro data in Rust:

  • as Avro-specific data types based on an Avro schema
  • as generic Rust serde-compatible types deriving Serialize and Deserialize

On the producer side, first we create an Avro serializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

// We pass the schema and use auto-registration.
let schema_str = r#"
 {
    "namespace": "confluent.io.examples.serialization.avro",
    "name": "User",
    "type": "record",
    "fields": [
        {"name": "name", "type": "string", "confluent:tags": [ "PII" ]},
        {"name": "favorite_number", "type": "long"},
        {"name": "favorite_color", "type": "string"}
    ]
}
"#;

let schema = Schema {
    schema_type: Some("AVRO".to_string()),
    references: None,
    metadata: None,
    rule_set: None,
    schema: schema_str.to_string(),
};

let ser_conf = SerializerConfig::new(true, None, true, false, HashMap::new());

let ser = AvroSerializer::new(
    &client, Some(&schema), None, ser_conf
)
.expect("Failed to create serializer");
 

If using the Avro-specific data types, we create an Avro record and serialize using the serialize() method.

let fields = vec![
    ("name".to_string(), Value::String("John Doe".to_string())),
    ("favorite_number".to_string(), Value::Int(7)),
    ("favorite_color".to_string(), Value::String("blue".to_string())),
];

let value = Value::Record(fields);

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Avro,
    headers: None,
};

let bytes: Vec<u8> = ser
    .serialize(&ser_ctx, value)
    .await
    .expect("Failed to serialize");
 
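The resulting bytes are typically framed using Confluent's wire format: a zero magic byte, a four-byte big-endian schema ID, and then the encoded payload. A small dependency-free sketch of splitting that header apart (the byte values are illustrative):

```rust
// Confluent wire format: [0x00 magic][4-byte big-endian schema ID][payload].
fn parse_header(bytes: &[u8]) -> Option<(u32, &[u8])> {
    if bytes.len() < 5 || bytes[0] != 0 {
        return None;
    }
    let id = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
    Some((id, &bytes[5..]))
}

fn main() {
    // Hypothetical serialized message whose schema was registered with ID 42.
    let bytes = [0u8, 0, 0, 0, 42, 0x02, 0x10];
    let (id, payload) = parse_header(&bytes).expect("invalid framing");
    println!("schema id = {id}, payload is {} bytes", payload.len());
}
```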

If using the serde-compatible types, we create an object and serialize it using the serialize_ser() method.

// The User type must implement Serialize and Deserialize
let obj = User {
    name: "John Doe".to_string(),
    favorite_number: 7,
    favorite_color: "blue".to_string(),
};

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Avro,
    headers: None,
};

let bytes: Vec<u8> = ser
    .serialize_ser(&ser_ctx, obj)
    .await
    .expect("Failed to serialize");
 
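For reference, a serde-compatible User type matching the Avro schema above could look like the following sketch. The serde derives are shown as a comment so the snippet stays dependency-free; in a real project they would be enabled:

```rust
// In a real project, also add: #[derive(serde::Serialize, serde::Deserialize)]
#[derive(Debug, PartialEq)]
struct User {
    name: String,
    favorite_number: i64,
    favorite_color: String,
}

fn main() {
    let user = User {
        name: "John Doe".to_string(),
        favorite_number: 7,
        favorite_color: "blue".to_string(),
    };
    println!("{user:?}");
}
```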

On the consumer side, we create an Avro deserializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let deser_conf = DeserializerConfig::new(None, false, HashMap::new());
    
let deser = AvroDeserializer::new(
    &client, None, deser_conf
)
.expect("Failed to create deserializer");
 

Assuming we have a Kafka message, we deserialize it as follows.

let m: BorrowedMessage = ...;

let ser_ctx = SerializationContext {
    topic: m.topic().to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Avro,
    headers: None,
};

let payload = deser
    .deserialize(&ser_ctx, m.payload().unwrap_or(b""))
    .await
    .expect("Failed to deserialize");

info!("received: '{:?}'", payload.value);
 

If you want to deserialize the message into a serde-compatible type, you can add the following lines:

let obj = from_value::<User>(&payload.value).unwrap();

info!("received: '{:?}'", obj);
 

Protobuf

The Protobuf support relies on the prost and prost-reflect libraries.

On the producer side, first we create a Protobuf serializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let ser_conf = SerializerConfig::new(true, None, true, false, HashMap::new());
    
let ser = ProtobufSerializer::new(
    &client, None, ser_conf
)
.expect("Failed to create serializer");
 

If your Protobuf object implements the ReflectMessage trait from prost-reflect, then serializing it is straightforward.

// In this example, the Author type implements ReflectMessage
let obj = Author {
    name: "Franz Kafka".to_string(),
    works: vec!["Metamorphosis".to_string(), "The Trial".to_string()],
};

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Protobuf,
    headers: None,
};
                
let bytes: Vec<u8> = ser
    .serialize(&ser_ctx, &obj)
    .await
    .expect("Failed to serialize");
 

However, if your Protobuf object only implements the Message trait from prost, then you must pass both the fully-qualified name of the message type and the relevant FileDescriptorSet.

// In this example, the Author type only implements Message
let obj = Author {
    name: "Franz Kafka".to_string(),
    works: vec!["Metamorphosis".to_string(), "The Trial".to_string()],
};

let fds: FileDescriptorSet = ...;

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Protobuf,
    headers: None,
};
                
let bytes: Vec<u8> = ser
    .serialize_with_file_desc_set(&ser_ctx, &obj, "test.Author", fds)
    .await
    .expect("Failed to serialize");
 
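One common way to obtain a FileDescriptorSet is to have prost-build write one out at compile time and embed it in the binary. A sketch of the build script; the proto paths and file names are illustrative:

```rust
// build.rs: ask prost-build to emit a FileDescriptorSet alongside the
// generated Rust code. The proto paths are illustrative.
fn main() -> std::io::Result<()> {
    let out_dir = std::path::PathBuf::from(std::env::var("OUT_DIR").unwrap());
    prost_build::Config::new()
        .file_descriptor_set_path(out_dir.join("descriptors.bin"))
        .compile_protos(&["proto/author.proto"], &["proto/"])?;
    Ok(())
}
```

The application can then decode the embedded bytes with prost, e.g. FileDescriptorSet::decode(include_bytes!(concat!(env!("OUT_DIR"), "/descriptors.bin")).as_slice()).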

On the consumer side, we create a Protobuf deserializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let deser_conf = DeserializerConfig::new(None, false, HashMap::new());

let deser = ProtobufDeserializer::new(
    &client, None, deser_conf
)
.expect("Failed to create deserializer");
 

Assuming we have a Kafka message, we deserialize it as follows.

let m: BorrowedMessage = ...;

let ser_ctx = SerializationContext {
    topic: m.topic().to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Protobuf,
    headers: None,
};

let obj: Author = deser
    .deserialize(&ser_ctx, m.payload().unwrap_or(b""))
    .await
    .expect("Failed to deserialize");

info!("received: '{:?}'", obj);
 

JSON

When serializing or deserializing JSON, the payload can optionally be validated against the JSON Schema using a JSON Schema validation library.

On the producer side, first we create a JSON serializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

// We pass the schema and use auto-registration.
let schema_str = r#"
{
  "type": "object",
  "properties": {
    "name": {
       "type": "string",
       "confluent:tags": [ "PII" ]
    },
    "favorite_number": { "type": "number" },
    "favorite_color": { "type": "string" }
  }
}
"#;

let schema = Schema {
    schema_type: Some("JSON".to_string()),
    references: None,
    metadata: None,
    rule_set: None,
    schema: schema_str.to_string(),
};

let ser_conf = SerializerConfig::new(
    true, None, true, /* validate */ false, HashMap::new()
);

let ser = JsonSerializer::new(
    &client, Some(&schema), None, ser_conf
)
.expect("Failed to create serializer");
 

We create a JSON value and serialize it as follows.

let value = serde_json::json!({
    "name": "John Doe",
    "favorite_number": 7,
    "favorite_color": "blue"
});

let ser_ctx = SerializationContext {
    topic: topic_name.to_string(),
    serde_type: SerdeType::Value,  // whether record key or value
    serde_format: SerdeFormat::Json,
    headers: None,
};
                
let bytes: Vec<u8> = ser
    .serialize(&ser_ctx, value)
    .await
    .expect("Failed to serialize");
 

On the consumer side, we create a JSON deserializer.

let client_conf = SchemaRegistryClientConfig::new(vec![url.to_string()]);
    
let client = SchemaRegistryClient::new(client_conf);

let deser_conf = DeserializerConfig::new(
    None, /* validate */ false, HashMap::new()
);
    
let deser = JsonDeserializer::new(
    &client, None, deser_conf
)
.expect("Failed to create deserializer");
 

Assuming we have a Kafka message, we deserialize it as follows.

let m: BorrowedMessage = ...;

let ser_ctx = SerializationContext {
    topic: m.topic().to_string(),
    serde_type: SerdeType::Value,
    serde_format: SerdeFormat::Json,
    headers: None,
};
                
let value = deser
    .deserialize(&ser_ctx, m.payload().unwrap_or(b""))
    .await
    .expect("Failed to deserialize");

info!("received: '{:?}'", value);
 

See the examples for more information, including details on how to use the Rust Schema Registry Client with rust-rdkafka.