
Kafka REST Proxy

The Kafka REST Proxy provides a RESTful interface to a Kafka cluster. It makes it easy to produce and consume data, view the state of the cluster, and perform administrative actions without using the native Kafka protocol or clients. Examples of use cases include reporting data to Kafka from any front-end app built in any language, ingesting data into a stream processing framework that doesn't yet support Kafka, and scripting administrative actions.

Installation

You can download prebuilt versions of the Kafka REST Proxy as part of the Confluent Platform.

You can read our full installation instructions and the complete documentation.

To install from source, follow the instructions in the Development section below.

Deployment

The Kafka REST Proxy includes a built-in Jetty server and can be deployed after being configured to connect to an existing Kafka cluster.

Running mvn clean package builds all three assembly targets (a short build-and-run sketch follows the list):

  • The development target assembles all necessary dependencies in a kafka-rest/target subfolder without packaging them in a distributable format. The wrapper scripts bin/kafka-rest-start and bin/kafka-rest-stop can then be used to start and stop the service.
  • The package target is meant to be used in shared dependency environments and omits some dependencies expected to be provided externally. It assembles the other dependencies in a kafka-rest/target subfolder as well as in distributable archives. The wrapper scripts bin/kafka-rest-start and bin/kafka-rest-stop can then be used to start and stop the service.
  • The standalone target packages all necessary dependencies as a distributable JAR that can be run as standard (java -jar $base-dir/kafka-rest/target/kafka-rest-X.Y.Z-standalone.jar).
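
For example, a minimal build-and-run sequence using the development target might look like the following; the properties file path is illustrative and depends on your setup.

$ mvn clean package
$ ./bin/kafka-rest-start config/kafka-rest.properties
# ... exercise the proxy, then stop it:
$ ./bin/kafka-rest-stop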

Quickstart (v3 API)

The following assumes you have Kafka and an instance of the REST Proxy running using the default settings and some topics already created.

The v3 API is the latest version of the API. The cluster ID is a path parameter to enable a REST Proxy to work with multiple Kafka clusters. API responses often contain links to related resources, such as the list of a topic's partitions. The content type is always application/json.

Get the local cluster information

$ curl http://localhost:8082/v3/clusters

Response:
  {"kind":"KafkaClusterList",
   "metadata":{"self":"http://localhost:8082/v3/clusters","next":null},
   "data":[
    {"kind":"KafkaCluster",
     "metadata":{"self":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q",
     "resource_name":"crn:///kafka=xFhUvurESIeeCI87SXWR-Q"},
     "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
     "controller":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/brokers/0"},
     "acls":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/acls"},
     "brokers":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/brokers"},
     "broker_configs":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/broker-configs"},
     "consumer_groups":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/consumer-groups"},
     "topics":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics"},
     "partition_reassignments":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/-/partitions/-/reassignment"}
    }
   ]
  }

The cluster ID in the output is xFhUvurESIeeCI87SXWR-Q.
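
If you have jq installed, you can capture the cluster ID in a shell variable and reuse it in the commands below. This is an optional convenience, not part of the API:

$ CLUSTER_ID=$(curl -s http://localhost:8082/v3/clusters | jq -r '.data[0].cluster_id')
$ echo $CLUSTER_ID
xFhUvurESIeeCI87SXWR-Q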

Get a list of topics

$ curl http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics

Response:
  {"kind":"KafkaTopicList",
   "metadata":{"self":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics","next":null},
   "data":[
    {"kind":"KafkaTopic",
     "metadata":{"self":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest",
     "resource_name":"crn:///kafka=xFhUvurESIeeCI87SXWR-Q/topic=jsontest"},
     "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
     "topic_name":"jsontest",
     "is_internal":false,
     "replication_factor":1,
     "partitions_count":1,
     "partitions":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/partitions"},
     "configs":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/configs"},
     "partition_reassignments":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/partitions/-/reassignment"}
    }
   ]
  }

Create a topic

$ curl -X POST -H "Content-Type:application/json" -d '{"topic_name":"jsontest"}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics

Response:
  {"kind":"KafkaTopic",
   "metadata":{"self":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest",
   "resource_name":"crn:///kafka=xFhUvurESIeeCI87SXWR-Q/topic=jsontest"},
   "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
   "topic_name":"jsontest",
   "is_internal":false,
   "replication_factor":1,
   "partitions_count":1,
   "partitions":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/partitions"},
   "configs":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/configs"},
   "partition_reassignments":{"related":"http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/partitions/-/reassignment"}
  }
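
The create request can also set the partition count and replication factor explicitly. A hedged sketch (the topic name is illustrative, and the partitions_count and replication_factor fields are assumed to be supported by your REST Proxy version):

$ curl -X POST -H "Content-Type:application/json" \
       -d '{"topic_name":"jsontest2","partitions_count":3,"replication_factor":1}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics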

Produce records with JSON data

$ curl -X POST -H "Content-Type: application/json" \
       -d '{"value":{"type":"JSON","data":{"name":"testUser"}}}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/records

Response:
  {"error_code":200,
   "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
   "topic_name":"jsontest",
   "partition_id":0,
   "offset":0,
   "timestamp":"2023-03-09T14:07:23.592Z",
   "value":{"type":"JSON","size":19}
  }

In the response, the error_code of 200 is an HTTP status code (OK) which indicates the operation was successful. Because you can use this API to stream multiple records into a topic as part of the same request, each record produced has its own error code. To send multiple records, simply concatenate the records like this:

$ curl -X POST -H "Content-Type: application/json" \
       -d '{"value":{"type":"JSON","data":"ONE"}} {"value":{"type":"JSON","data":"TWO"}}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/records

Response:
  {"error_code":200,
   "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
   "topic_name":"jsontest",
   "partition_id":0,
   "offset":1,
   "timestamp":"2023-03-09T14:07:23.592Z",
   "value":{"type":"JSON","size":5}
  }
  {"error_code":200,
   "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
   "topic_name":"jsontest",
   "partition_id":0,
   "offset":2,
   "timestamp":"2023-03-09T14:07:23.592Z",
   "value":{"type":"JSON","size":5}
  }
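
Records can also carry a key alongside the value. A minimal sketch, assuming the same request format; the key type and data here are illustrative:

$ curl -X POST -H "Content-Type: application/json" \
       -d '{"key":{"type":"STRING","data":"user-1"},"value":{"type":"JSON","data":{"name":"testUser"}}}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/records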

Produce records with string data

$ curl -X POST -H "Content-Type: application/json" \
       -d '{"value":{"type":"STRING","data":"REST"}}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/records

Response:
  {"error_code":200,
   "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
   "topic_name":"jsontest",
   "partition_id":0,
   "offset":2,
   "timestamp":"2023-03-09T14:07:23.592Z",
   "value":{"type":"STRING","size":4}
  }

The data is treated as a string in UTF-8 encoding and follows JSON rules for escaping special characters.
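
For example, a string value containing quotes or newlines must be escaped as in any JSON document. A minimal sketch:

$ curl -X POST -H "Content-Type: application/json" \
       -d '{"value":{"type":"STRING","data":"He said \"REST\"\n"}}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/records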

Produce records in a batch

As an alternative to streaming mode, you can produce multiple records in a single batch. This is not streaming, but it is easier to use with HTTP libraries that expect straightforward request-response behavior.

Each entry in the batch has a unique identifier (a string of up to 80 characters) which can be used to correlate the responses. The identifiers of the entries in a batch must be unique.

$ curl -X POST -H "Content-Type: application/json" \
       -d '{"entries":[{"id":"first","value":{"type":"JSON","data":"ONE"}}, {"id":"second","value":{"type":"JSON","data":"TWO"}}]}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/records:batch

Response:
  {"successes":[
    {"id":"first",
     "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
     "topic_name":"jsontest",
     "partition_id":0,
     "offset":3,
     "timestamp":"2023-03-09T14:07:23.592Z",
     "value":{"type":"JSON","size":5}
    },
    {"id":"second",
     "cluster_id":"xFhUvurESIeeCI87SXWR-Q",
     "topic_name":"jsontest",
     "partition_id":0,
     "offset":4,
     "timestamp":"2023-03-09T14:07:23.592Z",
     "value":{"type":"JSON","size":5}
    }
   ],
   "failures":[]
  }

Successes and failures are returned in the response in separate arrays like this:

{
  "successes": [
    {
      "id": "1",
      "cluster_id": "xFhUvurESIeeCI87SXWR-Q",
      "topic_name": "jsontest",
      "partition_id": 0,
      "offset": 5,
      "timestamp": "2023-03-09T14:07:23.592Z",
      "value": {
        "type": "STRING",
        "size": 7
      }
    }
  ],
  "failures": [
    {
      "id": "2",
      "error_code": 400,
      "message": "Bad Request: data=\"Message$\" is not a valid base64 string."
    }
  ]
}
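
A request along the following lines could yield such a mixed response: the first entry is a valid string, while the second uses the BINARY type with data that is not valid base64 (both entries are illustrative):

$ curl -X POST -H "Content-Type: application/json" \
       -d '{"entries":[{"id":"1","value":{"type":"STRING","data":"Message"}},{"id":"2","value":{"type":"BINARY","data":"Message$"}}]}' \
       http://localhost:8082/v3/clusters/xFhUvurESIeeCI87SXWR-Q/topics/jsontest/records:batch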

Quickstart (v2 API)

The earlier v2 API is a bit more concise.

Get a list of topics

$ curl http://localhost:8082/topics
  
Response:
  ["__consumer_offsets","jsontest"]

Get info about one topic

$ curl http://localhost:8082/topics/jsontest

Response:
  {"name":"jsontest",
   "configs":{},
   "partitions":[
    {"partition":0,
     "leader":0,
     "replicas":[
      {"broker":0,
       "leader":true,
       "in_sync":true
      }
     ]
    }
   ]
  }

Produce records with JSON data

$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
       -d '{"records":[{"value":{"name": "testUser"}}]}' \
       http://localhost:8082/topics/jsontest

Response:
  {"offsets":[
    {"partition":0,
     "offset":0,
     "error_code":null,
     "error":null
    }
   ],
   "key_schema_id":null,
   "value_schema_id":null
  }
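
The v2 API can also carry binary data encoded as base64. A hedged sketch, where "S2Fma2E=" is the base64 encoding of "Kafka":

$ curl -X POST -H "Content-Type: application/vnd.kafka.binary.v2+json" \
       -d '{"records":[{"value":"S2Fma2E="}]}' \
       http://localhost:8082/topics/jsontest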

Consume JSON data

First, create a consumer for JSON data, starting at the beginning of the topic. The consumer group is called my_json_consumer and the instance is my_consumer_instance.

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
       -d '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
       http://localhost:8082/consumers/my_json_consumer

Response:
  {"instance_id":"my_consumer_instance",
   "base_uri":"http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance"
  }

Subscribe the consumer to a topic.

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
       -d '{"topics":["jsontest"]}' \
      http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

Response:
  # No content in response

Then consume some data from a topic using the base URL in the first response.

$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
       http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance/records

Response:
  [
   {"key":null,
    "value":{"name":"testUser"},
    "partition":0,
    "offset":0,
    "topic":"jsontest"
   }
  ]

Finally, close the consumer with a DELETE to make it leave the group and clean up its resources.

$ curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \
       http://localhost:8082/consumers/my_json_consumer/instances/my_consumer_instance

Response:
  # No content in response

Development

To build a development version, you may need development versions of common, rest-utils, and schema-registry. After installing these, you can build the Kafka REST Proxy with Maven. All the standard lifecycle phases work.

You can avoid building development versions of dependencies by building from the latest (or an earlier) release tag, or the corresponding <release>-post branch, which references pre-built dependencies from the public repository. For example, the 7.3.0-post branch can be used as a base for patches against that version.
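
A minimal sketch of building against pre-built dependency versions, assuming the 7.3.0-post branch mentioned above:

$ git clone https://github.com/confluentinc/kafka-rest.git
$ cd kafka-rest
$ git checkout 7.3.0-post
$ mvn clean package -DskipTests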

Contribute

License

This project is licensed under the Confluent Community License.

kafka-rest's Issues

Fix importance levels for configs

Some of the importance levels for the configs aren't set right. For example, the mediatype is set to high, but should really be low since users generally shouldn't set this value -- it was set to high in rest-utils because any rest-utils application should override it with a value specific for that application. The rest of the settings could also use review (and some in rest-utils may need updates as well).

Support Avro schemas as JSON strings or directly embedded JSON

Depending on the use case, both can be useful -- strings if that's how the schema is stored in your app, directly embedded JSON if that's a convenient way to store your schema (e.g. in languages like Python) or for more ad hoc usage.

We should be able to do this by using JsonNode. It will either be a TextNode, whose value we can use directly, or an ObjectNode that we can reserialize before passing it to Avro for parsing. This is a bit wasteful, but it should be a relatively rare case anyway since most of the time clients should be using schema IDs.

Rename ProduceRecord

The current naming is confusing since Kafka uses the name ProducerRecord. See #16 for the original discussion.

Possible alternatives: RestProduceRecord, ProduceItem, ProduceMessage

Add basic metrics

We should be reporting basic metrics about the API server via JMX -- things like request throughput, latency, etc.

Override default port

We're currently using the default port from rest-utils, which results in conflicts with other rest-utils projects, e.g. confluentinc/schema-registry#103. We should either override to something different and/or change the rest-utils default. Changing rest-utils might actually be the simpler solution since it means we don't also have to make changes in any repositories that already rely on the 8080 address.
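
As a workaround on a given deployment, the port can be overridden explicitly in the REST Proxy properties file. A hedged sketch, assuming /etc/kafka-rest/kafka-rest.properties is the config file in use and that your version supports the listeners setting:

$ echo 'listeners=http://0.0.0.0:8082' >> /etc/kafka-rest/kafka-rest.properties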

Get java.lang.NoSuchMethodError: scala.collection.JavaConversions.asScalaIterable when calling the /topics/:topic endpoint

Whenever I call the topics/:topic endpoint I get the following error:

java.lang.NoSuchMethodError: scala.collection.JavaConversions.asScalaIterable(Ljava/util/Collection;)Lscala/collection/Iterable;
    at io.confluent.kafkarest.MetadataObserver.getTopic(MetadataObserver.java:91)
    at io.confluent.kafkarest.resources.TopicsResource.getTopic(TopicsResource.java:70)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483) 
       .....(stack trace continues)

From some preliminary research, it appears this could be due to a mismatch between the compile-time and runtime versions of Scala.

http://stackoverflow.com/questions/4393946/nosuchmethoderror-when-attempting-to-implicitly-convert-a-java-to-scala-collecti

I'm running on CentOS. Here's the output of java -version:

java version "1.8.0_05"
Java(TM) SE Runtime Environment (build 1.8.0_05-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.5-b02, mixed mode)

I installed the package confluent-platform-2.11.5

Empty message array when creating consumer followed by immediate read

I'm working on a simple web UI to read messages from a topic so my colleagues don't need to issue curl requests by hand. While futzing around with the code, I noticed that if I create a consumer on every page visit (in the same consumer group), I hit an upper bound where I can continue creating consumers and reading from the topic but cannot get any messages from it. Simply waiting a while seems to clear this condition.

Consumers are created using this data: {"format": "binary", "auto.offset.reset": "smallest"}

The process is something like this on page visit:

  1. Create consumer in mygroup
  2. Read from mytopic
  3. Messages retrieved
  4. Refresh page in an attempt to get the new messages (Repeats 1 & 2)
  5. Empty array returned even though I'm using a new consumer

Am I missing some finer point of consumers/consumer groups?
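
For reference, a repro sketch of the steps above using the consumer API of that era; the group, topic, and instance names are illustrative, and the instance URL comes from the creation response:

$ curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
       -d '{"format": "binary", "auto.offset.reset": "smallest"}' \
       http://localhost:8082/consumers/mygroup
$ curl -X GET -H "Accept: application/vnd.kafka.binary.v1+json" \
       http://localhost:8082/consumers/mygroup/instances/<instance_id>/topics/mytopic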

Reduce time required to run tests

The tests take a long time to run. Some of this overhead is unavoidable with the integration tests. However, there's probably some low-hanging fruit in ClusterTestHarness that might make a big difference:

  • Default number of brokers is 3. This is probably overkill for almost every test.
  • Might be able to pull up/tear down brokers in parallel, although it would be worth checking how long they take to start up/tear down before trying to make the parallel version work.
  • Schema registry only needs to be started if we're going to use Avro. Only a couple of the tests need Avro support.

Support setting any consumer configs when creating a new consumer

Currently auto.offset.reset and auto.commit.enable are handled manually, but we could handle these in a generic way. These also need to work well with configs set on the REST proxy, i.e. we need to support the following precedence: settings in the create-consumer request > default consumer configs overridden by the REST proxy operator > default consumer configs from Kafka.

One argument against doing this is that consumer config properties are likely to change with the new consumer implementation, which may make it harder for users to know what properties they should set. Carefully exposing a more limited subset a) keeps the API simpler and b) makes maintaining compatibility easier for both us and users.

Support for JSON messages

I was wondering if there was any possibility of adding support for messages in formats beyond just Avro/Base64. For my use case (and likely many others), only strings are passed back and forth, so encoding to Base64 seems like overkill. Having to encode in Base64 isn't a huge pain point, but just seems unnecessary. Apologies if this is already supported and I just didn't find it in the documentation since I just started working with this today! Looks like it will be quite useful regardless. :)

Allow per-request override of consumer.request.timeout.ms

If a particular client needs lower latency, it would be nice if it could override the consumer.request.timeout.ms setting on a per-request basis so batching can still be encouraged with a higher consumer.request.timeout.ms setting.

This will also require an update to the operations documentation to note that it can be overridden as well as an additional query parameter in the API docs.

Add advertised host name support

Consumers return absolute URIs, so we should have a config to control the host name used in this URI since it can't always be determined correctly automatically.

To stay consistent with schema-registry, the option should be "host.name" rather than "advertised.host.name" as used by Kafka.

Add support for compression, other global producer configs

Settings like compression can probably be configured globally just by exposing the producer config option as one of the REST proxy's config options and passing it along to both the binary and avro producers. The implementation should be trivial; the primary question is how we want to expose these options, e.g. by prefixing all the producer options with "producer." and forwarding everything, by allowing options to pass-through if they aren't processed by the KafkaRestConfig, or maybe some other approach.
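
For illustration only, the prefixing approach discussed above might look like this in the properties file. This is a design sketch from the issue, not a setting guaranteed to exist in your version:

$ echo 'producer.compression.type=gzip' >> /etc/kafka-rest/kafka-rest.properties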

Handle exceptions and convert to RestExceptions

We need to handle exceptions and properly convert them to RestExceptions. This includes

  • Async produce callbacks should check Exception types and convert them. New producer exceptions were modeled well so this should be straightforward
  • Consumer exceptions, which need to trigger errors in consume task to trigger the response.
  • All the metadata APIs, which probably mostly means catching ZkClient exceptions

Add support for SimpleConsumer-like access

Sometimes apps might not want to be part of a consumer group, or might have trivial consumption patterns such as consuming from a single partition. We can probably expose this pretty cleanly with URLs like GET /topics/foo/partitions/0/messages?offset=100[&count=100], although specifying per-partition offsets at the topic level may get messy.

Fix quickstart

Reviewing the overall product docs, I think the individual product docs need to flow naturally from the global docs. For example, after going through the Confluent quick start [http://y2zjnzc0owi5nmy2m2jkmzfjm2m0mmi1yzq3mwjmnzu2ode0.s3-website-us-west-2.amazonaws.com/docs/quickstart.html], which assumes you've started the service, I expect to just see the steps to try out some features/APIs.

Specifically, let's separate the installation from the quick start. The quick start should only cover the APIs, not starting the service. The installation section should cover the package install and service start, including starting the dependent services.

Failing silently inside docker container

I've been trying to run the REST proxy inside a Docker container using the confluent/platform image. I'm using Docker Compose to expose the ZooKeeper and Schema Registry services running in other containers. I can start the REST service and it runs for a brief moment before failing. Here's the output to the console:

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/share/java/kafka/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/java/confluent-common/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/share/java/kafka-rest/slf4j-log4j12-1.7.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]

I'm running the service with /etc/kafka-rest# kafka-rest-start /etc/kafka-rest/kafka-rest.properties

My properties file looks like this

schema.registry.url=http://schemaRegistry:8081
zookeeper.connect=zookeeper:2181
id=1

I've tested those url's from inside of the container and they are correct. Note I am using the confluentinc images for zookeeper and the schema-registry.

Debian packages are missing files in /etc/kafka-rest

kafka-rest.properties and log4j.properties should be included. They made it into the RPM packages (which are generated from the Maven package step where they are getting into the right place in the layout). So this looks like something got lost when merging the debian branch such that the file is no longer copied, but somehow the directory is still created.

Improve handling of 404s for topics

Currently all endpoints that interact with a topic explicitly check for the topic's existence. However, for some endpoints, e.g. producing messages, this doesn't produce the expected behavior with auto.create.topics.enable=true. In some cases it probably makes sense to check directly and avoid creation (e.g. GET /topic/foo), whereas others should handle any errors that are generated dynamically (e.g. POST /topic/foo with auto.create.topics.enable=false).

Allow the consumer to return Avro JSON according to a specific schema

Currently, the consumer outputs version-specific JSON for Avro data. It would be useful to have the output bound to a particular version of the Avro schema so that all records look the same (assuming all versions are compatible with the specified schema). We would have to specify the schema at the topic level.

Add basic event logging

The server should, at a minimum, include basic event logging. Especially important are unhandled exceptions (GenericExceptionMapper) and we should probably also be doing simple per-request logging.

Invalid produce request missing envelope generates 422 with no error message

See https://groups.google.com/d/msg/confluent-platform/wQzIMt_xoiI/8QeMjAzGlzkJ

If you send a produce request using Avro where the body looks like:

{ "value_schema_id": 123, "records": [{"my_field": 12}]}

but the format should be

{ "value_schema_id": 123, "records": [{"value": {"my_field": 12}}]}

since an envelope is required to avoid any ambiguity with cases where you include a key or partition, you end up getting a 422 (correctly), but it does not include a useful error message.

This 422 should be generated when Jackson tries to decode the request body, so I'm not sure exactly how we could improve this since we normally expect Jackson to provide some sort of error message with the ConstraintViolationException. We need to first track down exactly how it is failing since it could still be under our control, e.g. via JacksonMessageBodyProvider in rest-utils.
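
For completeness, the corrected request as a full command might look like this (the topic name and schema ID are illustrative):

$ curl -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" \
       -d '{"value_schema_id": 123, "records": [{"value": {"my_field": 12}}]}' \
       http://localhost:8082/topics/avrotest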

Avro producer should reuse Avro Schema Parser

We create a new schema parser for every request (actually 2, one for key and one for value). This isn't expensive, but if we could reuse parsers then we would be able to take advantage of the internal caching Avro does, which is probably good both for performance and to avoid generating even more garbage for each request.

However, if we do this it needs to be done carefully. The cache doesn't allow different schemas with the same name and throws an exception if it sees that. However, that is valid for us since we may encounter different versions of the same schema, or even just have the same name used for schemas on different topics with completely different formats. We'd need to do something like catch the exception and then cycle the parser, but even in that case, if there are periods where 2 versions consistently appear, we'd probably end up falling back to performance like we get now since every request could use a version different from the one cached on the last request.

Some requests can block forever waiting for ZooKeeper

This is basically the same issue as https://issues.apache.org/jira/browse/KAFKA-1907 -- the ZkClient library doesn't have timeouts which means a lot of operations just block forever if they can't connect to ZooKeeper. For kafka-rest, this affects all the metadata APIs, e.g. listing and getting individual brokers, topics, partitions.

A fix for KAFKA-1907 will probably address the problem. It probably doesn't make sense to try to workaround this in kafka-rest since it should be rare, it's expected that ZK will eventually come back and requests can complete, and doing so would probably just duplicate the wrapper or patches to ZkClient that are going to provide support for timeouts. So mostly this is here to serve as a reminder and to track any changes that might be necessary if we need to change how we interact with ZkClient/ZkUtils.

Consumer read request hangs if you try to create two consumer instances with the same ID

If you:

  1. Create consumer with an ID
  2. Read from a topic
  3. Create consumer with same ID (don't delete previous one)
  4. Read from topic

The second create call succeeds and the read hangs. The stack trace shows it stuck trying to create a ZooKeeper node in a method that can also handle conflicts, which we'd expect since the IDs are the same.

Turns out that we ended up conflating the meaning of "ID": it is used both to name the consumer nicely so you have readable consumer URLs, and it got passed in as the ID for the underlying old Kafka consumer. The latter should really only happen for debugging purposes, since a random UUID (generated automatically for the consumer if the ID is omitted) is far more reliable.

Things get a bit more confusing when we consider moving to the new consumer, which has a separate client ID that it uses to make logging on the brokers clearer and traceable to specific applications.

A fix for this has to be careful about compatibility. I think the right thing to do is add a new parameter ("name") which will be used in the URL. If the "id" is specified and "name" isn't, "name" will default to "id". The underlying Kafka "consumer.id" will only be set if "id" was explicitly set. Then, the name is only relevant to the single instance of the REST proxy since consumer URLs are specific to a single process. We can return a 409 if we see a naming conflict during creation, which will prevent the subsequent hang during read. Finally, we'll want to deprecate and deemphasize the "id" field in the docs since "name" will now be the field they really should be using.

Unable to deserialise avro on kafka REST

Hi,

I am not sure whether there is a bug or some misconfiguration; looking through the code briefly, I did not find the problem.

I am creating a consumer as in the Quickstart example:

curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"id": "my_instance", "format": "avro"}' http://kafka-0.internal:8082/consumers/my_avro_consumer

Then I am trying to consume some messages:

curl -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://kafka-0.internal:8082/consumers/my_avro_consumer/instances/my_instance/topics/valid-events

But I get the error:

{"error_code":50002,"message":"Kafka error: Error deserializing Avro message for id -1"}

The console consumer can deserialise events from the same topic just fine and I know I used to get this error when the avro messages were not properly formatted (with the magic byte and the schema id from the registry), but after fixing those and getting them to work with the avro console consumer, I am still unable to use the REST consumer.

Can anyone give me any insights as to why this may be?

TIA

Add regex consumer subscriptions

Besides supporting multiple topics as specified in #35, we should also support regex subscriptions. This would add another field to the consumer creation request where the regex could be specified. Subsequent reads would be from /consumers/{group}/instances/{instance}/topics/ and would have a slightly different response format since they would include the topic for each message as well as the existing key, value, partition, offset info.

This could actually cover the use case of #35 since you can just subscribe to n specific topics via the regex. However, we probably want to implement them separately since doing multi-topic subscriptions via regex isn't necessarily intuitive. Even if we implement the subscriptions differently, it would be good to support reading from /consumers/{group}/instances/{instance}/topics/ regardless of the number or type of subscriptions.

Long polling or web sockets

Am I correct to understand that I can't really use this proxy to actually subscribe to a topic and get messages as they arrive? That's pretty sad.

I think it would be pretty great if you could use web sockets or long polling to provide the ability to consume a stream as messages come in.

Add TLS support

I know we can run kafka-rest behind a secure proxy / load balancer, but in many organizations TLS is also required between the secure proxy / load balancer and the actual endpoint. Also, in some simple deployments there may not be a front-end gateway / load balancer.

I believe this would need to be implemented in rest-utils, but it would need to be exposed here.

Multi-topic produce requests

A single request should be able to produce to multiple topics, just like a single request can post to a topic and publish to specific partitions. Implementing this should be straightforward, but we need to figure out which resource the request should be POSTed to.

review comments on the doc

The following are my review comments on the doc.

1. curl -X POST -H "Content-Type: application/vnd.kafka.v1+json" \
      --data '{"records":[{"value":"S2Fma2E="}]}' "http://localhost:8080/topics/test"

It returns the null schema id. Shouldn't that only happen to Avro data?
{"offsets":[{"partition":0,"offset":0}],"key_schema_id":null,"value_schema_id":null}

2. Installation: should we change the following commands to use bin/kafka-rest-start?

$ java io.confluent.kafkarest.Main [server.properties]
mvn exec:java

3. Under Errors, an request entity => a request entity

4. GET /topics/{topic_name}/partitions
The example uses 
GET /topics/test/partitions/1

We should remove /1.

Also, the topic test was created with only 1 partition earlier, but the example shows 2 partitions. Note also that partitions start at 0, not 1.

5. To send Avro data, we need to mention that the Schema Registry server must be started first.

6. Example Avro response. The following response is missing the key_schema_id and value_schema_id.
{
  "value_schema_id": 32,
  "offsets": [
    {
      "partition": 1,
      "offset": 100,
    },
    {
      "partition": 1,
      "offset": 101,
    }
  ]
}

7. The following should probably be smallest/largest, instead of true.
"auto.offset.reset": "true"

8. The following response format is incorrect for 
GET /consumers/testgroup/instances/my_consumer/topics/test_topic. 

[
  {
    "topic": "test",
    "partition": 1,
    "consumed": 100,
    "committed": 100
  },
  {
    "topic": "test",
    "partition": 2,
    "consumed": 200,
    "committed": 200
  },
  {
    "topic": "test2",
    "partition": 1,
    "consumed": 50,
    "committed": 50
  }
]

It should be something like [{"key":"AAAAAAE=","value":"AAAAAAAY","partition":0,"offset":0},{"key":"AAAAAAE=","value":"AAAAAAAa","partition":0,"offset":1}].

9. We need to make it clear that the same consumer instance can only consume a single topic (i.e., it can't switch to a different topic later).

10. Producer: should we allow auto topic creation on the producer side?

11. Need to change
        "value_schema": "{\"name\":\"int\",\"type\": \"int\"}";",
to
        "value_schema": "{\"name\":\"int\",\"type\": \"int\"}",

Catch MessageStreamsExistException from ConsumerConnector.createMessageStreams

When ConsumersResource.readTopic is invoked, it calls createMessageStreams if there isn't already a stream. However, if the consumer has already invoked the method for a different topic, it will throw a MessageStreamsExistException. Currently this isn't being caught, so it'll be turned into a 500. We should catch it and turn it into a more useful error code.

This will also require updating the docs since it'll introduce a new error code.

Option for Dynamic Incrementing ID for Rest Proxy

Per the documentation:

id
Unique ID for this REST server instance. This is used in generating unique IDs for consumers that do not specify their ID. The ID is empty by default, which makes a single server setup easier to get up and running, but is not safe for multi-server deployments where automatic consumer IDs are used

The thought behind dynamic or incrementing IDs (perhaps tracked in ZooKeeper) is that with Mesos, I can run the REST Proxy in Marathon. Since I am using one common config, each instance would need to use a "new id" (essentially a new config file for each instance). If we instead had an option to use prefix(incrementingid), we could just scale up and down with a single config on Mesos. Scaling would be automatic rather than manual, and it would make each instance of the rest-proxy truly independent (which seems like a good goal). I know Mesos support isn't really a huge push, nor is "making it easier for Mesos", but I think making this fire-and-forget on scaling could help folks even without Mesos.

Support multi-topic consumer subscriptions

Currently you can only subscribe to one topic because the old consumer only supports making one call to ConsumerConnector.createMessageStreams() and the REST proxy only subscribes to topics and calls that method when GET /consumer/{group}/instances/{instance}/topics/{topic} is called. It should also support specifying a set of topics up front in the initial consumer creation call to support multi-topic consumers. This should be optional and the old behavior should still work -- not specifying topics or even providing a null request entity body should still allow implicit subscriptions when the first topic is read.
