Giter VIP home page Giter VIP logo

message_bus's Introduction

MessageBus

A reliable, robust messaging bus for Ruby processes and web clients.

MessageBus implements a Server to Server channel based protocol and Server to Web Client protocol (using polling, long-polling or long-polling + streaming)

Since long-polling is implemented using Rack Hijack and Thin::Async, all common Ruby web servers (Thin, Puma, Unicorn, Passenger) can run MessageBus and handle a large number of concurrent connections that wait on messages.

MessageBus is implemented as Rack middleware and can be used by any Rails / Sinatra or pure Rack application.

Read the generated docs: https://www.rubydoc.info/gems/message_bus

Ruby version support

MessageBus only support officially supported versions of Ruby; as of 2021-03-31 this means we only support Ruby version 2.6 and up.

Can you handle concurrent requests?

Yes, MessageBus uses Rack Hijack and this interface allows us to take control of the underlying socket. MessageBus can handle thousands of concurrent long polls on all popular Ruby webservers. MessageBus runs as middleware in your Rack (or by extension Rails) application and does not require a dedicated server. Background work is minimized to ensure it does not interfere with existing non-MessageBus traffic.

Is this used in production at scale?

Yes, MessageBus was extracted out of Discourse and is used in thousands of production Discourse sites at scale.

Installation

Add this line to your application's Gemfile:

gem 'message_bus'

And then execute:

$ bundle

Or install it yourself as:

$ gem install message_bus

Usage

Server to Server messaging

message_id = MessageBus.publish "/channel", "message"

# in another process / spot

MessageBus.subscribe "/channel" do |msg|
  # block called in a background thread when message is received
end

# subscribe to channel and receive the entire backlog
MessageBus.subscribe "/channel", 0 do |msg|
  # block called in a background thread when message is received
end

# subscribe to channel and receive the backlog starting at message 6
MessageBus.subscribe "/channel", 5 do |msg|
  # block called in a background thread when message is received
end
# get the ID of the last message on a channel
id = MessageBus.last_id("/channel")

# returns all messages after some id
MessageBus.backlog "/channel", id

Targeted messages

Messages can be targeted to particular clients by supplying the client_ids option when publishing a message.

MessageBus.publish "/channel", "hello", client_ids: ["XXX", "YYY"] # (using MessageBus.clientId)

By configuring the user_id_lookup and group_ids_lookup options with a Proc or Lambda which will be called with a Rack specification environment, messages can be targeted to particular clients users or groups by supplying either the user_ids or group_ids options when publishing a message.

MessageBus.configure(user_id_lookup: proc do |env|
  # this lookup occurs on JS-client polling, so that server can retrieve backlog
  # for the client considering/matching/filtering user_ids set on published messages
  # if user_id is not set on publish time, any user_id returned here will receive the message
  # return the user id here
end)

# Target user_ids when publishing a message
MessageBus.publish "/channel", "hello", user_ids: [1, 2, 3]

MessageBus.configure(group_ids_lookup: proc do |env|
  # return the group ids the user belongs to
  # can be nil or []
end)

# Target group_ids when publishing a message
MessageBus.publish "/channel", "hello", group_ids: [1, 2, 3]

# example of MessageBus to set user_ids from an initializer in Rails and Devise:
# config/initializers/message_bus.rb
MessageBus.user_id_lookup do |env|
  req = Rack::Request.new(env)

  if req.session && req.session["warden.user.user.key"] && req.session["warden.user.user.key"][0][0]
    user = User.find(req.session["warden.user.user.key"][0][0])
    user.id
  end
end

If both user_ids and group_ids options are supplied when publishing a message, the message will be targeted at clients with lookup return values that matches on either the user_ids or the group_ids options.

MessageBus.publish "/channel", "hello", user_ids: [1, 2, 3], group_ids: [1, 2, 3]

If the client_ids option is supplied with either the user_ids or group_ids options when publishing a message, the client_ids option will be applied unconditionally and messages will be filtered further using user_id or group_id clauses.

MessageBus.publish "/channel", "hello", client_ids: ["XXX", "YYY"], user_ids: [1, 2, 3], group_ids: [1, 2, 3]

Passing nil or [] to either client_ids, user_ids or group_ids is equivalent to allowing all values on each option.

Filtering Client Messages

Custom client message filters can be registered via MessageBus#register_client_message_filter. This can be useful for filtering away messages from the client based on the message's payload.

For example, ensuring that only messages seen by the server in the last 20 seconds are published to the client:

MessageBus.register_client_message_filter('/test') do |message|
  (Time.now.to_i - message.data[:published_at]) <= 20
end

MessageBus.publish('/test/5', { data: "somedata", published_at: Time.now.to_i })

Error handling

MessageBus.configure(on_middleware_error: proc do |env, e|
   # If you wish to add special handling based on error
   # return a rack result array: [status, headers, body]
   # If you just want to pass it on return nil
end)

Disabling message_bus

In certain cases, it is undesirable for message_bus to start up on application start, for example in a Rails application during the db:create rake task when using the Postgres backend (which will error trying to connect to the non-existent database to subscribe). You can invoke MessageBus.off before the middleware chain is loaded in order to prevent subscriptions and publications from happening; in a Rails app you might do this in an initializer based on some environment variable or some other conditional means. If you want to just disable subscribing to the bus but want to continue to allow publications to be made, you can do MessageBus.off(disable_publish: false).

Debugging

When setting up MessageBus, it's useful to manually inspect channels before integrating a client application.

You can curl MessageBus; this is helpful when trying to debug what may be going wrong. This example uses https://chat.samsaffron.com:

curl -H "Content-Type: application/x-www-form-urlencoded" -X POST --data "/message=0" https://chat.samsaffron.com/message-bus/client-id/poll\?dlp\=t

You should see a reply with the messages of that channel you requested (in this case /message) starting at the message ID you requested (0). The URL parameter dlp=t disables long-polling: we do not want this request to stay open.

Transport

MessageBus ships with 3 transport mechanisms.

  1. Long Polling with chunked encoding (streaming)
  2. Long Polling
  3. Polling

Long Polling with chunked encoding allows a single connection to stream multiple messages to a client, and this requires HTTP/1.1.

Chunked encoding provides all the benefits of EventSource with greater browser support (as it works on IE10 and up as well)

To setup NGINX to proxy to your app correctly be sure to enable HTTP1.1 and disable buffering:

location /message-bus/ {
  ...
  proxy_http_version 1.1;
  proxy_buffering off;
  ...
}

NOTE: do not set proxy_buffering off globally, it may have unintended consequences.

In order to disable chunked encoding for a specific client in Javascript:

MessageBus.enableChunkedEncoding = false;

or as a server-side policy in Ruby for all clients:

MessageBus.configure(chunked_encoding_enabled: false)

Long Polling requires no special setup; as soon as new data arrives on the channel the server delivers the data and closes the connection.

Polling also requires no special setup; MessageBus will fallback to polling after a tab becomes inactive and remains inactive for a period.

Multisite support

MessageBus can be used in an environment that hosts multiple sites by multiplexing channels. To use this mode:

# define a site_id lookup method, which is executed
# when `MessageBus.publish` is called
MessageBus.configure(site_id_lookup: proc do
  some_method_that_returns_site_id_string
end)

# you may post messages just to this site
MessageBus.publish "/channel", "some message"

# you can also choose to pass the `:site_id`.
# This takes precedence over whatever `site_id_lookup`
# returns
MessageBus.publish "/channel", "some message", site_id: "site-id"

# you may publish messages to ALL sites using the /global/ prefix
MessageBus.publish "/global/channel", "will go to all sites"

Client support

JavaScript Client

MessageBus ships a simple ~300 line JavaScript library which provides an API to interact with the server.

JavaScript clients can listen on any channel and receive messages via polling or long polling. You may simply include the source file (located in assets/ within the message_bus source code):

<script src="message-bus.js" type="text/javascript"></script>

or when used in a Rails application, import it through the asset pipeline:

//= require message-bus

In your application Javascript, you can then subscribe to particular channels and define callback functions to be executed when messages are received:

MessageBus.start(); // call once at startup

// how often do you want the callback to fire in ms
MessageBus.callbackInterval = 500;

// you will get all new messages sent to channel
MessageBus.subscribe("/channel", function (data) {
  // data shipped from server
});

// you will get all new messages sent to channel (-1 is implicit)
MessageBus.subscribe("/channel", function(data){
  // data shipped from server
}, -1);

// all messages AFTER message id 7 AND all new messages
MessageBus.subscribe("/channel", function(data){
  // data shipped from server
}, 7);

// last 2 messages in channel AND all new messages
MessageBus.subscribe("/channel", function(data){
  // data shipped from server
}, -3);

// you will get the entire backlog
MessageBus.subscribe("/channel", function(data){
  // data shipped from server
}, 0);

JavaScript Client settings

All client settings are settable via MessageBus.OPTION

Setting Default Info
enableLongPolling true Allow long-polling (provided it is enabled by the server)
callbackInterval 15000 Safeguard to ensure background polling does not exceed this interval (in milliseconds)
backgroundCallbackInterval 60000 Interval to poll when long polling is disabled (either explicitly or due to browser being in background)
minPollInterval 100 When polling requests succeed, this is the minimum amount of time to wait before making the next request.
maxPollInterval 180000 If request to the server start failing, MessageBus will backoff, this is the upper limit of the backoff.
alwaysLongPoll false For debugging you may want to disable the "is browser in background" check and always long-poll
shouldLongPollCallback undefined A callback returning true or false that determines if we should long-poll or not, if unset ignore and simply depend on window visibility.
baseUrl / If message bus is mounted at a sub-path or different domain, you may configure it to perform requests there. See MessageBus.base_route= on how to configure the MessageBus server to listen on a sub-path.
ajax $.ajax falling back to XMLHttpRequest MessageBus will first attempt to use jQuery and then fallback to a plain XMLHttpRequest version that's contained in the message-bus-ajax.js file. message-bus-ajax.js must be loaded after message-bus.js for it to be used. You may override this option with a function that implements an ajax request by some other means
headers {} Extra headers to be include with requests. Properties and values of object must be valid values for HTTP Headers, i.e. no spaces or control characters.
minHiddenPollInterval 1500 Time to wait between poll requests performed by background or hidden tabs and windows, shared state via localStorage
enableChunkedEncoding true Allows streaming of message bus data over the HTTP connection without closing the connection after each message.

Javascript Client API

MessageBus.start() : Starts up the MessageBus poller

MessageBus.subscribe(channel,func,lastId) : Subscribes to a channel. You may optionally specify the id of the last message you received in the channel. The callback receives three arguments on message delivery: func(payload, globalId, messageId). You may save globalId or messageId of received messages and use then at a later time when client needs to subscribe, receiving the backlog since that id.

MessageBus.unsubscribe(channel,func) : Removes a subscription from a particular channel that was defined with a particular callback function (optional).

MessageBus.pause() : Pauses all MessageBus activity

MessageBus.resume() : Resumes MessageBus activity

MessageBus.stop() : Stops all MessageBus activity

MessageBus.status() : Returns status (started, paused, stopped)

MessageBus.diagnostics() : Returns a log that may be used for diagnostics on the status of message bus.

Ruby

The gem ships with a Ruby implementation of the client library available with an API very similar to that of the JavaScript client. It was inspired by https://github.com/lowjoel/message_bus-client.

# Creates a client with the default configuration
client = MessageBus::HTTPClient.new('http://some.test.com')

# Listen for the latest messages
client.subscribe("/channel") { |data| puts data }

# Listen for all messages after id 7
client.subscribe("/channel", last_message_id: 7) { |data| puts data }

# Listen for last message and all new messages
client.subscribe("/channel", last_message_id: -2) { |data| puts data }

# Unsubscribe from a channel
client.unsubscribe("/channel")

# Unsubscribe a particular callback from a channel
callback = -> { |data| puts data }
client.subscribe("/channel", &callback)
client.unsubscribe("/channel", &callback)

Ruby Client Settings

Setting Default Info
enable_long_polling true Allow long-polling (provided it is enabled by the server)
background_callback_interval 60s Interval to poll when long polling is disabled
min_poll_interval 0.1s When polling requests succeed, this is the minimum amount of time to wait before making the next request.
max_poll_interval 180s If request to the server start failing, MessageBus will backoff, this is the upper limit of the backoff.
enable_chunked_encoding true Allows streaming of message bus data over the HTTP connection without closing the connection after each message.
headers {} Extra headers to be include with requests. Properties and values of object must be valid values for HTTP Headers, i.e. no spaces or control characters.

Configuration

message_bus can be configured to use one of several available storage backends, and each has its own configuration options.

Keepalive

To ensure correct operation of message_bus, every 60 seconds a message is broadcast to itself. If for any reason the message is not consumed by the same process within 3 keepalive intervals a warning log message is raised.

To control keepalive interval use

MessageBus.configure(keepalive_interval: 60)

Redis

message_bus supports using Redis as a storage backend, and you can configure message_bus to use redis in config/initializers/message_bus.rb, like so:

MessageBus.configure(backend: :redis, url: "redis://:[email protected]:6380/15")

The redis client message_bus uses is redis-rb, so you can visit it's repo to see what other options you can pass besides a url.

Data Retention

Out of the box Redis keeps track of 2000 messages in the global backlog and 1000 messages in a per-channel backlog. Per-channel backlogs get cleared automatically after 7 days of inactivity. By default, the backlog will be pruned on every message publication. If exact backlog length limiting is not required, the clear_every parameter can be set higher to improve performance.

This is configurable via accessors on the Backend instance.

# only store 100 messages per channel
MessageBus.backend_instance.max_backlog_size = 100

# only store 100 global messages
MessageBus.backend_instance.max_global_backlog_size = 100

# flush per-channel backlog after 100 seconds of inactivity
MessageBus.backend_instance.max_backlog_age = 100

# clear the backlog every 50 messages
MessageBus.backend_instance.clear_every = 50

PostgreSQL

message_bus also supports PostgreSQL as a backend, and can be configured like so:

MessageBus.configure(backend: :postgres, backend_options: {user: 'message_bus', dbname: 'message_bus'})

The PostgreSQL client message_bus uses is ruby-pg, so you can visit it's repo to see what options you can include in :backend_options.

A :clear_every option is also supported, which limits backlog trimming frequency to the specified number of publications. If you set clear_every: 100, the backlog will only be cleared every 100 publications. This can improve performance in cases where exact backlog length limiting is not required.

Memory

message_bus also supports an in-memory backend. This can be used for testing or simple single-process environments that do not require persistence or horizontal scalability.

MessageBus.configure(backend: :memory)

The :clear_every option is supported in the same way as the PostgreSQL backend.

Transport codecs

By default MessageBus serializes messages to the backend using JSON. Under most situation this performs extremely well.

In some exceptional cases you may consider a different transport codec. To configure a custom codec use:

MessageBus.configure(transport_codec: codec)

A codec class must implement MessageBus::Codec::Base. Specifically an encode and decode method.

See the bench directory for examples where the default JSON codec can perform poorly. A specific examples may be attempting to distribute a message to a restricted list of thousands of users. In cases like this you may consider using a packed string encoder.

Keep in mind, much of MessageBus internals and supporting tools expect data to be converted to JSON and back, if you use a naive (and fast) Marshal based codec you may need to limit the features you use. Specifically the Postgresql backend expects the codec never to return a string with \u0000, additionally some classes like DistributedCache expect keys to be converted to Strings.

Another example may be very large and complicated messages where Oj in compatibility mode outperforms JSON. To opt for the Oj codec use:

MessageBus.configure(transport_codec: MessageBus::Codec::Oj.new)

Forking/threading app servers

If you're using a forking or threading app server and you're not getting immediate delivery of published messages, you might need to configure your web server to re-connect to the message_bus backend

Passenger

# Rails: config/application.rb or config.ru
if defined?(PhusionPassenger)
  PhusionPassenger.on_event(:starting_worker_process) do |forked|
    if forked
      # We're in smart spawning mode.
      MessageBus.after_fork
    else
      # We're in conservative spawning mode. We don't need to do anything.
    end
  end
end

MessageBus uses long polling which needs to be configured in Passenger

For passenger version < 5.0.21, add the following to application.rb:

PhusionPassenger.advertised_concurrency_level = 0

For passenger version > 5.0.21, add the following to nginx.conf:

location /message-bus {
  passenger_app_group_name foo_websocket;
  passenger_force_max_concurrent_requests_per_process 0;
}

For more information see the Passenger documentation on long-polling.

Puma

# path/to/your/config/puma.rb
on_worker_boot do
  MessageBus.after_fork
end

Unicorn

# path/to/your/config/unicorn.rb
after_fork do |server, worker|
  MessageBus.after_fork
end

Middleware stack in Rails

MessageBus middleware has to show up after the session middleware, but depending on how the Rails app is configured that might be either ActionDispatch::Session::CookieStore or ActionDispatch::Session::ActiveRecordStore. To handle both cases, the middleware is inserted before ActionDispatch::Flash.

For APIs or apps that have ActionDispatch::Flash deleted from the stack the middleware is pushed to the bottom.

Should you wish to manipulate the default behavior please refer to Rails MiddlewareStackProxy documentation and alter the order of the middlewares in stack in app/config/initializers/message_bus.rb

# config/initializers/message_bus.rb
Rails.application.config do |config|
  # do anything you wish with config.middleware here
end

Specifically, if you use a Rack middleware-based authentication solution (such as Warden) in a Rails application and wish to use it for authenticating message_bus requests, you must ensure that the MessageBus middleware comes after it in the stack.

# config/initializers/message_bus.rb
Rails.application.config.middleware.move_after(Warden::Manager, MessageBus::Rack::Middleware)

A Distributed Cache

MessageBus ships with an optional DistributedCache API which provides a simple and efficient way of synchronizing a cache between processes, based on the core of message_bus:

require 'message_bus/distributed_cache'

# process 1
cache = MessageBus::DistributedCache.new("animals")

# process 2
cache = MessageBus::DistributedCache.new("animals")

# process 1
cache["frogs"] = 5

# process 2
puts cache["frogs"]
# => 5

cache["frogs"] = nil

# process 1
puts cache["frogs"]
# => nil

You can automatically expire the cache on application code changes by scoping the cache to a specific version of the application:

cache = MessageBus::DistributedCache.new("cache name", app_version: "12.1.7.ABDEB")
cache["a"] = 77

cache = MessageBus::DistributedCache.new("cache name", app_version: "12.1.7.ABDEF")

puts cache["a"]
# => nil

Error Handling

The internet is a chaotic environment and clients can drop off for a variety of reasons. If this happens while message_bus is trying to write a message to the client you may see something like this in your logs:

Errno::EPIPE: Broken pipe
  from message_bus/client.rb:159:in `write'
  from message_bus/client.rb:159:in `write_headers'
  from message_bus/client.rb:178:in `write_chunk'
  from message_bus/client.rb:49:in `ensure_first_chunk_sent'
  from message_bus/rack/middleware.rb:150:in `block in call'
  from message_bus/client.rb:21:in `block in synchronize'
  from message_bus/client.rb:21:in `synchronize'
  from message_bus/client.rb:21:in `synchronize'
  from message_bus/rack/middleware.rb:147:in `call'
  ...

The user doesn't see anything, but depending on your traffic you may acquire quite a few of these in your logs or exception tracking tool.

You can rescue from errors that occur in MessageBus's middleware stack by adding a config option:

MessageBus.configure(on_middleware_error: proc do |env, e|
  # env contains the Rack environment at the time of error
  # e contains the exception that was raised
  if Errno::EPIPE === e
    [422, {}, [""]]
  else
    raise e
  end
end)

Adding extra response headers

In e.g. config/initializers/message_bus.rb:

MessageBus.extra_response_headers_lookup do |env|
  [
    ["Access-Control-Allow-Origin", "http://example.com:3000"],
  ]
end

How it works

MessageBus provides durable messaging following the publish-subscribe (pubsub) pattern to subscribers who track their own subscriptions. Durability is by virtue of the persistence of messages in backlogs stored in the selected backend implementation (Redis, Postgres, etc) which can be queried up until a configurable expiry. Subscribers must keep track of the ID of the last message they processed, and request only more-recent messages in subsequent connections.

The MessageBus implementation consists of several key parts:

  • Backend implementations - these provide a consistent API over a variety of options for persisting published messages. The API they present is around the publication to and reading of messages from those backlogs in a manner consistent with message_bus' philosophy. Each of these inherits from MessageBus::Backends::Base and implements the interface it documents.
  • MessageBus::Rack::Middleware - which accepts requests from subscribers, validates and authenticates them, delivers existing messages from the backlog and informs a MessageBus::ConnectionManager of a connection which is remaining open.
  • MessageBus::ConnectionManager - manages a set of subscribers with active connections to the server, such that messages which are published during the connection may be dispatched.
  • MessageBus::Client - represents a connected subscriber and delivers published messages over its connected socket.
  • MessageBus::Message - represents a published message and its encoding for persistence.

The public API is all defined on the MessageBus module itself.

Subscriber protocol

The message_bus protocol for subscribing clients is based on HTTP, optionally with long-polling and chunked encoding, as specified by the HTTP/1.1 spec in RFC7230 and RFC7231.

The protocol consists of a single HTTP end-point at /message-bus/[client_id]/poll, which responds to POST and OPTIONS. In the course of a POST request, the client must indicate the channels from which messages are desired, along with the last message ID the client received for each channel, and an incrementing integer sequence number for each request (used to detect out of order requests and close those with the same client ID and lower sequence numbers).

Clients' specification of requested channels can be submitted in either JSON format (with a Content-Type of application/json) or as HTML form data (using application/x-www-form-urlencoded). An example request might look like:

POST /message-bus/3314c3f12b1e45b4b1fdf1a6e42ba826/poll HTTP/1.1
Host: foo.com
Content-Type: application/json
Content-Length: 37

{"/foo/bar":3,"/doo/dah":0,"__seq":7}

If there are messages more recent than the client-specified IDs in any of the requested channels, those messages will be immediately delivered to the client. If the server is configured for long-polling, the client has not requested to disable it (by specifying the dlp=t query parameter), and no new messages are available, the connection will remain open for the configured long-polling interval (25 seconds by default); if a message becomes available in that time, it will be delivered, else the connection will close. If chunked encoding is enabled, message delivery will not automatically end the connection, and messages will be continuously delivered during the life of the connection, separated by "\r\n|\r\n".

The format for delivered messages is a JSON array of message objects like so:

[
  {
    "global_id": 12,
    "message_id": 1,
    "channel": "/some/channel/name",
    "data": [the message as published]
  }
]

The global_id field here indicates the ID of the message in the global backlog, while the message_id is the ID of the message in the channel-specific backlog. The ID used for subscriptions is always the channel-specific one.

In certain conditions, a status message will be delivered and look like this:

{
  "global_id": -1,
  "message_id": -1,
  "channel": "/__status",
  "data": {
    "/some/channel": 5,
    "/other/channel": 9
  }
}

This message indicates the last ID in the backlog for each channel that the client subscribed to. It is sent in the following circumstances:

  • When the client subscribes to a channel starting from -1. When long-polling, this message will be delivered immediately.
  • When the client subscribes to a channel starting from a message ID that is beyond the last message on that channel.
  • When delivery of messages to a client is skipped because the message is filtered to other users/groups.

The values provided in this status message can be used by the client to skip requesting messages it will never receive and move forward in polling.

Publishing to MessageBus from outside of MessageBus

It may be necessary or desired for integration with existing systems to publish messages from outside the Ruby app where MessageBus is running. @tgodfrey has an example of how to do that, using the Redis backend, from Elixir here: https://gist.github.com/tgodfrey/1a67753d51cb202ca8eb04b933cec924.

Contributing

If you are looking to contribute to this project here are some ideas

  • MAKE THIS README BETTER!
  • Build backends for other providers (zeromq, rabbitmq, disque) - currently we support pg and redis.
  • Improve and properly document admin dashboard (add opt-in stats, better diagnostics into queues)
  • Improve general documentation (Add examples, refine existing examples)
  • Make MessageBus a nice website
  • Add optional transports for websocket and shared web workers

When submitting a PR, please be sure to include notes on it in the Unreleased section of the changelog, but do not bump the version number.

Running tests

To run tests you need both Postgres and Redis installed. By default on Redis the tests connect to localhost:6379 and on Postgres connect the database localhost:5432/message_bus_test with the system username; if you wish to override this, you can set alternative values:

PGUSER=some_user PGDATABASE=some_db bundle exec rake

We include a Docker Compose configuration to run test suite in isolation, or if you do not have Redis or Postgres installed natively. To execute it, do docker-compose run tests.

Generating the documentation

Run rake yard (or docker-compose run docs rake yard) in order to generate the implementation's API docs in HTML format, and open doc/index.html to view them.

While working on documentation, it is useful to automatically re-build it as you make changes. You can do yard server --reload (or docker-compose up docs) and open http://localhost:8808 to browse live-built docs as you edit them.

Benchmarks

Some simple benchmarks are implemented in spec/performance and can be executed using rake performance (or docker-compose run tests rake performance). You should run these before and after your changes to avoid introducing performance regressions.

message_bus's People

Contributors

benlangfeld avatar cannikin avatar cvx avatar davidtaylorhq avatar dependabot[bot] avatar elektronaut avatar eviltrout avatar felixbuenemann avatar ffabreti avatar henrik avatar intrepidd avatar jemminger avatar jeremyevans avatar jjaffeux avatar julik avatar lis2 avatar mikz avatar nathanstitt avatar nikolai-b avatar nlalonde avatar olleolleolle avatar osamasayegh avatar ramontayag avatar rwjblue avatar samsaffron avatar srijanshukla18 avatar st0012 avatar ted-hanson avatar tgxworld avatar washu avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

message_bus's Issues

Restrict subscription to a channel?

Not sure on the best place to ask more support questions, but is there a way to restrict subscription to channel?

I'm looking at using message_bus for a chat type application, but there's also some internal messaging that would ideally go over the message bus (there's a message bus, why not use it?). However, I can't see any way to stop a chat client from subscribing to one of these internal channels, except by running them on a different message bus (e.g. db 1) or giving the channels "unguessable" names.

Is there something I'm missing?

Long polling not working?

From my understanding of long polling (and forgive me if i'm wrong) it should make the request once and wait until the server has something to answer.

I've made a quick setup on my application, and as soon as I subscribe to the channel, if i look at browser network inspector, I see multiple requests being sent (depending on the polling frequency) even though the server has not published anything to that channel.

Am I misinterpreting the way long polling works, have I not configured it properly, or is it a bug?

Support different client/server locations

I'll look into fixing this myself and submitting a PR, but it seems like message_bus isn't built for housing the client and server on different machines (I have a client front end that gets JSON api data from my api server and subscribes to an updates channel on the same server to get notified of changes).

While changing the relevant bit in line 112 from baseUrl to me.baseUrl allows me to say MessageBus.baseUrl = "http://api.example.com" in my app setup block, the message_bus middleware doesn't set CORS headers, so my client refuses to listen to message_bus.

Does message_bus work on Heroku?

Just curious to see how Message Bus would work with a multiple server setup on Heroku?

Also, is it possible to have auth with Message Bus for private channels?

Thanks!

Readme.md is not showing third column which explains javascript client settings

As you can see in the raw file for Readme.md, javascript client settings table is made of three columns, but third column doesn't appears in the compiled version.
This problem hides the explanations of each config option.

Edit:

Changing Setting|Default to Setting|Default|Info makes third row appears again.

excerpt:

Setting|Default|
----|---|---|
enableLongPolling|true|Allow long-polling (provided it is enable by the server)
callbackInterval|15000|Safeguard to ensure background polling does not exceed this interval (in milliseconds)
backgroundCallbackInterval|60000|Interval to poll when long polling is disabled (either explicitly or due to browser being in backgroud)
maxPollInterval|180000|If request to the server start failing, MessageBus will backoff, this is the upper limit of the backoff.
alwaysLongPoll|false|For debugging you may want to disable the "is browser in background" check and always long-poll
baseUrl|/|If message bus is mounted in a subdirectory of different domain, you may configure it to perform requests there
ajax|$.ajax or XMLHttpRequest|MessageBus will first attempt to use jQuery and then fallback to a plain XMLHttpRequest version that's contained in the `messsage-bus-ajax.js` file. `messsage-bus-ajax.js` must be loaded after `messsage-bus.js` for it to be used.

Rendered:

Setting Default
enableLongPolling true
callbackInterval 15000
backgroundCallbackInterval 60000
maxPollInterval 180000
alwaysLongPoll false
baseUrl /
ajax $.ajax or XMLHttpRequest

Should Render:

Setting Default Info
enableLongPolling true Allow long-polling (provided it is enable by the server)
callbackInterval 15000 Safeguard to ensure background polling does not exceed this interval (in milliseconds)
backgroundCallbackInterval 60000 Interval to poll when long polling is disabled (either explicitly or due to browser being in backgroud)
maxPollInterval 180000 If request to the server start failing, MessageBus will backoff, this is the upper limit of the backoff.
alwaysLongPoll false For debugging you may want to disable the "is browser in background" check and always long-poll
baseUrl / If message bus is mounted in a subdirectory of different domain, you may configure it to perform requests there
ajax $.ajax or XMLHttpRequest MessageBus will first attempt to use jQuery and then fallback to a plain XMLHttpRequest version that's contained in the messsage-bus-ajax.js file. messsage-bus-ajax.js must be loaded after messsage-bus.js for it to be used.

message_bus setting document missing in README. Thanks.

Thank you give us a so awesome gem!

I have use this gem in our project several month, it work perfect.

I understood a little long pool, But, I still not understood aboutwhat does the callbackInterval means,
I only notice the long-pool interval is 25 seconds, I can't search more info from issues, could you explain more about this in README ?

Thanks

Setting baseUrl broken

It seems the setting of the MessageBus.baseUrl is broken in 0.9.4.
If I set MessageBus.baseUrl = "http://localhost:3001/" before subscribing to any channels, MessageBus still uses "/" as baseUrl in the ajax polling call.

MessageBus.unsubscribe does not seem to work in JS client

Hello,

I've tested the following in the console :

MessageBus.callbacks.length // 1
var f = function(data) { console.log('test'}}
MessageBus.subscribe('/test', f)
MessageBus.callbacks.length // 2
MessageBus.unsubscribe('/test', f)
MessageBus.callbacks.length // 2 (should have been 1 !!)

==> MessageBus.unsubscribe does not seem to work

errors on wrk benchmark

I have been testing a chat example using provided wrk sample in /message_bus/examples/bench
I am not sure why errors are thrown, maybe it is expected behaviour, I hope I am missing something.
Maybe a config problem, or something misconfigured.

Test command:

$ wrk -c200 -d10s --timeout=30s -s bench.lua http://10.5.5.102:3000

After each test run, I see more and more errors:

Test results:
First run:

Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.58s   809.90ms   2.11s    77.63%
    Req/Sec   303.27    257.28     1.03k    57.69%
  1006 requests in 10.03s, 330.37KB read
  Socket errors: connect 0, read 19, write 0, timeout 0
  Non-2xx or 3xx responses: 5

Second run:

Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.61s   804.70ms   2.14s    78.71%
    Req/Sec   336.38    343.80     1.38k    88.46%
  1010 requests in 10.03s, 330.54KB read
  Socket errors: connect 0, read 5, write 0, timeout 0
  Non-2xx or 3xx responses: 74

Third run:

  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     1.45s   903.20ms   2.17s    71.26%
    Req/Sec   402.61    424.53     1.44k    86.96%
  1110 requests in 10.03s, 361.54KB read
  Socket errors: connect 0, read 9, write 0, timeout 0
  Non-2xx or 3xx responses: 173

and lots of these on logs:

2016/04/15 15:48:35 [error] 921#0: *252 connect() to /tmp/passenger.1.0.901/generation-0/request failed (11: Resource temporarily unavailable) while connecting to upstream, client: 10.5.5.102, server: _, request: "POST /message-bus/a7dbce357bace4c170951b32b389648e/poll HTTP/1.1", upstream: "passenger:/tmp/passenger.1.0.901/generation-0/request:", host: "10.5.5.102:3000"

2016/04/15 15:55:01 [error] 2020#0: *1474 upstream prematurely closed connection while reading response header from upstream, client: 10.5.5.102, server: _, request: "POST /message-bus/3e8c9c62ffcba06fed52c35b9ca83aed/poll HTTP/1.1", upstream: "passenger:/tmp/passenger.1.0.1991/generation-0/request:", host: "10.5.5.102:3000"`

App 2041 stdout: W, [2016-04-15T15:55:16.179356 #2041] WARN -- : PQconsumeInput() subscribe failed, reconnecting in 1 second. Call stack
App 2041 stdout: /home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-98a71c19be42/lib/message_bus/backends/postgres.rb:111:in wait_for_notify' App 2041 stdout: /home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-98a71c19be42/lib/message_bus/backends/postgres.rb:111:insubscribe'

Would that be normal within a test scenario where bursts are large and concentrated?
Is there something I can configure so things run smoothly?

My env:

config.ru:

require 'message_bus'

require ::File.expand_path('../config/environment', __FILE__)

    if defined?(PhusionPassenger)
        # https://www.phusionpassenger.com/library/config/apache/tuning_sse_and_websockets/
        # https://github.com/dimelo/passenger-faye-issue

        PhusionPassenger.advertised_concurrency_level = 0

        # https://github.com/SamSaffron/message_bus
        PhusionPassenger.on_event(:starting_worker_process) do |forked|
            if forked
                # We're in smart spawning mode.
                MessageBus.after_fork
            else
                # We're in conservative spawning mode. We don't need to do anything.
            end
        end
    end

MessageBus.long_polling_interval = 1000 * 2
MessageBus.max_active_clients = 10000
use MessageBus::Rack::Middleware

run Rails.application

database.yml:

default: &default
  adapter: postgresql
  pool: 200

initializers/chat.rb:

MessageBus.configure(
    backend: :postgres,
    backend_options: {
        user: 'messagebus-chat',
        password: '123456',
        dbname: 'farma-chat_development'
    }
)

Can not start puma when set MessageBus.after_fork on boot work

when I deploy my rails app in production with puma, puma socket cannot start if I add:

on_worker_boot do
    MessageBus.after_fork
end

anything wrong? Thank you
log:

[11443] Puma starting in cluster mode...
[11443] * Version 2.14.0 (ruby 2.2.3-p173), codename: Fuchsia Friday
[11443] * Min threads: 4, max threads: 16
[11443] * Environment: production
[11443] * Process workers: 1
[11443] * Phased restart available
[11443] * Listening on unix:///tmp/app.sock
[11443] * Daemonizing...

Ang Nginx cannot connect to unix:///tmp/app.sock

Heavy polling when tab is out of focus (in Firefox and Chrome)

Versions used: Firefox 36.0.1, Chrome 41.0.2272.89 (64-bit), Message Bus 1.0.6

In examples/chat, if the browser tab is left out of focus, after some waiting time, heavy polling begins. The Rails server logs show the following. From the timestamps, you can observe the frequency of the polling:

I, [2015-03-19T15:39:28.485981 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:28 +0530
I, [2015-03-19T15:39:28.593629 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:28 +0530
I, [2015-03-19T15:39:28.701265 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:28 +0530
I, [2015-03-19T15:39:28.811940 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:28 +0530
I, [2015-03-19T15:39:28.920720 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:28 +0530
I, [2015-03-19T15:39:29.030994 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:29 +0530
I, [2015-03-19T15:39:29.146022 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:29 +0530
I, [2015-03-19T15:39:29.254551 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:29 +0530
I, [2015-03-19T15:39:29.366684 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:29 +0530
I, [2015-03-19T15:39:29.474899 #3334]  INFO -- : Started POST "/message-bus/88102805ba474edeafffb2ce68d1edfe/poll?dlp=t" for 127.0.0.1 at 2015-03-19 15:39:29 +0530

This polling immediately stops when the browser tab is again brought in focus and from then on, behaves normally (at an interval of 25 seconds).

Is this expected?

Chat example set header without JQuery (using message-bus-ajax.js)

How can I set header like in chat example...

MessageBus.ajax = function(args){
          args["headers"]["X-NAME"] = name;
          return $.ajax(args);
        };

... without JQuery support (using message-bus-ajax.js) ?

tried this without success:

window.MessageBus.ajax = function(args){
       args = args || {};
       args["headers"]["X-NAME"] = name;
       return new XMLHttpRequest(args);
 };

Also tried:

            window.MessageBus.ajax({
                headers: {
                    'X-name': function() { return currentUser; }
                }
            });

but it complains about a lot of other not defined options.

Clients deleted by timer queue preventing message delivery

I'm seeing an issue where messages sometimes aren't being delivered immediately to clients. After some debugging, I believe what is happening is that the current client instance registered against a client_id in the connection_manager is being removed by timers from previous polls. This is is particularly evident in our application because we tend to subscribe and unsubscribe to a number of different channels for short periods of time as a user works with the app. We're effectively simulating a request / response type pattern with an event_sourced / cqrs app using promises and short term message_bus subscriptions.

In more detail, the sequence of events that demonstrates this issue is as follows.

  1. Subscribe to a channel
    • middleware creates a new client and adds it to the connection_manager
    • connection_manager registers it against it's client_id in the @clients attribute of the connection_manager
    • connection_manager adds the client_id to the @subscriptions set.
    • middleware adds callback to the timers queue which calls remove_client in long_polling_interval seconds.
  2. User subscribes to another channel before the long_polling_interval has expired
    • middleware creates a new client and subscribes to both channels as above.
  3. long_polling_interval for the first subscription expires
    • remove_client method on the connection_manager is called.
    • the client registered against @clients in connection_manager for the client_id is removed. Note that this is actually removing the new client as the first client was replaced by a new client in step 2. This means that any messages published to either of these channels won't be delivered.
  4. System publishes a message to the new channel subscribed to in step 2.
    • No message is delivered to the browser because the client is no longer registered.
  5. long_polling_interval expires on the client. It re-polls and probably catches up from the backlog. - Note that I haven't completely confirmed this yet.

I have been able to naively stop this problem from happening by checking the connect_time of the client being removed against the client currently registered in @clients. If they aren't the same i'm not removing the client from @clients or client_id form the @subscriptions but only cancelling the cleanup_timer. While this seems to help in basic testing, I suspect it may cause other cleanup issues due to remaining subscriptions.

I'll continue to investigate as best I can and update where I can but any thoughts on what i've found would be most appreciated. Especially if i'm off track on what I think i'm seeing.

Failed to process message

Hi,

I can't seem to get message_bus working. I get the following errors: https://gist.github.com/Vaporizd/398734438608301a39e0

What I do is:

after_create :publish_entry

def publish_entry
  MessageBus.publish('/entries', "hello word")
end

I also can't seem to include the message-bus js lib

//= require message-bus
couldn't find file 'message-bus' with type 'application/javascript'

Full trace; https://gist.github.com/Vaporizd/f06bffade8b6490b338d
I've tried with both the 1.1.1 version and the newest 2.0 beta versions. Nothing seems to get that going. I have redis running and puma

require 'message_bus'
# Change to match your CPU core count
workers 1 

# Min and Max threads per worker
threads 1, 6

app_dir = File.expand_path("../..", __FILE__)

on_worker_boot do
  MessageBus.after_fork
end

Any help is appreciated, thanks.

Where do user_ids and group_ids get set?

(is there some better place to ask for help? Stackoverflow only has 12 followers of the "discourse" tag)

The readme shows this example:

# messages can be targetted at particular users or groups
MessageBus.publish "/channel", "hello", user_ids: [1,2,3], group_ids: [4,5,6]

However it does not make it clear what specifically a "user_id" is or where it is set. In my use case, I want to broadcast a message to a specific user. Is the user_id the user's database ID, or something else?

I suspect it's just some arbitrary identifier that I can just set into the current session and retrieve via

  MessageBus.user_id_lookup do |env|
    # return the user id here
  end

but I cannot seem to find the right place to put the user_id_lookup call where the rails session actually exists yet. All I get for env["rack.session"] is #<ActionDispatch::Request::Session:0x7fb788510dc8 not yet loaded>

Mention that messages are JSON-encoded internally in documentation

I tried to use symbols as group_ids which turns out not to work since group_ids and user_ids are JSON encoded/decoded internally.. this should probably be mentioned in the docs somewhere.
Alternatively one can use Marsal instead, if human readability of the encoded value is not important.

undefined method `configure'

I am using rails 4.2.5. I have created an init file for message_bus and set ruby MessageBus.configure(backend: :memory) to that file. After this the following error is throwed. BTW I am using thin server.

/home/mehmet/apps/ruby/mbus/config/initializers/message_bus.rb:1:in <top (required)>': undefined methodconfigure' for MessageBus:Module (NoMethodError)

`message-bus` is wrongly used as a id

I used MessageBus in my Article model. In ArticlesController#update, a message is published and the browser is redirected to article page

def update
  respond_to do |format|
    if @article.update(article_params)
      MessageBus.publish "/update-Article-#{@article.id}", {
        collectable_type: 'Article',
        collectable_id: @article.id
      }
      format.html { redirect_to [@article.user, @article] }
    else
      format.html { render :edit }
    end
  end
end

but I saw there is an error in the log:

Started GET "/users/1/articles/message-bus.js" for 127.0.0.1 at 2015-09-05 18:59:37 +0800
Processing by ArticlesController#show as JS
  Parameters: {"user_id"=>"1", "id"=>"message-bus"}
  User Load (0.2ms)  SELECT  "users".* FROM "users" WHERE "users"."id" = $1  ORDER BY "users"."id" ASC LIMIT 1  [["id", 1]]
  Article Load (0.2ms)  SELECT  "articles".* FROM "articles" WHERE "articles"."id" = $1 LIMIT 1  [["id", 0]]
Completed 404 Not Found in 3ms (ActiveRecord: 0.4ms)

ActiveRecord::RecordNotFound (Couldn't find Article with 'id'=message-bus):
  app/controllers/articles_controller.rb:103:in `set_article'

Why message-bus.js is in the URL?

Problems running integration tests using message_bus

I just started using message_bus in a project and I really like it so far. Sometimes however when running feature specs in rspec, message_bus seems to kill the entire rspec process while the tests are running.

rspec
.................*.***............*................................F......*[1]    63130 killed     rspec
☁  myapp [master] ⚡ /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:437:in `kill': No such process (Errno::ESRCH)
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:437:in `block (2 levels) in new_subscriber_thread'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:434:in `fork'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:434:in `block in new_subscriber_thread'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:98:in `call'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:98:in `do_work'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:29:in `block in initialize'

In log/test.log I can also see Global messages on 63130 timed out, restarting process corresponding to https://github.com/SamSaffron/message_bus/blob/master/lib/message_bus.rb#L425

At other times I get failures like

Failures:

  1) team administration should allow for creating users through admin
     Failure/Error: visit team_admin_users_path
     SignalException:
       SIGTERM
     # ./spec/features/team_admin_spec.rb:14:in `block (2 levels) in <top (required)>'

along with

☁  myapp [master] ⚡ /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:437:in `kill': No such process (Errno::ESRCH)
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:437:in `block (2 levels) in new_subscriber_thread'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:434:in `fork'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus.rb:434:in `block in new_subscriber_thread'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:98:in `call'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:98:in `do_work'
    from /Users/eiriklied/.rbenv/versions/2.1.6/lib/ruby/gems/2.1.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:29:in `block in initialize'

Any idea why this is happening?

.js assets cannot be encoded() as UTF8 due to invalid chars

Hi Sam!

I am including message-bus.js into a concatenating build pipe (not using Sprockets). My pipe bails because apparently there are invalid UTF8 chars in the file if it's read in binary.

This causes Ruby 2.0 to bail when they get concatenated with other script sources which might be UTF8 - for instance, for me and many people I know it's fairly common to use non-ASCII string literals in their JS. Switching all of our sources to ASCII is not that great.

Now, both ember and jQuery have those suspicious things going on (probably due to a few iterations of copy and paste), but I thought it might be a good idea to excise those frommessage-bus.js because it is likely to be vendored in other people's applications (and included in a concatenating build phase).

Or do I misinterpret the Ruby encoding pipe in this case?
See here for a test case:

https://github.com/julik/message_bus/tree/js-should-be-valid-utf8

Memory cleanup

Hey there,

I'm currently toying a bit with message_bus because I'm seriously considering using it to be the communication bus of an application I'm developing.

After trying out, and searching a lot (even in discourse source code), I can't find where and if you do some memory clean-up of messages on redis.

Is it possible? What is the recommended way?

Thanks in advance!

Application got very slow with passenger installed with nginx

Hello,

gem works like a charm, unfortunately I experience very slow behaviour on production with passenger.

So far I am not able to find out the reason. Only thing I know, when I disable subscribe on client, everything is fast again. Otherwise it waits like 2-3 seconds before response. Even in rails log is nothing during that waiting.

Can the reason be that passenger is running in only one process?

Do you have any experience with running it with passenger?
Passenger was installed with nginx, so it is not standalone. So my nginx.conf looks like this.
server {
listen 80;
server_name medis.rails.timepress.cz;
root /home/www/medis/public;
passenger_enabled on;
}

I will be glad for any help. Thanks.

Seems error when working together puma

Got the warning.

[1324] Puma starting in cluster mode...
[1324] * Version 2.12.3 (ruby 2.2.1-p85), codename: Plutonian Photo Shoot
[1324] * Min threads: 16, max threads: 40
[1324] * Environment: production
[1324] * Process workers: 4
[1324] * Preloading application
[1324] * Listening on unix:///root/www_space/lenovo/shared/tmp/sockets/puma.sock
[1324] ! WARNING: Detected 2 Thread(s) started in app boot:
[1324] ! #<Thread:0x00000004c3dcd8@/root/www_space/lenovo/releases/6/vendor/bundle/ruby/2.2.0/gems/message_bus-1.0.16/lib/message_bus.rb:407 sleep> - /root/www_space/lenovo/releases/6/vendor/bundle/ruby/2.2.0/gems/redis-3.2.1/lib/redis/connection/ruby.rb:52:in `select'
[1324] ! #<Thread:0x00000004c3da80@/root/www_space/lenovo/releases/6/vendor/bundle/ruby/2.2.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:29 sleep> - /root/www_space/lenovo/releases/6/vendor/bundle/ruby/2.2.0/gems/message_bus-1.0.16/lib/message_bus/timer_thread.rb:111:in `sleep'
[1324] * Daemonizing...
Connection to 123.57.172.249 closed.
       Elapsed time: 3.52 seconds

my puma config.

workers 4
threads 16, 40
#..... other configs

on_worker_boot do
  # Worker specific setup for Rails 4.1+
  # See: https://devcenter.heroku.com/articles/deploying-rails-applications-with-the-puma-web-server#on-worker-boot
  ActiveRecord::Base.establish_connection

  # Forking/threading app servers
  MessageBus.after_fork
end
  • Ruby version ruby-2.2.1 [ x86_64 ]
  • Rails 4.2.3
  • Unbuntu 1404_64

message_bus doesn't reconnect to postgres backend on database service restart

If postgres service stops, message_bus cannot restablish the connection when postgres is restarted. I guess it is not intended behaviour (bug).

tested with Rails and message_bus (2.0.0.beta)

how to reproduce: (you need postgres up and running)

  1. install and run chat-example and modify it's backend to postgres or install Rails message_bus-chat-example
  2. start chat server
  3. start browser client pointing to chat url
  4. make sure chat is working
  5. stop postgresql: (something like systemctl stop postgresql-9.3 )
  6. read the logs, message_bus tries to reconnect)
  7. start postgresql: (something like systemctl start postgresql-9.3 )
  8. message_bus cannot reconnect

RAILS LOGs:

Started POST "/message-bus/8f767f541df341629d2c6c4a2067ee4c/poll" for 127.0.0.1 at 2016-04-13 15:15:15 -0300

PG::UnableToSend (no connection to the server
):
  message_bus (2.0.0.beta.6) lib/message_bus/backends/postgres.rb:129:in `exec_prepared'
  message_bus (2.0.0.beta.6) lib/message_bus/backends/postgres.rb:129:in `exec_prepared'
  message_bus (2.0.0.beta.6) lib/message_bus/backends/postgres.rb:55:in `block in backlog'
  message_bus (2.0.0.beta.6) lib/message_bus/backends/postgres.rb:159:in `hold'
  message_bus (2.0.0.beta.6) lib/message_bus/backends/postgres.rb:55:in `backlog'
  message_bus (2.0.0.beta.6) lib/message_bus/backends/postgres.rb:276:in `backlog'
  message_bus (2.0.0.beta.6) lib/message_bus.rb:286:in `backlog'
  message_bus (2.0.0.beta.6) lib/message_bus/client.rb:122:in `block in backlog'
  message_bus (2.0.0.beta.6) lib/message_bus/client.rb:120:in `each'
  message_bus (2.0.0.beta.6) lib/message_bus/client.rb:120:in `backlog'
  message_bus (2.0.0.beta.6) lib/message_bus/rack/middleware.rb:134:in `call'
  rack (1.5.5) lib/rack/session/abstract/id.rb:225:in `context'
  rack (1.5.5) lib/rack/session/abstract/id.rb:220:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/cookies.rb:562:in `call'
  activerecord (4.1.15) lib/active_record/query_cache.rb:36:in `call'
  activerecord (4.1.15) lib/active_record/connection_adapters/abstract/connection_pool.rb:621:in `call'
  activerecord (4.1.15) lib/active_record/migration.rb:380:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/callbacks.rb:29:in `block in call'
  activesupport (4.1.15) lib/active_support/callbacks.rb:82:in `run_callbacks'
  actionpack (4.1.15) lib/action_dispatch/middleware/callbacks.rb:27:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/reloader.rb:73:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/remote_ip.rb:76:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/debug_exceptions.rb:17:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/show_exceptions.rb:30:in `call'
  railties (4.1.15) lib/rails/rack/logger.rb:38:in `call_app'
  railties (4.1.15) lib/rails/rack/logger.rb:20:in `block in call'
  activesupport (4.1.15) lib/active_support/tagged_logging.rb:68:in `block in tagged'
  activesupport (4.1.15) lib/active_support/tagged_logging.rb:26:in `tagged'
  activesupport (4.1.15) lib/active_support/tagged_logging.rb:68:in `tagged'
  railties (4.1.15) lib/rails/rack/logger.rb:20:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/request_id.rb:21:in `call'
  rack (1.5.5) lib/rack/methodoverride.rb:21:in `call'
  rack (1.5.5) lib/rack/runtime.rb:17:in `call'
  activesupport (4.1.15) lib/active_support/cache/strategy/local_cache_middleware.rb:26:in `call'
  rack (1.5.5) lib/rack/lock.rb:17:in `call'
  actionpack (4.1.15) lib/action_dispatch/middleware/static.rb:84:in `call'
  rack (1.5.5) lib/rack/sendfile.rb:112:in `call'
  railties (4.1.15) lib/rails/engine.rb:514:in `call'
  railties (4.1.15) lib/rails/application.rb:144:in `call'
  rack (1.5.5) lib/rack/content_length.rb:14:in `call'
  puma (3.3.0) lib/puma/configuration.rb:224:in `call'
  puma (3.3.0) lib/puma/server.rb:561:in `handle_request'
  puma (3.3.0) lib/puma/server.rb:406:in `process_client'
  puma (3.3.0) lib/puma/server.rb:271:in `block in run'
  puma (3.3.0) lib/puma/thread_pool.rb:111:in `call'
  puma (3.3.0) lib/puma/thread_pool.rb:111:in `block in spawn_thread'

The logs point to the rescueclause on lib/message_bus/backends/postgres.rb hold method:

def hold
    current_pid = Process.pid
    if current_pid != @pid
      @pid = current_pid
      sync do
        INHERITED_CONNECTIONS.concat(@available)
        @available.clear
      end
    end

    if conn = sync{@allocated[Thread.current]}
      return yield(conn)
    end

    begin
      conn = sync{@available.shift} || new_pg_connection
      sync{@allocated[Thread.current] = conn}
      yield conn
    rescue PG::ConnectionBad => e
      # don't add this connection back to the pool
    ensure
      sync{@allocated.delete(Thread.current)}
      if Process.pid != current_pid
        sync{INHERITED_CONNECTIONS << conn}
      elsif conn && !e
        sync{@available << conn}
      end
    end
  end

I have tried something like:

rescue PG::ConnectionBad => e
     client.reconnect
     retry

without success

message_bus.js not in the gem assets?

I'm not a gem bundling expert, but it seems like message_bus/assets isn't getting included in the gemfile (the gemspec doesn't include assets/*, for example).

It's not the end of the world, having to manually stick message_bus.js into my own /vendor/assets directory, but it'd be cool if the file was properly embedded in the gem, so a sprockets include directive Just Worked.

Maybe this is my cue to figure out how to serve assets from a gem, so I can do the right thing and submit a pull request.

How does a server-to-server client process "catch up"?

I'm evaluating this for our uses, but I wonder how something is supposed to work. From the README

MessageBus.subscribe "/channel" do |msg|
  # block called in a background thread when message is received
end

MessageBus.backlog "/channel", id
# returns all messages after the id

Let's say the client process has been offline for a bit (deployment or something). It knows where it was before stopping, so it can indeed request that backlog. However, it seems to me that:

  • If you call backlog first, process that, and then subscribe, you'll miss any messages between the backlog and subscribing.
  • If you call subscribe first, then process the backlog, you might get duplicates if any messages were published between subscribing (so processed by subscribe block) and calling .backlog.

I was expecting (hoping, really) that subscribe would take an optional id, and just call the block for the backlog and then start sending new messages as they come in. Since this is missing, what is the best way for a process to catch up and not miss anything or get duplicates?

DistributedCache question: load current data from backlog

If I run c = DistributedCache.new 'blah'; c['one'] = 1 in a process and then loads another process and it run DistributedCache.new 'blah' the data {'one' => 1} is not there, just newly published after this.

Is there a way to load the current data? This is necessary as new processes appear dynamically on my app and they must get the data as they init.

message_bus chat example with Rails (Jquery and Angular versions)

Hi there,

I would like to share a Rails version of the chat example.

Rails 4.1 + Postgresql 9 + Puma + message_bus 2.0.0.beta (no REDIS)

Just in case someone would like to start from Rails.

I have made an angularJS version of message-bus.js, without Jquery dependency. here it goes the diff

Thanks for this amazing gem.

Edit

Docs should state clearly the existence of message-bus-ajax.js. See #97
It adds an native JS ajax adapter to message-bus.js that works with angular code without JQuery.

ActionDispatch::Flash shouldn't be required

IMO, app.middleware.insert_before(ActionDispatch::Flash, MessageBus::Rack::Middleware) is a bit naïve since you don't know that the project will have it required. In my rails-api project, it errors out:

=> Booting WEBrick
=> Rails 4.1.8 application starting in development on http://0.0.0.0:3000
=> Run `rails server -h` for more startup options
=> Notice: server is listening on all interfaces (0.0.0.0). Consider using 127.0.0.1 (--binding option)
=> Ctrl-C to shutdown server
Exiting
/Users/adam/.rvm/gems/ruby-2.2.0/gems/actionpack-4.1.8/lib/action_dispatch/middleware/stack.rb:125:in `assert_index': No such middleware to insert before: ActionDispatch::Flash (RuntimeError)
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/actionpack-4.1.8/lib/action_dispatch/middleware/stack.rb:88:in `insert'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/configuration.rb:68:in `block in merge_into'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/configuration.rb:67:in `each'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/configuration.rb:67:in `merge_into'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/engine.rb:497:in `app'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/application/finisher.rb:36:in `block in <module:Finisher>'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/initializable.rb:30:in `instance_exec'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/initializable.rb:30:in `run'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/initializable.rb:55:in `block in run_initializers'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:226:in `block in tsort_each'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:348:in `block (2 levels) in each_strongly_connected_component'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:429:in `each_strongly_connected_component_from'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:347:in `block in each_strongly_connected_component'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:345:in `each'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:345:in `call'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:345:in `each_strongly_connected_component'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:224:in `tsort_each'
    from /Users/adam/.rvm/rubies/ruby-2.2.0/lib/ruby/2.2.0/tsort.rb:203:in `tsort_each'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/initializable.rb:54:in `run_initializers'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/application.rb:300:in `initialize!'
    from /Users/adam/repos/arcreative/hydrogen-api/config/environment.rb:5:in `<top (required)>'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/activesupport-4.1.8/lib/active_support/dependencies.rb:247:in `require'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/activesupport-4.1.8/lib/active_support/dependencies.rb:247:in `block in require'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/activesupport-4.1.8/lib/active_support/dependencies.rb:232:in `load_dependency'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/activesupport-4.1.8/lib/active_support/dependencies.rb:247:in `require'
    from /Users/adam/repos/arcreative/hydrogen-api/config.ru:3:in `block in <main>'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/builder.rb:55:in `instance_eval'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/builder.rb:55:in `initialize'
    from /Users/adam/repos/arcreative/hydrogen-api/config.ru:in `new'
    from /Users/adam/repos/arcreative/hydrogen-api/config.ru:in `<main>'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/builder.rb:49:in `eval'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/builder.rb:49:in `new_from_string'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/builder.rb:40:in `parse_file'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/server.rb:277:in `build_app_and_options_from_config'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/server.rb:199:in `app'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands/server.rb:50:in `app'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/rack-1.5.2/lib/rack/server.rb:314:in `wrapped_app'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands/server.rb:130:in `log_to_stdout'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands/server.rb:67:in `start'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands/commands_tasks.rb:81:in `block in server'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands/commands_tasks.rb:76:in `tap'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands/commands_tasks.rb:76:in `server'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands/commands_tasks.rb:40:in `run_command!'
    from /Users/adam/.rvm/gems/ruby-2.2.0/gems/railties-4.1.8/lib/rails/commands.rb:17:in `<top (required)>'
    from bin/rails:4:in `require'
    from bin/rails:4:in `<main>'

Publish except to the current client_id

On a given action, I want to publish to all but not to the browser window that generated the event.

I know that the client_id is not know by Rails. Is there some way to make it known?

Then, client_filter should inform the client_id (I'm not sure, but maybe it is already on the message parameter?)

Postgres: Failed to process job: ERROR: more than one row returned by a subquery used as an expression

I have this config:

config   = Rails.configuration.database_configuration
    dbname = config[Rails.env]["database"]
    username = config[Rails.env]["username"]
    password = config[Rails.env]["password"]

    MessageBus.configure(
        backend: :postgres,
        backend_options: {
            user: username,
            password: password,
            dbname: dbname
        }
    )

I have edited /home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/backends/postgres.rb to log to the console with this:

  def exec_prepared(conn, *a)
    #fer debug
    puts "EXEC_PREPARED: #{a}"
    r = conn.exec_prepared(*a)
    ...

console logs (original error is in portuguese):

Loading development environment (Rails 4.1.14)
Switch to inspect mode.
EXEC_PREPARED: ["insert_message", ["/__mb_keepalive__/", "{\"data\":16148,\"user_ids\":[-1],\"group_ids\":null,\"client_ids\":null}"]]
EXEC_PREPARED: ["publish", ["_message_bus_0", "1687|1687|/__mb_keepalive__/|{\"data\":16148,\"user_ids\":[-1],\"group_ids\":null,\"client_ids\":null}"]]
EXEC_PREPARED: ["expire", [604800]]
EXEC_PREPARED: ["clear_channel_backlog", ["/__mb_keepalive__/", 1687, 1000]]
Failed to process job: ERRO:  mais de um registro foi retornado por uma subconsulta utilizada como uma expressão 
 ["/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/backends/postgres.rb:131:in `exec_prepared'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/backends/postgres.rb:131:in `exec_prepared'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/backends/postgres.rb:45:in `block in clear_channel_backlog'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/backends/postgres.rb:161:in `hold'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/backends/postgres.rb:45:in `clear_channel_backlog'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/backends/postgres.rb:265:in `publish'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus.rb:233:in `publish'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus.rb:433:in `block in new_subscriber_thread'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/timer_thread.rb:98:in `call'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/timer_thread.rb:98:in `do_work'", "/home/fer/.rvm/gems/ruby-2.0.0-p451/bundler/gems/message_bus-a92e31f275f0/lib/message_bus/timer_thread.rb:29:in `block in initialize'"]

Any clues?

Working with cURL (or without an official client)

I've made a simple page that polls for a message:

<%= javascript_include_tag 'jquery' %>
<%= javascript_include_tag 'message-bus' %>
<h1>Test</h1>

<script type="application/javascript">
  MessageBus.start()

  MessageBus.callbackInterval = 500;
  MessageBus.subscribe("/channel", function(data){
    console.log(data)
  });
</script>

I'm using MessageBus with Rails 4.1, so I can bring up this page and in the rails console execute something like:

MessageBus.publish '/channel', 'main channel'

This works great, everything as expected. However I'm trying to reproduce this with cURL (with the intension of then implementing it in iOS) but I cannot get the cURL request to work.

Here is the request I'm trying

curl \
       -v \
       -H "X-SILENCE-LOGGER: true" \
       -H "Connection: keep-alive" \
       -X POST \
       -d '{"/channel": -1, "__seq": 1}' \
       "http://api.local.com:3000/message-bus/myclientid/poll?"

Here is what my rails server outputs

Started POST "/message-bus/myclientid/poll" for 127.0.0.1 at 2016-03-03 15:42:31 -0500                                                                             
Delivering messages [] to client myclientid for user  (chunked)
# MessageBus.publish("/channel", "test")
Delivering messages [] to client myclientid for user  (chunked)

Here is the output from cURL

*   Trying 127.0.0.1...
* Connected to api.local.com (127.0.0.1) port 3000 (#0)
> POST /message-bus/myclientid/poll? HTTP/1.1
> Host: api.local.com:3000
> User-Agent: curl/7.43.0
> Accept: */*
> X-SILENCE-LOGGER: true
> Connection: keep-alive
> Content-Type: application/x-www-form-urlencoded
> Content-Length: 28
>
* upload completely sent off: 28 out of 28 bytes
< HTTP/1.1 200 OK
< Cache-Control: must-revalidate, private, max-age=0
< Content-Type: text/plain; charset=utf-8
< Pragma: no-cache
< Expires: 0
< X-Content-Type-Options: nosniff
< Transfer-Encoding: chunked
< Connection: close
< Server: thin
<
[]
|
[]
|
* Closing connection 0

I'm going to try and implement this with ruby's Net::HTTP and see if I get the same problems there. I'm not an iOS dev, but I need to be able to explain how to long-poll outside of the JavaScript/Ruby clients.

Firefox long polling performance fix

add async: true to the ajax request object. Firefox seems to ignore the default so explicitly setting the flag enables the intended behavior better
message-bus.js around line 188

why the callback function(MessageBus.subscribe) called more than one times?

why the callback function called more than one times?

$(document).on('page:change',  function() {
  NProgress.done();

  MessageBus.start(); // call once at startup
  // how often do you want the callback to fire in ms
  MessageBus.callbackInterval = 500;
  MessageBus.subscribe("/notifications_count/" + App.access_token, function(data){
    // data shipped from server
    console.log("执行subscribe");
    var div, link, new_title, span, url;
    span = $(".notification-count span");
    link = $(".notification-count a");
    new_title = document.title.replace(/^\(\d+\) /, '');
    if (data.count > 0) {
      span.show();
      new_title = "(" + data.count + ") " + new_title;

      link.addClass("new");

      $.notify({
        title: "<strong>" + data.title + "</strong> ",
        message: "您有新订单(" + data.content + "),请赶快处理 <a href=\"" + data.content_path + "\">点击这里</a>"
      },{
        delay: 10000
      });
    } else {
      span.hide();
      link.removeClass("new");
    }
    span.text(data.count);
    return document.title = new_title;
  });
});

Authenticating messageBus requests

Thanks for this amazing gem!

We have an ember app with a token based authentication. I see that you generate a new clientID and register with message bus on every subscribe, safeguarding sniffing to an extent, but i failed to understand how authentication of these requests are done.

Assuming that my token has expired, or a new action has been performed by another user where the current user currently subscribed to the channel isn't authorized to access the data, I would still be receiving data from messageBus, which I don't think is ideal.

Is there a way we can return a 401 if the token was expired / user isn't authorized to receive data / token doesn't exist in the request in the existing setup? What are your thoughts regarding this?

"502 Bad Gateway" on unicorn USR2 restart

When about 10-15 message_bus' poll requests are made, the nginx reverse proxy in front of unicorn returns a 502 error. I've tried using proxy_buffering off but it didn't help.

Any clue why it is happening?

message-bus.js: Exception on Connection Timeout

There is an unhandled error case in the message-bus.js client:

If the long polling connection to the server times out without receiving the default [] reply, eg. because the network connection times out earlier, the following error is thrown:

[Error] TypeError: 'null' is not an object (evaluating 'object.length')
    each (jquery.js, line 633)
    success (message-bus.js, line 123)
    resolveWith (jquery.js, line 1017)
    done (jquery.js, line 7248)
    callback (jquery.js, line 8029)

This is because in this case the content of messages in message-bus.js:123 is not an array, so the each implementation in jQuery fails to call length on it. Checking for messages == null should fix this. I'll make a PR to fix this.

I noticed this, because Thin has a connection timeout of 30 seconds and the default MessageBus.long_polling_interval is also 30 seconds. So there is a race condition between those two timeouts. I would also recommend to change the default polling interval to 29 seconds or maybe 25 seconds (that's what Faye uses in all examples).

getting a 404 on the polling

Everything works in development when I run with unicorn, without the addition to my unicorn.rb recommended in the readme. However, in production-like circumstances (our staging server) I'm seeing extremely long waits before a published message hits the client. I'm running Rails 4.2 with the master branch of message_bus.

So, in an effort to get to the bottom of that, I made the addition to the unicorn configuration file, and immediately started having trouble. The first problem was that Rails complained about missing the message-bus.js asset, which I worked around by copying it out of the gem. Now I'm getting 404's on the polling requests. The output of rake middleware is as follows:

use Honeybadger::Rack::UserInformer
use Honeybadger::Rack::UserFeedback
use Honeybadger::Rack::ErrorNotifier
use Rack::Sendfile
use ActionDispatch::Static
use #<ActiveSupport::Cache::Strategy::LocalCache::Middleware:0x007fb3ec49fa50>
use Rack::Runtime
use Rack::MethodOverride
use ActionDispatch::RequestId
use Rails::Rack::Logger
use ActionDispatch::ShowExceptions
use ActionDispatch::DebugExceptions
use BetterErrors::Middleware
use ActionDispatch::RemoteIp
use ActionDispatch::Reloader
use ActionDispatch::Callbacks
use ActiveRecord::Migration::CheckPending
use ActiveRecord::ConnectionAdapters::ConnectionManagement
use ActionDispatch::Cookies
use ActionDispatch::Session::CookieStore
use MessageBus::Rack::Middleware
use ActionDispatch::Flash
use ActionDispatch::ParamsParser
use Rack::Head
use Rack::ConditionalGet
use Rack::ETag
use Warden::Manager
run MyApp::Application.routes

Any ideas of what I'm doing wrong here?

How to publish data to all users except current one?

Is there a way to just pass an user ID to the JavaScript method MessageBus.subscribe and the Ruby method MessageBus.publish and the current user becomes all updates except the updates "signed" with his user ID.

Message_bus threads inside initializer and multi-threaded server

I have hit this situation described on #91 by derekyau:

... when running message bus inside an initializer (like its done in the chat app example), since my web server is threaded (unicorn, etc.) I'm running into this issue where when a message comes in, multiple threads receive and process it at the same time. This often happens in the same action being called multiple times. I know I could guard in the code against this, but is there a better way of having only one thread take it and the rest don't receive it? Alternatively I guess I can setup a side process that is single threaded... whats best practice here?

The problem is reproducible running this repo branch

I cannot understand why running this:

def expire_old_sessions
        $online.each do |name, time|
            if (Time.now - (5*60)) > time
                puts "SESSION EXPIRED FOR [#{name}] - #{time}"
                MessageBus.publish "/presence", {leave: name}
            end
        end
end
Thread.new do
    sleep 5
    while true
        expire_old_sessions
        sleep 2
    end
end

Leads to 2 callbacks calls at:

MessageBus.subscribe "/presence" do |msg|
        if user = msg.data["enter"]
            $online[user] = Time.now
            puts "/PRESENCE ENTER: #{user}"
        end
        if user = msg.data["leave"]
            if $online[user]
                puts "/PRESENCE LEAVE: #{user}"
                $online.delete user
            end
        end
end

Run Output:

/home/fer/.rvm/rubies/ruby-2.0.0-p451/bin/ruby -e at_exit{sleep(1)};$stdout.sync=true;$stderr.sync=true;load($0=ARGV.shift) /home/fer/.rvm/gems/ruby-2.0.0-p451/gems/ruby-debug-ide-0.4.23.beta1/bin/rdebug-ide --disable-int-handler --port 43547 --dispatcher-port 56329 -- /home/fer/RubymineProjects/messagebus-chat/bin/rails server -b 0.0.0.0 -p 3000 -e fake_production
Fast Debugger (ruby-debug-ide 0.4.23.beta1, debase 0.0.9) listens on 127.0.0.1:43547
=> Booting Puma
=> Rails 4.1.15 application starting in fake_production on http://0.0.0.0:3000
=> Run rails server -h for more startup options
=> Notice: server is listening on all interfaces (0.0.0.0). Consider using 127.0.0.1 (--binding option)
=> Ctrl-C to shutdown server
[27346] Puma starting in cluster mode...
[27346] * Version 3.3.0 (ruby 2.0.0-p451), codename: Jovial Platypus
[27346] * Min threads: 1, max threads: 1
[27346] * Environment: fake_production
[27346] * Process workers: 1
[27346] * Phased restart available
[27346] * Listening on tcp://0.0.0.0:3000
[27346] Use Ctrl-C to stop
Fast Debugger (ruby-debug-ide 0.4.23.beta1, debase 0.0.9) listens on 127.0.0.1:60480
========= ON WORKER BOOT ==============
[27346] - Worker 0 (pid: 27360) booted, phase: 0
SESSION EXPIRED FOR [donald] - 2016-04-26 00:30:35 -0300
/PRESENCE LEAVE: donald
SESSION EXPIRED FOR [hilary] - 2016-04-25 00:30:35 -0300
/PRESENCE LEAVE: donald
/PRESENCE LEAVE: hilary
/PRESENCE LEAVE: hilary
SESSION EXPIRED FOR [jeffey] - 2016-04-27 00:25:45 -0300
/PRESENCE LEAVE: jeffey
/PRESENCE LEAVE: jeffey

SESSION EXPIRED FOR [joseph] - 2016-04-27 00:25:55 -0300
/PRESENCE LEAVE: joseph
/PRESENCE LEAVE: joseph

I have tested it with redis and postgres backends.
Any clues?

Question: is it possible to get, on subscribe, the messages that were published previously ?

I would like to be able, on subscribe from the JS, to get messages that were published in the previous 60 seconds prior to the subscribe.

Is it possible to do that with message bus? I saw you can pass a last Id to the subscribe function so I said to myself:

  • lets use a timestamps as message id
  • when subscribing, pass the timestamp corresponding to 60.seconds.ago

But in order for this to work

  • MessageBus should allow me to set messages id
  • MessageBus should not complain if the timestamp I'm passing as a lastid does not correspond to any message (because no message was published with that exact timestamp)

Would this be possible ? Can you suggest any alternative solution ?
Thanks you.

improve readme on how is user_id_lookup used

if I understand correctly, this:

MessageBus.configure(user_id_lookup: proc do |env|
  # return the user id here
end)

is only checked when client pooling reaches the server, and before processing the pooling request.
Correct me if I am wrong!

Been so, Would you accept a pull request for the docs explaining it? Something like:

MessageBus.configure(user_id_lookup: proc do |env|
   # this lookup occurs on JS-client poolings, so that server can retrieve backlog 
   # for the client considering/matching/filtering user_id set on published message
   # if user_id is not set on publish time, any user_id returned here will receive the message

   # return the user id here
end)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.