The Confluent Schema Registry often serves as the heart of a streaming platform, as it provides centralized management and storage of the schemas for an organization. One feature of the Schema Registry that deserves more attention is its ability to incorporate pluggable resource extensions.
In this post I will show how resource extensions can be used to implement the following:
- Subject modes. For example, one might want to “freeze” a subject so that no further changes can be made.
- A Schema Registry browser. This is a complete single-page application for managing and visualizing schemas in a web browser.
Along the way I will show how to use the KCache library that I introduced in my last post.
Subject Modes
The first resource extension that I will demonstrate is one that provides support for subject modes. With this extension, a subject can be placed in “read-only” mode so that no further changes can be made to the subject. Also, an entire Schema Registry cluster can be placed in “read-only” mode. This may be useful, for example, when using Confluent Replicator to replicate Schema Registry from one Kafka cluster to another. If one wants to keep the two registries in sync, one could mark the Schema Registry that is the target of replication as “read-only”.
When implementing the extension, we want to associate a mode, either “read-only” or “read-write”, to a given subject (or *
to indicate all subjects). The association needs to be persistent, so that it can survive a restart of the Schema Registry. We could use a database, but the Schema Registry already has a dependency on Kafka, so perhaps we can store the association in Kafka. This is a perfect use case for KCache, which is an in-memory cache backed by Kafka. Using an instance of KafkaCache
, saving and retrieving the mode for a given subject is straightforward:
public class ModeRepository implements Closeable { // Used to represent all subjects public static final String SUBJECT_WILDCARD = "*"; private final Cache<String, String> cache; public ModeRepository(SchemaRegistryConfig schemaRegistryConfig) { KafkaCacheConfig config = new KafkaCacheConfig(schemaRegistryConfig.originalProperties()); cache = new KafkaCache<>(config, Serdes.String(), Serdes.String()); cache.init(); } public Mode getMode(String subject) { if (subject == null) subject = SUBJECT_WILDCARD; String mode = cache.get(subject); if (mode == null && subject.equals(SUBJECT_WILDCARD)) { // Default mode for top level return Mode.READWRITE; } return mode != null ? Enum.valueOf(Mode.class, mode) : null; } public void setMode(String subject, Mode mode) { if (subject == null) subject = SUBJECT_WILDCARD; cache.put(subject, mode.name()); } @Override public void close() throws IOException { cache.close(); } }
Using the ModeRepository
, we can provide a ModeResource
class that provides REST APIs for saving and retrieving modes:
public class ModeResource { private static final int INVALID_MODE_ERROR_CODE = 42299; private final ModeRepository repository; private final KafkaSchemaRegistry schemaRegistry; public ModeResource( ModeRepository repository, SchemaRegistry schemaRegistry ) { this.repository = repository; this.schemaRegistry = (KafkaSchemaRegistry) schemaRegistry; } @Path("/{subject}") @PUT public ModeUpdateRequest updateMode( @PathParam("subject") String subject, @Context HttpHeaders headers, @NotNull ModeUpdateRequest request ) { Mode mode; try { mode = Enum.valueOf( Mode.class, request.getMode().toUpperCase(Locale.ROOT)); } catch (IllegalArgumentException e) { throw new RestConstraintViolationException( "Invalid mode. Valid values are READWRITE and READONLY.", INVALID_MODE_ERROR_CODE); } try { if (schemaRegistry.isMaster()) { repository.setMode(subject, mode); } else { throw new RestSchemaRegistryException( "Failed to update mode, not the master"); } } catch (CacheException e) { throw Errors.storeException("Failed to update mode", e); } return request; } @Path("/{subject}") @GET public ModeGetResponse getMode(@PathParam("subject") String subject) { try { Mode mode = repository.getMode(subject); if (mode == null) { throw Errors.subjectNotFoundException(); } return new ModeGetResponse(mode.name()); } catch (CacheException e) { throw Errors.storeException("Failed to get mode", e); } } @PUT public ModeUpdateRequest updateTopLevelMode( @Context HttpHeaders headers, @NotNull ModeUpdateRequest request ) { return updateMode(ModeRepository.SUBJECT_WILDCARD, headers, request); } @GET public ModeGetResponse getTopLevelMode() { return getMode(ModeRepository.SUBJECT_WILDCARD); } }
Now we need a filter to reject requests that attempt to modify a subject when it is in read-only mode:
@Priority(Priorities.AUTHORIZATION) public class ModeFilter implements ContainerRequestFilter { private static final Set<ResourceActionKey> subjectWriteActions = new HashSet<>(); private ModeRepository repository; @Context ResourceInfo resourceInfo; @Context UriInfo uriInfo; @Context HttpServletRequest httpServletRequest; static { initializeSchemaRegistrySubjectWriteActions(); } private static void initializeSchemaRegistrySubjectWriteActions() { subjectWriteActions.add( new ResourceActionKey(SubjectVersionsResource.class, "POST")); subjectWriteActions.add( new ResourceActionKey(SubjectVersionsResource.class, "DELETE")); subjectWriteActions.add( new ResourceActionKey(SubjectsResource.class, "DELETE")); subjectWriteActions.add( new ResourceActionKey(ConfigResource.class, "PUT")); } public ModeFilter(ModeRepository repository) { this.repository = repository; } @Override public void filter(ContainerRequestContext requestContext) { Class resource = resourceInfo.getResourceClass(); String restMethod = requestContext.getMethod(); String subject = uriInfo.getPathParameters().getFirst("subject"); Mode mode = repository.getMode(subject); if (mode == null) { // Check the top level mode mode = repository.getMode(ModeRepository.SUBJECT_WILDCARD); } if (mode == Mode.READONLY) { ResourceActionKey key = new ResourceActionKey(resource, restMethod); if (subjectWriteActions.contains(key)) { requestContext.abortWith( Response.status(Response.Status.UNAUTHORIZED) .entity("Subject is read-only.") .build()); } } } private static class ResourceActionKey { private final Class resourceClass; private final String restMethod; public ResourceActionKey(Class resourceClass, String restMethod) { this.resourceClass = resourceClass; this.restMethod = restMethod; } ... } }
Finally, the resource extension simply creates the mode repository and then registers the resource and the filter:
public class SchemaRegistryModeResourceExtension implements SchemaRegistryResourceExtension { private ModeRepository repository; @Override public void register( Configurable<?> configurable, SchemaRegistryConfig schemaRegistryConfig, SchemaRegistry schemaRegistry ) { repository = new ModeRepository(schemaRegistryConfig); configurable.register(new ModeResource(repository, schemaRegistry)); configurable.register(new ModeFilter(repository)); } @Override public void close() throws IOException { repository.close(); } }
The complete source code listing can be found here.
To use our new resource extension, we first copy the extension jar (and the KCache
jar) to ${CONFLUENT_HOME}/share/java/schema-registry
. Next we add the following to ${CONFLUENT_HOME}/etc/schema-registry/schema-registry.properties
:
kafkacache.bootstrap.servers=localhost:9092 resource.extension.class=io.yokota.schemaregistry.mode.SchemaRegistryModeResourceExtension
Now after we start the Schema Registry, we can save and retrieve modes for subjects via the REST API:
$ curl -X PUT -H "Content-Type: application/json" \ http://localhost:8081/mode/topic-key --data '{"mode": "READONLY"}' {"mode":"READONLY"} $ curl localhost:8081/mode/topic-key {"mode":"READONLY"} $ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \ --data '{ "schema": "{ \"type\": \"string\" }" }' \ http://localhost:8081/subjects/topic-key/versions Subject is read-only.
It works!
A Schema Registry Browser
Resource extensions can not only be used to add new REST APIs to the Schema Registry; they can also be used to add entire web-based user interfaces. As a demonstration, I’ve developed a resource extension that provides a Schema Registry browser by bundling a single-page application based on Vue.js, which resides here. To use the Schema Registry browser, place the resource extension jar in ${CONFLUENT_HOME}/share/java/schema-registry
and then add the following properties to ${CONFLUENT_HOME}/etc/schema-registry/schema-registry.properties
:1
resource.static.locations=static resource.extension.class=io.yokota.schemaregistry.browser.SchemaRegistryBrowserResourceExtension
The resource extension merely indicates which URLs should use the static resources:
public class SchemaRegistryBrowserResourceExtension implements SchemaRegistryResourceExtension { @Override public void register( Configurable<?> configurable, SchemaRegistryConfig schemaRegistryConfig, SchemaRegistry kafkaSchemaRegistry ) throws SchemaRegistryException { configurable.property(ServletProperties.FILTER_STATIC_CONTENT_REGEX, "/(static/.*|.*\\.html|.*\\.js)"); } @Override public void close() { } }
Once we start the Schema Registry and navigate to http://localhost:8081/index.html, we will be greeted with the home page of the Schema Registry browser:
From the Entities dropdown, we can navigate to a listing of all subjects:
If we view a specific subject, we can see the complete version history for the subject, with schemas nicely formatted:
I haven’t shown all the functionality of the Schema Registry browser, but it supports all the APIs that are available through REST, including creating schemas, retrieving schemas by ID, etc.
Hopefully this post has inspired you to create your own resource extensions for the Confluent Schema Registry. You might even have fun. 🙂
- Support for
resource.static.locations
is available in this PR.