kafka-websocket

kafka-websocket is a simple websocket server interface to the kafka distributed message broker. It supports clients subscribing to topics, including multiple topics at once, and sending messages to topics. Messages may be either text or binary; the format of each is described below.

A client may produce and consume messages on the same connection.

Consuming from topics

Clients subscribe to topics by specifying them in a query parameter when connecting to kafka-websocket:

/v2/broker/?topics=my_topic,my_other_topic

If no topics are given, the client will not receive messages. The format of messages sent to clients is determined by the subprotocol negotiated: kafka-text or kafka-binary. If no subprotocol is specified, kafka-text is used.

By default, a new, unique group.id is generated per session. The group.id for a consumer can be controlled by passing it as an additional query parameter, e.g. /v2/broker/?topics=my_topic&group.id=my_group_id
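A minimal Java sketch of assembling the consumer path from a topic list and an optional group.id, assuming Java 10+ (for `URLEncoder.encode(String, Charset)`). `SubscriptionUrl` and `build` are hypothetical helper names, not part of kafka-websocket.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: builds the query string described above.
final class SubscriptionUrl {
    private SubscriptionUrl() {}

    /** Pass groupId == null to keep the default per-session group.id. */
    static String build(List<String> topics, String groupId) {
        // Encode each topic on its own so the separating commas stay literal.
        String topicParam = topics.stream()
                .map(t -> URLEncoder.encode(t, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
        StringBuilder url = new StringBuilder("/v2/broker/?topics=").append(topicParam);
        if (groupId != null) {
            url.append("&group.id=").append(URLEncoder.encode(groupId, StandardCharsets.UTF_8));
        }
        return url.toString();
    }
}
```

For example, `build(List.of("my_topic", "my_other_topic"), null)` yields the path shown at the top of this section.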

Producing to topics

Clients publish to topics by connecting to /v2/broker/ and sending either text or binary messages that include a topic and a message. Text messages may optionally include a key to influence the mapping of messages to partitions. A client need not subscribe to a topic to publish to it.
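A sketch of a client that consumes and produces on one connection, assuming Java 11+ and its built-in `java.net.http.WebSocket` client, and assuming the server listens on localhost:7080 as in the Docker example. `KafkaTextClient`, `Assembler`, `connect` and `produce` are hypothetical names. The client requests the kafka-text subprotocol; `WebSocket.getSubprotocol()` reports what the server actually selected.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical client: one instance per connection (the assembler holds per-connection state).
final class KafkaTextClient implements WebSocket.Listener {

    // Joins WebSocket text fragments into complete messages.
    static final class Assembler {
        private final StringBuilder partial = new StringBuilder();
        final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

        void accept(CharSequence data, boolean last) {
            partial.append(data);              // copies the data, so the frame buffer can be reused
            if (last) {                        // final fragment: publish the whole message
                messages.add(partial.toString());
                partial.setLength(0);
            }
        }
    }

    final Assembler assembler = new Assembler();

    @Override
    public CompletionStage<?> onText(WebSocket ws, CharSequence data, boolean last) {
        assembler.accept(data, last);
        ws.request(1);                         // ask for the next frame
        return null;                           // data may be reclaimed immediately
    }

    /** Connects to baseUri + path, requesting the kafka-text subprotocol. */
    static CompletableFuture<WebSocket> connect(String baseUri, String path, KafkaTextClient listener) {
        return HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .subprotocols("kafka-text")
                .buildAsync(URI.create(baseUri + path), listener);
    }

    /** Publishes one text message; json must follow the text message format. */
    static CompletableFuture<WebSocket> produce(WebSocket ws, String json) {
        return ws.sendText(json, true);
    }
}
```

For example, `connect("ws://localhost:7080", "/v2/broker/?topics=my_topic", client).join()` returns a socket that receives from my_topic into `client.assembler.messages` and can also publish through `produce`, using the JSON format described under Text messages.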

Message transforms

By default, kafka-websocket will pass messages to and from kafka as-is. If your application requires altering messages in transit, for example to add a timestamp field to the body, you can implement a custom transform class. Transforms extend us.b3k.kafka.ws.transforms.Transform and can override the initialize methods or the transform methods for text and binary messages.

Transforms can be applied to messages received from clients before they are sent to kafka (inputTransform) or to messages received from kafka before they are sent to clients (outputTransform). See conf/server.properties for an example of configuring the transform class.
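The exact method signatures of us.b3k.kafka.ws.transforms.Transform are not shown here, so this is only a stand-alone sketch of the "add a timestamp field" logic that a text transform method might contain. It uses `UnaryOperator<String>` as a hypothetical stand-in for the real base class, and it naively assumes the body is a flat JSON object: it inserts a field after the opening brace without parsing or de-duplicating keys.

```java
import java.time.Clock;
import java.time.Instant;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a text transform; NOT the us.b3k.kafka.ws.transforms.Transform API.
final class TimestampTransform implements UnaryOperator<String> {
    private final Clock clock;   // injectable so tests can use a fixed time

    TimestampTransform(Clock clock) {
        this.clock = clock;
    }

    @Override
    public String apply(String body) {
        String trimmed = body.trim();
        // Only touch bodies that look like a JSON object; pass anything else through unchanged.
        if (trimmed.length() < 2 || !trimmed.startsWith("{") || !trimmed.endsWith("}")) {
            return body;
        }
        String inner = trimmed.substring(1, trimmed.length() - 1).trim();
        String field = "\"timestamp\":\"" + Instant.now(clock) + "\"";
        // Prepend the new field, adding a comma only if other fields follow.
        return inner.isEmpty() ? "{" + field + "}" : "{" + field + "," + inner + "}";
    }
}
```

Injecting a `Clock` keeps the transform deterministic in tests; in production `Clock.systemUTC()` would be the natural choice.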

Binary messages

Binary messages are formatted as:

[topic name length byte][topic name bytes (UTF-8)][message bytes]
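A minimal Java encoder/decoder for this layout. The README does not say whether the length byte is signed; this sketch reads it as unsigned (0 to 255), and for topic names shorter than 128 bytes either reading gives the same value. `BinaryFrame` is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical codec for [topic name length byte][topic name bytes (UTF-8)][message bytes].
final class BinaryFrame {
    final String topic;
    final byte[] message;

    BinaryFrame(String topic, byte[] message) {
        this.topic = topic;
        this.message = message;
    }

    byte[] encode() {
        byte[] name = topic.getBytes(StandardCharsets.UTF_8);
        // Assumption: the length byte is unsigned, so at most 255 name bytes fit.
        if (name.length > 255) {
            throw new IllegalArgumentException("topic name longer than 255 UTF-8 bytes");
        }
        return ByteBuffer.allocate(1 + name.length + message.length)
                .put((byte) name.length)
                .put(name)
                .put(message)
                .array();
    }

    static BinaryFrame decode(byte[] frame) {
        if (frame.length < 1) {
            throw new IllegalArgumentException("empty frame");
        }
        int nameLen = frame[0] & 0xFF;                 // read the length byte as unsigned
        if (1 + nameLen > frame.length) {
            throw new IllegalArgumentException("truncated topic name");
        }
        String topic = new String(frame, 1, nameLen, StandardCharsets.UTF_8);
        byte[] message = Arrays.copyOfRange(frame, 1 + nameLen, frame.length);
        return new BinaryFrame(topic, message);
    }
}
```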

Text messages

Text messages are JSON objects with two mandatory attributes: topic and message. They may also include an optional key attribute:

{ "topic" : "my_topic", "message" : "my amazing message" }

{ "topic" : "my_topic", "key" : "my_key123", "message" : "my amazing message" }
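A dependency-free Java sketch that builds these text messages with proper JSON string escaping. A real client would more likely use a JSON library; `TextMessage.toJson` is a hypothetical helper, and it assumes topic, key and message are all sent as JSON strings, as in the examples above.

```java
// Hypothetical helper for the text message format.
final class TextMessage {
    private TextMessage() {}

    /** Builds {"topic":...,"key":...,"message":...}; pass key == null to omit the key. */
    static String toJson(String topic, String key, String message) {
        StringBuilder sb = new StringBuilder("{\"topic\":");
        quote(sb, topic);
        if (key != null) {
            sb.append(",\"key\":");
            quote(sb, key);
        }
        sb.append(",\"message\":");
        quote(sb, message);
        return sb.append('}').toString();
    }

    // Appends s as a JSON string literal, escaping quotes, backslashes and control characters.
    private static void quote(StringBuilder sb, String s) {
        sb.append('"');
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        sb.append('"');
    }
}
```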

Configuration

See the property files in conf/.

TLS/SSL Transport

kafka-websocket can be configured to support TLS transport between client and server (not from kafka-websocket to kafka). Client certificates can also be used, if desired. Client auth can be set to none, optional, or required, each being, I hope, self-explanatory. See conf/server.properties for various configuration options.
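How kafka-websocket maps these modes internally is not shown here, and the real property names live in conf/server.properties. As an illustration only, this hypothetical helper shows the conventional JSSE meaning of the three modes using `javax.net.ssl.SSLParameters`; the resulting parameters could then be applied to an `SSLEngine` with `setSSLParameters`.

```java
import javax.net.ssl.SSLParameters;

// Hypothetical mapping of none / optional / required onto JSSE's client-auth flags.
final class ClientAuthMode {
    private ClientAuthMode() {}

    static SSLParameters apply(SSLParameters params, String mode) {
        switch (mode) {
            case "none":
                params.setNeedClientAuth(false);   // also clears wantClientAuth
                break;
            case "optional":
                params.setWantClientAuth(true);    // request a certificate, but accept clients without one
                break;
            case "required":
                params.setNeedClientAuth(true);    // reject clients that present no certificate
                break;
            default:
                throw new IllegalArgumentException("unknown client auth mode: " + mode);
        }
        return params;
    }
}
```

Each setter overrides any previous want/need setting, so switching modes on the same `SSLParameters` object never leaves both flags set.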

Docker

Build a Docker image using the source code in the working directory:

docker build -t kafka-websocket .

After the Docker image is finished building, run it with:

docker run -it -p 7080:7080 kafka-websocket

License

kafka-websocket is copyright 2014 Benjamin Black, and distributed under the Apache License 2.0.

kafka-websocket's Issues

possible fd leak

We have kafka-websocket running behind haproxy with no health checks enabled. I'm watching kafka-websocket steadily march towards its fd ulimit.

lsof reports lots of stale-looking fds:

java    18052 produser 1185w  FIFO                0,8      0t0 99505498 pipe
java    18052 produser 1186u   REG                0,9        0     3822 [eventpoll]
java    18052 produser 1187u  IPv6           99505499      0t0      TCP 10.0.1.3:49029->10.0.1.34:2181 (ESTABLISHED)
java    18052 produser 1188u  IPv6           99505502      0t0      TCP 10.0.1.3:45571->10.0.1.4:8092 (ESTABLISHED)
java    18052 produser 1189r  FIFO                0,8      0t0 99505501 pipe
java    18052 produser 1190w  FIFO                0,8      0t0 99505501 pipe
java    18052 produser 1191u   REG                0,9        0     3822 [eventpoll]
java    18052 produser 1192r  FIFO                0,8      0t0 99505503 pipe
java    18052 produser 1193w  FIFO                0,8      0t0 99505503 pipe

Can't run docker

docker build gives an error related to mvn plugins.

I tried pulling the image instead. When running the 'docker run' command on a Mac, it gives the following error, even though I do have Kafka running fine locally.

$ docker run -it -p 7080:7080 timwaizenegger/kafka-websocket
ERROR 19:19:07,928 Failed to start the server: DNS resolution failed for url in bootstrap.servers: kafka:9092

Webserver to Kafka communication

I am not a Java expert. I was curious how the webserver talks to Kafka via the consumer. It seems like the consumer is running on a separate thread? What was the rationale behind that decision? Also, does it mean there is one consumer per request to the webserver? Does the solution scale to many, many consumers?

EDIT: reposted as an FAQ wiki, but please feel free to answer here.

Unable to consume topic from browser

Hi,
👍 for your great project!

Using this sample code, I'm able to produce messages straight from the browser but not consume them (onmessage is never fired).

I tried using the console-producer tool from Kafka, but got the same result.

I'm running a really basic config: I just downloaded Kafka 0.8.1 (Scala 2.10), created a test topic (wstest), and launched the kafka-websocket jar.

No errors were reported by ZooKeeper or the broker.

Here is the complete log from kafka-websocket
http://pastebin.com/S6wiEKzn

high cpu for low traffic topics

Love the websocket gateway. It's allowing us to make use of kafka across the open internet protected in a TLS tunnel.

I'm seeing pretty high CPU utilization for a topic with very, very low message rates (< 1 msg/sec). Our setup is that we have ~70 clients, each in its own consumer group, since we want to broadcast messages. I have a feeling the CPU load is due to the MaxWait of 100 ms combined with having so many consumer groups.

TRACE 22:54:55,331 [ConsumerFetcherThread-imgix-worker-0062_svab3-1408575091046-704ad865-0-104], issuing to broker 104 of fetch request Name: FetchRequest; Version: 0; CorrelationId: 2028; ClientId: imgix-worker-0062-ConsumerFetcherThread-imgix-worker-0062_svab3-1408575091046-704ad865-0-104; ReplicaId: -1; MaxWait: 100 ms; MinBytes: 1 bytes; RequestInfo: [purge.origin,0] -> PartitionFetchInfo(496,1048576)

Could you expose these consumer config params to the command line or through the URL?
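A rough estimate supports that hunch, assuming each consumer re-issues a fetch as soon as the previous one returns. With MinBytes: 1 and MaxWait: 100 ms (in the Kafka 0.8 consumer these are the fetch.min.bytes and fetch.wait.max.ms settings), a broker holding no new data answers each fetch after about 100 ms, so for the ~70 consumer groups, per broker:

$$
\text{idle fetch rate} \approx N_{\text{consumers}} \times \frac{1000\ \text{ms/s}}{\text{MaxWait}} = 70 \times \frac{1000\ \text{ms/s}}{100\ \text{ms}} = 700\ \text{fetch requests/s}
$$

Because MinBytes is 1, the broker replies as soon as any data arrives, so in principle a larger MaxWait cuts idle polling (e.g. 1000 ms gives about 70 requests/s) without delaying delivery of new messages.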

binary consumer, text websocket

Hi, first of all, thanks for your great work.
I really like kafka-websocket. We are using it for our real-time dashboards. We feed binary Avro-serialized data into Kafka and send it to an Angular-based web app.

The issue is that I receive a binary data stream (Avro) and (!) want to send text data through the WebSocket (converted to a JSON string in a custom Transform class). I patched KafkaConsumers.sendBinary() to do that. Is there a better way?

Change topics list

Hi,

I am impressed by your project; it's a really useful thing!
When I started thinking about using it in production, I found that there was no way to change the topics list without reconnecting. Please treat this as a feature request. Do you have any ideas or suggestions on how to do that?

How can I connect to a secured websocket ?

Hi,

I am able to use this without SSL, using the ws:// protocol. Now I need to secure it, so I changed the properties file to set "ssl: true" and am now using the wss:// protocol, but it's not connecting. I am using jQuery and the browser WebSocket API to connect to a Kafka topic.

Can you please share an example showing SSL connectivity to a Kafka topic using the WebSocket API?

Lost messages when consumer restart

Hi!
I'm producing a message every second on a test topic, "topicABC". The kafka-websocket producer is in 'sync' mode.
My consumer uses the group.id "test-group" with your client.rb script.
I configured the kafka-websocket consumer with "auto.offset.reset=smallest" so that my consumer can fetch any messages it missed while it was stopped.
Everything works fine, except that the consumer always loses 5 or 6 messages (5 or 6 seconds' worth) when it restarts.

$ ./client-consumer.rb
[:open]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:02 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:03 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:04 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:05 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:06 GMT+0200 (CEST)"}"]
^C

// waiting a few seconds before restarting

$ ./client-consumer.rb
[:open]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:12 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:13 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:14 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:15 GMT+0200 (CEST)"}"]
[:message, "{"topic":"topicABC","message":"Tue May 06 2014 17:56:16 GMT+0200 (CEST)"}"]
^C

Messages at 17:56:07, 08, 09, 10 and 11 haven't been consumed at all.
I did some debugging, and KafkaConsumer.java in kafka-websocket doesn't receive these messages either.
Maybe a stream issue?

Consumer Client Timeout

I have a Java client subscribing to a topic, and its socket is closed after 5 minutes. I was expecting my client to keep a persistent connection until it ended the session (maybe this is not the intended use for a consumer?). Five minutes is Jetty's default socket timeout. The service log shows:

[DEBUG] 2015-05-21 13:43:50,319 org.eclipse.jetty.io.IdleTimeout checkIdleTimeout - SelectChannelEndPoint@7915e83{/192.168.80.1:63140<->7080,Open,in,out,R,-,300000,WebSocketServerConnection}{io=1,kio=1,kro=1} idle timeout check, elapsed: 300001 ms, remaining: -1 ms
[DEBUG] 2015-05-21 13:43:50,319 org.eclipse.jetty.io.IdleTimeout checkIdleTimeout - SelectChannelEndPoint@7915e83{/192.168.80.1:63140<->7080,Open,in,out,R,-,300000,WebSocketServerConnection}{io=1,kio=1,kro=1} idle timeout expired

I tried adding connector.setIdleTimeout(-1) to KafkaWebsocketServer.java but that did not change the result. The code I added looks like this:

    public void run() {
        try {
            Server server = new Server();
            ServerConnector connector = new ServerConnector(server);
            connector.setPort(Integer.parseInt(wsProps.getProperty("ws.port", DEFAULT_PORT)));
            // Added line: attempt to disable the connector's idle timeout
            connector.setIdleTimeout(-1);
            // ... rest of run() not shown

Maybe this version of Jetty does not support an infinite timeout? I did find this article, and its code shows a slightly different way of setting the server timeout, but the comment at the bottom suggests that what I did is the best approach.
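As a client-side workaround sketch, assuming the server only closes connections that see no traffic for the 300000 ms (5-minute) window shown in the log, the client can send a WebSocket ping well within that window. `KeepAlive` is a hypothetical helper; it takes the ping as a `Runnable` so it does not depend on a particular client library. With the JDK 11 client it could be started as `KeepAlive.start(scheduler, () -> ws.sendPing(ByteBuffer.allocate(0)), Duration.ofMinutes(1))`, where `ws` is a `java.net.http.WebSocket`.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical keepalive helper: runs sendPing at a fixed rate until the returned future is cancelled.
final class KeepAlive {
    private KeepAlive() {}

    static ScheduledFuture<?> start(ScheduledExecutorService scheduler, Runnable sendPing, Duration interval) {
        long ms = interval.toMillis();
        // Note: if sendPing throws, the executor suppresses all later runs.
        return scheduler.scheduleAtFixedRate(sendPing, ms, ms, TimeUnit.MILLISECONDS);
    }
}
```

Cancel the returned future when the socket closes. On the server side, Jetty 9's WebSocket sessions take their idle timeout from `WebSocketPolicy` (for example `factory.getPolicy().setIdleTimeout(...)` in a `WebSocketServlet`'s configure method) rather than only from the connector, which may be why changing the connector alone had no effect; this has not been checked against kafka-websocket's code.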
