
Schema Registry

Confluent Schema Registry provides a serving layer for your metadata. It exposes a RESTful interface for storing and retrieving Avro®, JSON Schema, and Protobuf schemas. It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility setting, with expanded support across these schema types. It also provides serializers that plug into Apache Kafka® clients and handle schema storage and retrieval for Kafka messages sent in any of the supported formats.
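
For example, a producer only needs to point the Confluent serializers at the registry; schema registration and lookup then happen transparently on send. A minimal sketch, assuming a broker on localhost:9092 and a registry on localhost:8081 (the topic name is illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        // The Confluent serializers handle schema registration and lookup.
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        try (KafkaProducer<Object, Object> producer = new KafkaProducer<>(props)) {
            // Java Strings map to the Avro "string" primitive schema.
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
        }
    }
}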

This README includes the following sections:

  • Documentation
  • Quickstart API Usage examples
  • Installation
  • Deployment
  • Development
  • OpenAPI Spec
  • Contribute
  • License

Documentation

See the Schema Registry pages in the Confluent Documentation at docs.confluent.io.

Quickstart API Usage examples

The following assumes you have Kafka and an instance of the Schema Registry running using the default settings. These examples, and more, are also available at API Usage examples on docs.confluent.io.

# Register a new version of a schema under the subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key/versions
  {"id":1}

# Register a new version of a schema under the subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-value/versions
  {"id":1}

# List all subjects
$ curl -X GET http://localhost:8081/subjects
  ["Kafka-value","Kafka-key"]

# List all schema versions registered under the subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions
  [1]

# Fetch a schema by globally unique id 1
$ curl -X GET http://localhost:8081/schemas/ids/1
  {"schema":"\"string\""}

# Fetch version 1 of the schema registered under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Fetch the most recently registered schema under subject "Kafka-value"
$ curl -X GET http://localhost:8081/subjects/Kafka-value/versions/latest
  {"subject":"Kafka-value","version":1,"id":1,"schema":"\"string\""}

# Delete version 3 of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/3
  3

# Delete all versions of the schema registered under subject "Kafka-value"
$ curl -X DELETE http://localhost:8081/subjects/Kafka-value
  [1, 2, 3, 4, 5]

# Check whether a schema has been registered under subject "Kafka-key"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/subjects/Kafka-key
  {"subject":"Kafka-key","version":1,"id":1,"schema":"\"string\""}

# Test compatibility of a schema with the latest schema under subject "Kafka-value"
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"schema": "{\"type\": \"string\"}"}' \
    http://localhost:8081/compatibility/subjects/Kafka-value/versions/latest
  {"is_compatible":true}

# Get top level config
$ curl -X GET http://localhost:8081/config
  {"compatibilityLevel":"BACKWARD"}

# Update compatibility requirements globally
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "NONE"}' \
    http://localhost:8081/config
  {"compatibility":"NONE"}

# Update compatibility requirements under the subject "Kafka-value"
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
    --data '{"compatibility": "BACKWARD"}' \
    http://localhost:8081/config/Kafka-value
  {"compatibility":"BACKWARD"}

Installation

You can download prebuilt versions of the schema registry as part of the Confluent Platform. To install from source, follow the instructions in the Development section.

Deployment

The REST interface to the schema registry includes a built-in Jetty server. The wrapper scripts bin/schema-registry-start and bin/schema-registry-stop are the recommended way to start and stop the service.

Development

To build a development version, you may need development versions of common and rest-utils. After installing those, you can build the Schema Registry with Maven.

This project uses the Google Java code style to keep code clean and consistent.

To build:

mvn compile

To run the unit and integration tests:

mvn test

To run an instance of Schema Registry against a local Kafka cluster (using the default configuration included with Kafka):

mvn exec:java -pl :kafka-schema-registry -Dexec.args="config/schema-registry.properties"

To create a packaged version, optionally skipping the tests:

mvn package [-DskipTests]

It produces:

  • Schema registry in package-schema-registry/target/kafka-schema-registry-package-$VERSION-package
  • Serde tools for avro/json/protobuf in package-kafka-serde-tools/target/kafka-serde-tools-package-$VERSION-package

Each of the produced packages contains a directory layout similar to the packaged binary versions.

You can also produce a standalone fat JAR of schema registry using the standalone profile:

mvn package -P standalone [-DskipTests]

This generates package-schema-registry/target/kafka-schema-registry-package-$VERSION-standalone.jar, which includes all the dependencies as well.

OpenAPI Spec

OpenAPI (formerly known as Swagger) specifications are generated automatically by swagger-maven-plugin during the compile phase.

Contribute

Thanks for helping us to make Schema Registry even better!

License

The project is licensed under the Confluent Community License, except for the client-* and avro-* libs, which are under the Apache 2.0 license. See the LICENSE file in each subfolder for the detailed license agreement.

Issues

Add multi colo setup support

Multi colo setup for the schema registry includes setting up MirrorMaker for the schema registry log topic from the schema registry cluster in the master colo to the ones in the slave colos. Before setting up MirrorMaker, the admin also needs to delete the schema registry log topic from the slave schema registry cluster, if one exists. It would be good to have scripts and documentation for this.

Qualify 500 error codes with more details about the type of server side error

As per @ewencp's suggestion:

More specific error codes to qualify 500 errors are useful. For example, we could have a 50001 - Kafka error, which generically covers underlying issues with communicating with Kafka. That's generic, doesn't require defining hundreds of error codes, but still gives more useful information than just a plain 500 (which many people interpret as a buggy web service rather than a legitimate issue with another service).

Evaluate and improve performance of de/serializers

The AbstractKafkaAvroSerializer and AbstractKafkaAvroDeserializer both use the straightforward implementation of Avro serialization that allocates a new encoder and DatumWriter for every call.

We should start by adding a small microbenchmark tool to evaluate performance, then add some caching. Any cache implementation needs to be careful to handle multiple threads well. Additionally, to support applications like kafka-rest that may be long-running, use a lot of schemas, and survive across schema upgrades, it should be able to expire and dispose of cached DatumWriters.
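
A minimal sketch of the caching idea, keyed by schema with a ConcurrentHashMap (a production version would also need to bound the cache and expire entries, as noted above):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;

public class CachingAvroSerializer {
    // Avro's Schema implements equals/hashCode, so it can key the cache directly.
    private final Map<Schema, DatumWriter<Object>> writerCache = new ConcurrentHashMap<>();

    public byte[] serialize(Schema schema, Object datum) throws IOException {
        // Reuse one DatumWriter per schema instead of allocating on every call.
        DatumWriter<Object> writer =
            writerCache.computeIfAbsent(schema, s -> new GenericDatumWriter<>(s));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(datum, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}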

Update API

Update API to reflect today's discussion. We decided that the HTTP POST method is legitimately used to make queries when data in the message body is required. The argument against using GET was that passing message-body data is uncommon and may not be supported by some HTTP libraries.

Change compatibility check:

  • Add support for POST /compatibility/subjects/(subject)/versions/(version)
    the schema string in the message body is tested against the schema registered under (subject)/(version)
    return true, or false with incompatibility metadata produced by Avro
    this includes adding support for (version) == 'latest'
  • Remove dry_run support

Add the ability to check whether a schema is registered under a subject:

  • POST /subjects/(subject)
    schema string in the message body
    return the matching schema's subject, version, id, and schema, if it is registered (as in the quickstart example above)

Change /schemas/(id) to /schemas/ids/(id)

We also concluded that a POST to a collection resource should canonically be considered a create request, but that POST used elsewhere can be more flexible. E.g. POST /subjects/(subject)/versions creates a version under the collection of versions.

Clean up dependencies

It looks like we have some extra dependencies that aren't actually required. At a minimum, I see Jersey, which should be pulled in transitively via rest-utils. In order to generate binary packages as efficiently as possible we should get rid of any dependencies that aren't strictly required and aren't provided transitively by other dependencies.

schema registration - check local caches before forwarding request

Currently a register request is forwarded immediately to the master if the given process is not the master. The cache check to see whether the schema has already been registered should take place locally before forwarding the request.

This should help latency and decrease sensitivity to partitions, particularly in a multicolo setup.
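
A hypothetical sketch of the proposed flow (the names below are illustrative and do not match the real KafkaSchemaRegistry internals):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegisterRequestHandler {
    // Stand-in for the locally replicated schema store.
    private final Map<String, Integer> localCache = new ConcurrentHashMap<>();
    private final boolean isMaster;

    public RegisterRequestHandler(boolean isMaster) {
        this.isMaster = isMaster;
    }

    public int register(String subject, String schema) {
        // 1. Check the local cache first: if this exact schema is already
        //    registered under the subject, answer without any forwarding.
        Integer id = localCache.get(subject + "|" + schema);
        if (id != null) {
            return id;
        }
        // 2. Only take the expensive path for genuinely new schemas.
        return isMaster ? registerLocally(subject, schema) : forwardToMaster(subject, schema);
    }

    private int registerLocally(String subject, String schema) {
        int id = localCache.size() + 1; // placeholder id allocation
        localCache.put(subject + "|" + schema, id);
        return id;
    }

    private int forwardToMaster(String subject, String schema) {
        throw new UnsupportedOperationException("forward the request to the master over HTTP");
    }
}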

Change schema registry APIs to use subjects

Changed the APIs to refer to subjects as discussed last week. Also added schema retrieval using a globally unique id. The id is allocated on the master by allocating a batch of ids from zookeeper and using those to assign to schemas during registration.

Simplifying the request in registering schemas

Currently, we use the following HTTP body in the schema registration request:
'{"schema": "{\"type\": \"string\"}"}'

Now that we don't support deprecation, we can probably just simplify it to a string (that contains the schema).

Clean up flags in Schema.java

The "compatible" and "latest" boolean flags in Schema class should probably be removed - these values can be derived.

KAFKASTORE_TIMEOUT_CONFIG is too low in VMs

Tests are failing when run in VMs because this value is too low for some operations. Here's the stacktrace:

Running io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClientTest
[2015-02-03 21:47:55,533] INFO SchemaRegistryConfig values:
        port = 41077
        kafkastore.timeout.ms = 500
        kafkastore.write.max.retries = 5
        debug = true
        request.logger.name = io.confluent.rest-utils.requests
        metrics.sample.window.ms = 30000
        kafkastore.zk.session.timeout.ms = 10000
        kafkastore.topic = _schemas
        kafkastore.write.retry.backoff.ms = 100
        avro.compatibility.level = none
        shutdown.graceful.ms = 1000
        response.mediatype.preferred = [application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json]
        metrics.jmx.prefix = kafka.schema.registry
        host.name = localhost
        metric.reporters = []
        kafkastore.commit.interval.ms = -1
        kafkastore.connection.url = 127.0.0.1:60125
        metrics.num.samples = 2
        response.mediatype.default = application/vnd.schemaregistry.v1+json
        kafkastore.topic.replication.factor = 3
 (io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig:135)
[2015-02-03 21:47:56,578] INFO Initialized the consumer offset to -1 (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:80)
[2015-02-03 21:47:56,955] WARN Creating the schema topic _schemas using a replication factor of 1, which is less than the desired one of 3. If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic. (io.confluent.kafka.schemaregistry.storage.KafkaStore:169)
[2015-02-03 21:47:57,489] INFO [kafka-store-reader-thread-_schemas], Starting  (io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThread:68)
[2015-02-03 21:47:58,009] ERROR Error starting the schema registry (io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication:57)
io.confluent.kafka.schemaregistry.storage.exceptions.SchemaRegistryException: Error while initializing the datastore
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:128)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:55)
        at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
        at io.confluent.rest.Application.createServer(Application.java:104)
        at io.confluent.kafka.schemaregistry.RestApp.start(RestApp.java:51)
        at io.confluent.kafka.schemaregistry.ClusterTestHarness.setUp(ClusterTestHarness.java:135)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
        at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:252)
        at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:141)
        at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
        at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
        at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
        at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
        at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException: io.confluent.kafka.schemaregistry.storage.exceptions.StoreException: Can't fetch latest offset of Kafka topic _schemas after 500 ms
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:148)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.init(KafkaSchemaRegistry.java:126)
        ... 35 more
Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreException: Can't fetch latest offset of Kafka topic _schemas after 500 ms
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.getLatestOffsetOfKafkaTopic(KafkaStore.java:347)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.waitUntilBootstrapCompletes(KafkaStore.java:217)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.init(KafkaStore.java:146)
        ... 36 more

I was going to just submit a patch to increase it, but it looks like this timeout is used in quite a few places. I suspect the issue here is that the tests rely on topic auto creation and in the VM that is taking longer for some reason.

I think the right solution is probably to add a second timeout option which is used in certain situations, like during KafkaStore.init(), which should be more lax since we sometimes expect it to take longer. Thoughts from people who are more familiar with this code?
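
One way to express the proposed second timeout, sketched with Kafka's ConfigDef API (the config name and default below are illustrative, not the shipped configuration):

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class TimeoutConfigSketch {
    public static final ConfigDef CONFIG = new ConfigDef()
        // Existing tight timeout for regular store operations.
        .define("kafkastore.timeout.ms", Type.INT, 500,
                Importance.MEDIUM, "Timeout for regular Kafka store operations.")
        // Hypothetical laxer timeout used only during KafkaStore.init(),
        // where topic auto-creation can legitimately take much longer.
        .define("kafkastore.init.timeout.ms", Type.INT, 60000,
                Importance.MEDIUM, "Timeout for bootstrapping the Kafka store.");
}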

fix /topics

Currently, /topics returns a list of the form
["topic/subtype", ...]

instead of listing all Kafka topics.

Use a key serializer in KafkaSchemaRegistry

Methods such as register, get, getAll, getAllVersions manually serialize keys with logic like:
String keyEarliestVersion = topic + "," + earliestVersion;
String keyLatestVersion = topic + "," + latestVersion;

This should probably be done with a key serializer.
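
An illustrative sketch of what a typed key with a single serialization point might look like (the names are hypothetical):

import java.nio.charset.StandardCharsets;

public class SchemaKey {
    public final String subject;
    public final int version;

    public SchemaKey(String subject, int version) {
        this.subject = subject;
        this.version = version;
    }

    // One place that knows the wire format, replacing the scattered
    // topic + "," + version concatenations.
    public byte[] serialize() {
        return (subject + "," + version).getBytes(StandardCharsets.UTF_8);
    }
}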

Get canonical avro string from an Avro schema

The canonical string form of an Avro schema strips off docs and aliases. schema.toString() doesn't seem to be canonical (e.g., reordering fields gives different strings). We may need to implement a full canonical string representation of an Avro schema ourselves.
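
One option worth evaluating is Avro's built-in SchemaNormalization, whose Parsing Canonical Form strips doc attributes and aliases and fixes the order of JSON attributes; whether it satisfies all of the registry's requirements is an open question. A small example:

import org.apache.avro.Schema;
import org.apache.avro.SchemaNormalization;

public class CanonicalFormExample {
    public static void main(String[] args) {
        String def = "{\"type\":\"record\",\"name\":\"myrecord\","
            + "\"doc\":\"dropped in canonical form\","
            + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(def);
        // Prints the schema in Parsing Canonical Form, without the doc.
        System.out.println(SchemaNormalization.toParsingForm(schema));
    }
}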

schema request result format

When fetching a schema by id, it doesn't make sense to return 'version'.

Also note that the 'name' returned when fetching by id will be the name of one of the subjects used when registering the schema.

Fetching a schema by subject/version gives you data in this format:

e.g. GET /subjects/geoff/versions/1
{"id": 0,
 "name": "geoff",
 "schema": "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f11\",\"type\":\"string\"}]}",
 "version": 1}

Fetching a schema by id gives the same format:

e.g. GET /schemas/0
{"id": 0,
 "name": "geoff",
 "schema": "{\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f11\",\"type\":\"string\"}]}",
 "version": 1}

Byte, Character, and Short not handled by Avro serializer

These primitive Java types aren't handled by AbstractKafkaAvroSerializer.getSchema() because they don't have direct equivalents in Avro. I think there are two possible solutions:

  1. Don't support them, but document it very clearly in the user documentation. In particular, these may warrant specialized exception messages since it would probably be easy to pass them in by accident, especially if you're using Producer<Object,Object>.
  2. Support them via conversion to Avro types. Maybe Byte -> Bytes, Character -> String and Short -> Integer. These would require additional logic in serializeImpl and clear documentation about the conversions. Ideally in that case the conversion back could be done on the other side, although I'm now realizing the deserializer interface probably can't provide enough type information to accomplish that.

At first I was thinking option 2 would be better, but I think it may end up being too messy and isn't symmetrical on producer vs. consumer, so I think option 1 may be better.
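
For reference, option 2 would amount to a mapping along these lines (illustrative only, not part of the real serializer):

public class PrimitiveWidening {
    // Widen Java primitives that have no direct Avro equivalent.
    public static Object widenForAvro(Object value) {
        if (value instanceof Byte) {
            return new byte[] { (Byte) value };   // Byte      -> Avro bytes
        } else if (value instanceof Character) {
            return value.toString();              // Character -> Avro string
        } else if (value instanceof Short) {
            return ((Short) value).intValue();    // Short     -> Avro int
        }
        return value; // everything else already maps directly
    }
}

Note that the deserializer could not recover the original Java type from the widened form, which is the asymmetry mentioned above.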

Add proper log4j logging

The schema registry prototype currently doesn't have proper log4j logging. This issue covers fixing that.

Embedded storage engine

The current schema registry prototype has a limited-capability RocksDB engine that hasn't been unit tested. Work stopped at a point where I ran into core dump issues, possibly due to gcc issues on my Mac, so this needs testing on Linux. This issue covers getting the RocksDB storage engine to work with unit tests.

ClientErrorException in *Resource classes

Several *Resource.java files throw exceptions of the form:
throw new ClientErrorException(Response.Status.INTERNAL_SERVER_ERROR, e);

ClientErrorException performs additional validation to make sure the response status is in the 4xx family. In our case this validation fails and causes a second exception to be thrown, because we're instantiating a ClientErrorException with a server error response code.
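
One possible fix, assuming JAX-RS 2.0 is available: use the dedicated server-error exception types, which carry a 5xx status without tripping that validation:

import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.ServerErrorException;
import javax.ws.rs.core.Response;

public class ErrorHandlingSketch {
    static void rethrow(Exception e) {
        // Either of these is valid for a 500, unlike ClientErrorException:
        // throw new ServerErrorException(Response.Status.INTERNAL_SERVER_ERROR, e);
        throw new InternalServerErrorException(e);
    }
}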

Avro compatibility

The schema registry prototype currently accepts arbitrary string schemas and stores them. This issue covers accepting only Avro JSON schemas and registering a schema only if it is backward compatible with previously registered versions. Custom compatibility is tracked in a separate issue.

id - change long to int

Using int for id gives enough possible schemas without the extra 4 bytes of overhead incurred by using long.

Make keys comparable

Keys in the Store interface are implicitly comparable (the getAll method promises to return keys within a specific range), but there is currently no mechanism for ordering keys.
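
A minimal sketch of making the ordering explicit in the key type, so that getAll's range contract has a defined meaning:

public interface StoreKey extends Comparable<StoreKey> {
    // Keys must serialize for storage and sort consistently for range scans.
    byte[] serialize();
}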

Add key/value schema awareness

Every topic can have a key schema and a value schema registered under it. Currently the schema registry prototype doesn't differentiate between key and value schemas; for example, POST /topics/(topic)/value/versions and POST /topics/(topic)/key/versions both do the same thing.
