Giter VIP home page Giter VIP logo

amqp's People

Contributors

abscondment avatar aledalgrande avatar amutz avatar arvicco avatar bitdeli-chef avatar careo avatar carlhoerberg avatar celldee avatar codahale avatar cremes avatar dougbarth avatar dpritchett avatar dpw avatar grantr avatar jakedouglas avatar kytrinyx avatar leikind avatar majek avatar markiz avatar mhanne avatar michaelklishin avatar pairing avatar quixoten avatar rhex avatar rianmcguire avatar skaes avatar tensho avatar tmm1 avatar utilum avatar wjossey avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

amqp's Issues

Reconnect / recover on broken connection

Question: what is the API for re-establishing a severed connection? Looking through the source, it looks like reconnect method is raising a "not implemented" exception. Is the plan to add that as a flag? How do we, currently, go about re-establishing a broken connection?

Avoid depending on cgi library

With amqp:// connection string support contributor decided to pull in cgi from standard Ruby library. It is slow, unmaintained and has all kinds of bad reputation.

We need to switch to addressible gem: it has very stable API, is efficient and pulls no other dependencies with it.

Find which AMQP methods should be synchronous

Some other operations have similar issues. E.g. the close method of
MQ does a channel.close - a synchronous operation, but it is not
exposed as such in the API. The whole channel/connection close
business is pretty ugly, e.g. the way that closing the last channel
implicitly closes the connection.

Cannot see a way to gracefully retry initial connection

I am having trouble getting AMQP to reconnect if a connection fails. Automatic reconnects seem to happen okay after an initial connection, but I can't find a way to make the script recover from an initial failure and reconnect until it works:

loop do
  begin
    AMQP.start do 
      # do stuff
    end
  rescue Exception => e
    log "Failed to connect, #{e}"
    # uncommenting this makes no difference
    #AMQP.stop { EM.stop }
  end
  sleep 5 # try again in 5 seconds
end

On ruby 1.9.2, this hangs forever the second time I get to AMQP.start if I have not yet started the broker. Without catching the exception, the script of course exits, and if I catch it and do nothing (no loop), it continues to block forever. Am I missing something?

Heartbeats broken for long running tasks (Fix included)

Right now the init_heartbeat method (lib/amqp/client.rb) looks something like this:

def init_heartbeat
  @last_server_heartbeat = Time.now

  @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
    if connected?
      if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
        log "Reconnecting due to missing server heartbeats"
        reconnect(true)
      else
        send AMQP::Frame::Heartbeat.new
      end
    end
  end
end

However, when you send the hearbeat frame you don't update the @last_server_heartbeat variable. That causes delivery tags for long running jobs to be reset, and all sorts of other problems. You can fix this by updating the @last_server_heartbeat time when you send the heartbeat frame. Like so:

def init_heartbeat
  @last_server_heartbeat = Time.now

  @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
    if connected?
      if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
        log "Reconnecting due to missing server heartbeats"
        reconnect(true)
      else
        @last_server_heartbeat = Time.now
        send AMQP::Frame::Heartbeat.new
      end
    end
  end
end

Can't connect to RabbitMQ w/ SSL

I am trying to get Ruby AMQP to connect to RabbitMQ over SSL to no avail.

I am using the RabbitMQ settings and Ruby code block listed here:
https://gist.github.com/839690

I get the following error when running the code block in my Rails app's console:

AMQP::Error: Could not connect to server 127.0.0.1:5671
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/client.rb:26:in `block in initialize'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/client.rb:87:in `call'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/client.rb:87:in `block in unbind'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `call'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `block in run_deferred_callbacks'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `each'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:996:in `run_deferred_callbacks'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run_machine'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/gems/eventmachine-0.12.10/lib/eventmachine.rb:256:in `run'
    from /Users/baron/.rvm/gems/ruby-1.9.2-p136@sl-profiles/bundler/gems/amqp-38622e1bc721/lib/amqp/connection.rb:83:in `start

I am running ruby-amqp edge as of Feb 22, 2011. That's commit 38622e1.

I actually need to be able to connect using client-side SSL cert as well, but I am just trying to get classic SSL to work to start with.

rbx issues with 0.7.x-stable

I could fix problems with amqp-spec but I still can't get spec suite to pass on rbx for 0.7.x.

I keep getting something like this in error reports:

=ERROR REPORT==== 20-Apr-2011::00:00:32 ===
exception on TCP connection <0.27501.3> from 127.0.0.1:57057
{bad_payload,1,0,8,<<0,10,0,40,1,47,0,0,0>>}

Spent two hours trying to figure out, no dice. Probably something is wrong with serialization.

I disabled travis rbx builds for 0.7.x for now.

Qpid compatibility

Possibility of Qpid compatibility is not clear (Qpid has 2 versions/implementations, one of them implementation of AMQP 0.9.1 and is in Java and the other one is AMQP 0.10.0 implementation in C++). But we will see what we can do (given there is a maintained Qpid version for 0.9.1).

Add :prefetch option to AMQP::Channel constructor

It was suggested to add :prefetch option to AMQP::Channel constructor. Even thought 0.6/0.7 do not seem to support this, it seems that there is code out there that looks like this:

ch = AMQP::Channel.new(connection)
ch.prefetch(10)

so we need to wrap #prefetch implementation into AMQP::Channel#once_open and making it a constructor option seems natural.

amqp-failover: A gem that adds multi-server support with failover and fallback

I'm posting this here in hopes that my work might be of some use for the future development of the amqp gem. So go ahead and rip my (hopefully not too horrible) code apart. Hopefully a failover/fallback feature will make it back into the amqp gem itself sometime soon :)

https://github.com/jimeh/amqp-failover

For some background; We needed failover and fallback capabilities in our AMQP workers. I originally achieved it through a quite messy and untestable hacked version of AMQP::BasicClient. I just finished rewriting all of it to a proper gem, with proper specs to make sure everything works.

I decided to make a gem rather than fork amqp due to the rapid progress of development here as of late, and also to simplify our production deployment process. It currently works with amqp 0.7.0, but I have not tested it against the master branch yet.

Properly handle :passive option

Right now when exchange is created with :passive option, InvalidOptionError kicks in even though :passive parameter on Exchange.Declare is not supposed to declare anything. We need to handle this in MQ.topic, MQ.direct and so on.

Calling Queue#status immediately following Channel#queue breaks in async scenario

Repro:

  1. Channel#queue (Queue#new) sends Protocol::Queue::Declare and sets Queue's @status to :unfinished
  2. Queue#status also sends Protocol::Queue::Declare
  3. Channel#handle_method processes the first Protocol::Queue::DeclareOk response, looking for a matching queue in the :unfinished state (1), handing off the response to Queue#receive_status which in turn sets @status to :finished
  4. The second Protocol::Queue::DeclareOk response comes through, Channel#handle_method doesn't find any matching queue in the :unfinished state, and an exception gets thrown from nil.receive_status

Rough code:

channel = ::AMQP::Channel.new
queue = channel.queue("foobar") # Protocol::Queue::Declare sent

# using these to simulate synchronous call
m = Mutex.new
cv = ConditionVariable.new
msgs = 0
queue.status { |msgs, _| cv.signal } # Another Protocol::Queue::Declare sent
m.synchronize { cv.wait(m) }

puts "queue #{queue.name} has #{msgs} msgs"

Callback isn't called if the entity is already declared

When calling the MQ declaraction methods (queue, fanout, topic,
etc.), if you supply a callback, and the queue or exchange is not known,
everything is fine - the callback is called as expected. But if the
queue or exchange is already known, the callback is never called; it
simply gets ignored. All callbacks should get called (when the reply
arrives, or immediately if we have already got the reply).

0.8.0.rc1 installation fails on JRuby 1.6.0 with uncaught java.lang.ArrayIndexOutOfBoundsException

I'm working with Windows 7, Jruby 1.6.
When i gem install amqp -v '0.8.0***'
It comes out this error:

E:>gem install amqp -v '0.8.0.rc1'
System.java:-2:in arraycopy': java.lang.ArrayIndexOutOfBoundsException from DefaultResolver.java:111:inmakeTime'
from DefaultResolver.java:277:in create' from DefaultResolver.java:317:inhandleScalar'
from DefaultResolver.java:435:in orgHandler' from DefaultResolver.java:455:innode_import'
from DefaultResolver$s$1$0$node_import.gen:65535:in `call'
<...rest of those output...>

But version 0.7 is okay.

BTW: Could you please update some uptodate examples? Some of those including in the package can not run properly.

Publishing a lot of messages blocks the reactor

From the mailing list, "Blocking while publishing", by Theo:

What happens is that when you run

 @large_set_of_ids.each do | id |
   q.publish(id)
 end

you're blocking the EventMachine reactor. Blocking the reactor means
that EventMachine can't do anything else, like checking for incoming
data, or trigger timers. It takes some time to loop over all the IDs
(not very long in the grand scheme of things, but it's not
instantaneous if large means thousands or hundreds of thousands), and
it takes (comparatively) a very long time for the AMQP code to encode
each message and send it off to the connection. I assume the reason
your consumer does not see the messages is that since the reactor is
blocked by all the calls to #publish, EventMachine hasn't had the
opportunity to send anything over the connection socket.

I made the same mistake myself when I started using the AMQP gem, I
didn't understand why I could publish 1000 messages per second when my
consumer didn't get more than 10 or so. As the others have mentioned
the solution is to publish only one message per tick.

Solution: http://rdoc.info/github/eventmachine/eventmachine/master/EventMachine/Iterator

We should put a note about it to the README.

Queue#reset leaks consumer tags

Mainly applies to when reconnections without application restarts occur. This manifests for us in our monitoring systems that are looking for N consumers of a particular queue in RabbitMQ to be present, and N starts growing as transient network errors happen (due to heartbeat being enabled).

Solution is to Queue#unsubscribe and Queue#cancelled inside Queue#reset.

Small patch here: cloudcrowd@417700e

Allow for 0.12.10+ EM gems in gemspec

Currently can't use the AMQP gem in any gemfile / environment which is using the latest beta builds of eventmachine (at beta3). Example:

Bundler could not find compatible versions for gem "eventmachine":
In Gemfile:
amqp depends on
eventmachine (~> 0.12.10)

goliath depends on
  eventmachine (1.0.0.beta.3)

Can you guys change or remove the version lock? There is no reason why the current gem should not work with more recent versions of EM.

Some parts of documentation seems to be missing on RubyDoc.info

Cf http://rubydoc.info/github/ruby-amqp/amqp/master/AMQP/Exchange:publish & the actual documentation in source code. One of missing parts is:

# @option [Boolean] :immediate (false) This flag tells the server how to react if the message cannot be
#                                      routed to a queue consumer immediately.  If this flag is set, the
#                                      server will return an undeliverable message with a Return method.
#                                      If this flag is zero, the server will queue the message, but with
#                                      no guarantee that it will ever be consumed.

Conflict with builder 2.1.2

Something is very wrong with amqp + builder 2.1.2 (ALOT of stuff depends on that version, among them rails 3.0.x). I haven't tested ruby 1.8.7, I'm running 1.9.2.

Just check this out:

require 'rubygems'
require 'mq'
require 'builder'

builder = Builder::XmlMarkup.new
xml = builder.person { |b| b.name("Jim"); b.phone("555-1234") }
puts xml

Gives:

NameError: uninitialized constant Builder::XmlBase::Symbol

method method_missing   in xmlbase.rb at line 40
method <main>   in untitled at line 7

Lazy-loading

Currently the whole library is loaded to the memory, that's not good at all. User should require what he wants to use explicitly. Michael mentioned Kernel#autoload, but as it isn't thread-safe, I'd rather leave it to user to do it manually โ€“ it's more transparent anyway.

Nil pointer error in AMQP::Queue.new

If you try to pass a nil queue name to AMQP::Queue.new, the initialize method bombs with nil.empty? not being a valid method. This is on version 0.7.1 of the gem. That line is setting the name ivar to nil if the passed in name parameter is an empty string. It seems that it should check that name is not nil first.

@name = name unless name.empty?

NoMethodError: You have a nil object when you didn't expect it! You might have expected an instance of Array. The error occurred while evaluating nil.empty?
[GEM_ROOT]/gems/amqp-0.7.1/lib/amqp/queue.rb:72:in `initialize'
[GEM_ROOT]/gems/amqp-0.7.1/lib/amqp/channel.rb:619:in `new'
[GEM_ROOT]/gems/amqp-0.7.1/lib/amqp/channel.rb:619:in `queue'

Migration guide for 0.8.0

We need to come up with a migration guide for 0.8.0. Even if it ends up being a drop-in replacement for most people, there are certain practices in old examples that we should stop promoting. And it overlaps with other guides: AMQP 0.8.0 vs. 0.9.1, old RabbitMQ versions that Debian and Ubuntu ship with are just two examples.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.