Fun with Confluent Schema Registry Extensions

The Confluent Schema Registry often serves as the heart of a streaming platform, as it provides centralized management and storage of the schemas for an organization. One feature of the Schema Registry that deserves more attention is its ability to incorporate pluggable resource extensions.

In this post I will show how resource extensions can be used to implement the following:

  1. Subject modes. For example, one might want to “freeze” a subject so that no further changes can be made.
  2. A Schema Registry browser. This is a complete single-page application for managing and visualizing schemas in a web browser.

Along the way I will show how to use the KCache library that I introduced in my last post.

Subject Modes

The first resource extension that I will demonstrate provides support for subject modes. With this extension, a subject can be placed in “read-only” mode so that no further changes can be made to it. An entire Schema Registry cluster can also be placed in “read-only” mode. This may be useful, for example, when using Confluent Replicator to replicate the Schema Registry’s data from one Kafka cluster to another. To keep the two registries in sync, one could mark the Schema Registry that is the target of replication as “read-only”, so that its schemas change only through replication.

When implementing the extension, we want to associate a mode, either “read-only” or “read-write”, with a given subject (or with *, to indicate all subjects). The association needs to be persistent so that it survives a restart of the Schema Registry. We could use a database, but the Schema Registry already depends on Kafka, so we can store the association in Kafka instead. This is a perfect use case for KCache, an in-memory cache backed by Kafka. Using an instance of KafkaCache, saving and retrieving the mode for a given subject is straightforward:

public class ModeRepository implements Closeable {

    // Used to represent all subjects
    public static final String SUBJECT_WILDCARD = "*";

    private final Cache<String, String> cache;

    public ModeRepository(SchemaRegistryConfig schemaRegistryConfig) {
        KafkaCacheConfig config =
            new KafkaCacheConfig(schemaRegistryConfig.originalProperties());
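        // KCache keeps the entries in memory, backed by a Kafka topic
        cache = new KafkaCache<>(config, Serdes.String(), Serdes.String());
        // Reads the backing topic so that previously saved modes are restored
        cache.init();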
    }

    public Mode getMode(String subject) {
        if (subject == null) subject = SUBJECT_WILDCARD;
        String mode = cache.get(subject);
        if (mode == null && subject.equals(SUBJECT_WILDCARD)) {
            // Default mode for top level
            return Mode.READWRITE;
        }
        // null means no mode has been set for this particular subject
        return mode != null ? Enum.valueOf(Mode.class, mode) : null;
    }

    public void setMode(String subject, Mode mode) {
        if (subject == null) subject = SUBJECT_WILDCARD;
        cache.put(subject, mode.name());
    }

    @Override
    public void close() throws IOException {
        cache.close();
    }
}
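KafkaCache hides the Kafka details, but the idea behind it is easy to sketch. Assuming the cache is backed by a compacted topic that is replayed in order on startup (my reading of how KCache rebuilds its state, not a copy of its code), restoring the saved modes amounts to a last-write-wins pass over the log, with a null value acting as a tombstone, as is the convention in Kafka. The class and record names below are hypothetical, a plain list stands in for the Kafka topic, and records require Java 16 or later:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: rebuilding an in-memory map by replaying a log of
// key/value records, the way a Kafka-backed cache can restore its state.
public class LogReplaySketch {

    // Stand-in for a Kafka record; a null value marks a deletion (tombstone).
    record Entry(String key, String value) {}

    // Replays the log in offset order; the last write for each key wins.
    static Map<String, String> replay(List<Entry> log) {
        Map<String, String> state = new HashMap<>();
        for (Entry e : log) {
            if (e.value() == null) {
                state.remove(e.key());          // tombstone removes the key
            } else {
                state.put(e.key(), e.value());  // later values overwrite earlier ones
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Entry> log = List.of(
            new Entry("topic-key", "READONLY"),
            new Entry("*", "READWRITE"),
            new Entry("topic-key", "READWRITE"),
            new Entry("other", "READONLY"),
            new Entry("other", null));
        // Contains "*" and "topic-key" (both READWRITE); "other" was deleted
        System.out.println(replay(log));
    }
}
```

Because the topic is compacted, Kafka eventually retains only the latest record for each key, so the replay stays short even after many mode changes.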

Using the ModeRepository, we can write a ModeResource class that exposes REST APIs for saving and retrieving modes:

@Path("/mode")
public class ModeResource {

    private static final int INVALID_MODE_ERROR_CODE = 42299;

    private final ModeRepository repository;
    private final KafkaSchemaRegistry schemaRegistry;

    public ModeResource(
        ModeRepository repository, 
        SchemaRegistry schemaRegistry
    ) {
        this.repository = repository;
        this.schemaRegistry = (KafkaSchemaRegistry) schemaRegistry;
    }

    @Path("/{subject}")
    @PUT
    public ModeUpdateRequest updateMode(
        @PathParam("subject") String subject,
        @Context HttpHeaders headers,
        @NotNull ModeUpdateRequest request
    ) {
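        Mode mode;
        String requestedMode = request.getMode();
        try {
            // Accept mode names case-insensitively; a missing mode is invalid
            mode = Enum.valueOf(Mode.class, requestedMode == null
                ? "" : requestedMode.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new RestConstraintViolationException(
                "Invalid mode. Valid values are READWRITE and READONLY.", 
                INVALID_MODE_ERROR_CODE);
        }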
        try {
            if (schemaRegistry.isMaster()) {
                repository.setMode(subject, mode);
            } else {
                throw new RestSchemaRegistryException(
                    "Failed to update mode, not the master");
            }
        } catch (CacheException e) {
            throw Errors.storeException("Failed to update mode", e);
        }

        return request;
    }

    @Path("/{subject}")
    @GET
    public ModeGetResponse getMode(@PathParam("subject") String subject) {
        try {
            Mode mode = repository.getMode(subject);
            if (mode == null) {
                throw Errors.subjectNotFoundException();
            }
            return new ModeGetResponse(mode.name());
        } catch (CacheException e) {
            throw Errors.storeException("Failed to get mode", e);
        }
    }

    @PUT
    public ModeUpdateRequest updateTopLevelMode(
        @Context HttpHeaders headers,
        @NotNull ModeUpdateRequest request
    ) {
        return updateMode(ModeRepository.SUBJECT_WILDCARD, headers, request);
    }

    @GET
    public ModeGetResponse getTopLevelMode() {
        return getMode(ModeRepository.SUBJECT_WILDCARD);
    }
}
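The parsing step in updateMode is easy to check on its own. The following is a minimal standalone sketch of the same case-insensitive lookup; the Mode enum here is a hypothetical mirror of the extension’s own, and returning an Optional instead of throwing a REST exception is my choice for illustration. It also treats a missing mode (null) as invalid rather than failing with a NullPointerException:

```java
import java.util.Locale;
import java.util.Optional;

public class ModeParsing {

    // Hypothetical mirror of the extension's Mode enum.
    enum Mode { READWRITE, READONLY }

    // Parses a user-supplied mode case-insensitively; empty if invalid or missing.
    static Optional<Mode> parseMode(String raw) {
        if (raw == null) {
            return Optional.empty();  // e.g. a request body without a "mode" field
        }
        try {
            // Locale.ROOT avoids locale-specific case rules (e.g. Turkish dotless i)
            return Optional.of(Mode.valueOf(raw.toUpperCase(Locale.ROOT)));
        } catch (IllegalArgumentException e) {
            return Optional.empty();  // not one of the enum constant names
        }
    }

    public static void main(String[] args) {
        for (String s : new String[] {"READONLY", "readwrite", "frozen", null}) {
            System.out.println(s + " -> " + parseMode(s));
        }
    }
}
```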

Next we need a request filter that rejects any attempt to modify a subject while it is in read-only mode, either because the subject itself is read-only or because the top-level mode is:

@Priority(Priorities.AUTHORIZATION)
public class ModeFilter implements ContainerRequestFilter {

    private static final Set<ResourceActionKey> subjectWriteActions = 
        new HashSet<>();

    private ModeRepository repository;

    @Context
    ResourceInfo resourceInfo;

    @Context
    UriInfo uriInfo;

    @Context
    HttpServletRequest httpServletRequest;

    static {
        initializeSchemaRegistrySubjectWriteActions();
    }

    private static void initializeSchemaRegistrySubjectWriteActions() {
        subjectWriteActions.add(
            new ResourceActionKey(SubjectVersionsResource.class, "POST"));
        subjectWriteActions.add(
            new ResourceActionKey(SubjectVersionsResource.class, "DELETE"));
        subjectWriteActions.add(
            new ResourceActionKey(SubjectsResource.class, "DELETE"));
        subjectWriteActions.add(
            new ResourceActionKey(ConfigResource.class, "PUT"));
    }

    public ModeFilter(ModeRepository repository) {
        this.repository = repository;
    }

    @Override
    public void filter(ContainerRequestContext requestContext) {
        Class<?> resource = resourceInfo.getResourceClass();
        String restMethod = requestContext.getMethod();
        String subject = uriInfo.getPathParameters().getFirst("subject");

        Mode mode = repository.getMode(subject);
        if (mode == null) {
            // Check the top level mode
            mode = repository.getMode(ModeRepository.SUBJECT_WILDCARD);
        }
        if (mode == Mode.READONLY) {
            ResourceActionKey key = new ResourceActionKey(resource, restMethod);
            if (subjectWriteActions.contains(key)) {
                requestContext.abortWith(
                    Response.status(Response.Status.UNAUTHORIZED)
                        .entity("Subject is read-only.")
                        .build());
            }
        }
    }

    private static class ResourceActionKey {

        private final Class<?> resourceClass;
        private final String restMethod;

        public ResourceActionKey(Class<?> resourceClass, String restMethod) {
            this.resourceClass = resourceClass;
            this.restMethod = restMethod;
        }

        // equals() and hashCode() over both fields, which the HashSet
        // lookup in filter() relies on, are omitted here
        ...
    }
}
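The decision the filter makes can be exercised without a running server. In this sketch, which is a simplification and not the extension’s code, the resource classes are replaced by hypothetical string names, and ResourceActionKey becomes a Java record (Java 16 or later) so that the equals() and hashCode() needed by Set.contains come for free. The mode resolution follows the same order as ModeFilter and ModeRepository: the subject’s own mode, then the top-level mode, then READWRITE by default:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class ModeFilterSketch {

    enum Mode { READWRITE, READONLY }

    static final String SUBJECT_WILDCARD = "*";

    // A record gets equals()/hashCode() over both components automatically.
    record ResourceActionKey(String resource, String method) {}

    // Hypothetical names standing in for the Schema Registry resource classes.
    static final Set<ResourceActionKey> WRITE_ACTIONS = Set.of(
        new ResourceActionKey("SubjectVersionsResource", "POST"),
        new ResourceActionKey("SubjectVersionsResource", "DELETE"),
        new ResourceActionKey("SubjectsResource", "DELETE"),
        new ResourceActionKey("ConfigResource", "PUT"));

    // Subject mode, else top-level mode, else READWRITE.
    static Mode effectiveMode(Map<String, Mode> modes, String subject) {
        Mode mode = subject == null ? null : modes.get(subject);
        if (mode == null) {
            mode = modes.getOrDefault(SUBJECT_WILDCARD, Mode.READWRITE);
        }
        return mode;
    }

    // True if the request would be rejected by the filter.
    static boolean isBlocked(Map<String, Mode> modes, String subject,
                             String resource, String method) {
        return effectiveMode(modes, subject) == Mode.READONLY
            && WRITE_ACTIONS.contains(new ResourceActionKey(resource, method));
    }

    public static void main(String[] args) {
        Map<String, Mode> modes = new HashMap<>();
        modes.put("topic-key", Mode.READONLY);
        System.out.println(isBlocked(modes, "topic-key", "SubjectVersionsResource", "POST")); // true
        System.out.println(isBlocked(modes, "topic-key", "SubjectVersionsResource", "GET"));  // false
        System.out.println(isBlocked(modes, "other", "SubjectVersionsResource", "POST"));     // false
    }
}
```

Note that a subject explicitly set to READWRITE stays writable even when the top-level mode is READONLY, since the subject’s own mode is consulted first.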

Finally, the resource extension simply creates the mode repository, registers the resource and the filter, and closes the repository when the extension is closed:

public class SchemaRegistryModeResourceExtension 
    implements SchemaRegistryResourceExtension {

    private ModeRepository repository;

    @Override
    public void register(
        Configurable<?> configurable,
        SchemaRegistryConfig schemaRegistryConfig,
        SchemaRegistry schemaRegistry
    ) {
        repository = new ModeRepository(schemaRegistryConfig);
        configurable.register(new ModeResource(repository, schemaRegistry));
        configurable.register(new ModeFilter(repository));
    }

    @Override
    public void close() throws IOException {
        repository.close();
    }
}

The complete source code listing can be found here.

To use our new resource extension, we first copy the extension jar (and the KCache jar) to ${CONFLUENT_HOME}/share/java/schema-registry. Next we add the following to ${CONFLUENT_HOME}/etc/schema-registry/schema-registry.properties:

kafkacache.bootstrap.servers=localhost:9092
resource.extension.class=io.yokota.schemaregistry.mode.SchemaRegistryModeResourceExtension

After starting the Schema Registry, we can save and retrieve modes for subjects via the REST API, and attempts to register a schema under a read-only subject are rejected:

$ curl -X PUT -H "Content-Type: application/json" \
  http://localhost:8081/mode/topic-key --data '{"mode": "READONLY"}'
{"mode":"READONLY"}

$ curl localhost:8081/mode/topic-key
{"mode":"READONLY"}

$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{ "schema": "{ \"type\": \"string\" }" }' \
  http://localhost:8081/subjects/topic-key/versions
Subject is read-only.
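The same mode calls can be made from Java. Here is a small sketch using java.net.http.HttpClient (Java 11 or later), assuming the registry runs at http://localhost:8081 as in the curl examples; the class and method names are hypothetical:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ModeClient {

    // Assumes a Schema Registry with the mode extension listens here.
    static final String BASE_URL = "http://localhost:8081";

    // PUT /mode/{subject} with a body such as {"mode": "READONLY"}
    static HttpRequest setModeRequest(String subject, String mode) {
        String body = "{\"mode\": \"" + mode + "\"}";
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/mode/" + subject))
            .header("Content-Type", "application/json")
            .PUT(HttpRequest.BodyPublishers.ofString(body))
            .build();
    }

    // GET /mode/{subject}
    static HttpRequest getModeRequest(String subject) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/mode/" + subject))
            .GET()
            .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> put = client.send(
            setModeRequest("topic-key", "READONLY"),
            HttpResponse.BodyHandlers.ofString());
        System.out.println(put.statusCode() + " " + put.body());
        HttpResponse<String> get = client.send(
            getModeRequest("topic-key"),
            HttpResponse.BodyHandlers.ofString());
        System.out.println(get.statusCode() + " " + get.body());
    }
}
```

Subject names containing characters that are not URL-safe would need to be percent-encoded before being appended to the path.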

It works!

A Schema Registry Browser

Resource extensions can not only be used to add new REST APIs to the Schema Registry; they can also be used to add entire web-based user interfaces. As a demonstration, I’ve developed a resource extension that provides a Schema Registry browser by bundling a single-page application based on Vue.js, which resides here. To use the Schema Registry browser, place the resource extension jar in ${CONFLUENT_HOME}/share/java/schema-registry and then add the following properties to ${CONFLUENT_HOME}/etc/schema-registry/schema-registry.properties:1

resource.static.locations=static
resource.extension.class=io.yokota.schemaregistry.browser.SchemaRegistryBrowserResourceExtension

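The resource extension merely tells Jersey which URLs should be served as static resources, by setting ServletProperties.FILTER_STATIC_CONTENT_REGEX. Requests whose paths match the pattern are passed on to the servlet container instead of being handled as REST calls: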

public class SchemaRegistryBrowserResourceExtension 
    implements SchemaRegistryResourceExtension {

    @Override
    public void register(
        Configurable<?> configurable,
        SchemaRegistryConfig schemaRegistryConfig,
        SchemaRegistry kafkaSchemaRegistry
    ) throws SchemaRegistryException {
        configurable.property(ServletProperties.FILTER_STATIC_CONTENT_REGEX, 
            "/(static/.*|.*\\.html|.*\\.js)");
    }

    @Override
    public void close() {
    }
}
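To see which requests the pattern diverts away from the REST resources, the regex can be checked in isolation. This sketch assumes Jersey applies the pattern to the servlet path as a full match (Matcher.matches()); the class and method names are hypothetical:

```java
import java.util.regex.Pattern;

public class StaticContentRegex {

    // Same pattern the browser extension passes to Jersey.
    static final Pattern STATIC_CONTENT =
        Pattern.compile("/(static/.*|.*\\.html|.*\\.js)");

    // Assumes a full match against the servlet path, as noted above.
    static boolean servedAsStatic(String servletPath) {
        return STATIC_CONTENT.matcher(servletPath).matches();
    }

    public static void main(String[] args) {
        String[] paths = {"/index.html", "/static/css/app.css", "/app.js",
                          "/subjects", "/mode/topic-key"};
        for (String p : paths) {
            System.out.println(p + " -> " + (servedAsStatic(p) ? "static" : "REST"));
        }
    }
}
```

Because the whole path must match, REST paths such as /subjects are unaffected, although any request path ending in .html or .js would also be treated as static content.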

Once we start the Schema Registry and navigate to http://localhost:8081/index.html, we are greeted with the home page of the Schema Registry browser.

From the Entities dropdown, we can navigate to a listing of all subjects.

If we view a specific subject, we can see its complete version history, with the schemas nicely formatted.

I haven’t shown all the functionality of the Schema Registry browser, but it supports all the APIs that are available through REST, including creating schemas, retrieving schemas by ID, etc.

Hopefully this post has inspired you to create your own resource extensions for the Confluent Schema Registry. You might even have fun. 🙂
