We (at work) often have the necessity of defining complex RabbitMQ topologies of exchanges and queues. RabbitMQ is pretty clear on the pattern here: declare everything you need every time you connect since declaring is idempotent. We also have a hard requirement of trying to connect to multiple RabbitMQ URLs until one connection succeeds.
The code that we currently have to do things like declare an exchange and bind it to another exchange before declaring the consumer queue and starting the producer looks like this:
with {:ok, conn} <- AMQP.Connection.open(...),
{:ok, chan} <- AMQP.Channel.open(conn),
:ok <- declare_exchange(chan),
:ok <- bind_exchange(chan) do
Broadway.start_link(...)
end
This has a huge disadvantage: it makes our Broadway pipeline synchronous when starting. That is, the pipeline won't start unless RabbitMQ is available right away. This goes against the good principle of starting the process and connecting in the background with potential backoff (see this great article).
What we would like to do instead is to be able to declare the necessary RabbitMQ topology every time the producer connects to RabbitMQ. This will free us of the synchronization point and generally makes things more coherent.
To achieve this, we can go two ways: either we provide a way to pass in a generic piece of code that takes a AMQP channel, or we come up with a schema for additional options that lets us declare arbitrary topologies.
Note that we're only interested in topologies that are strictly related to the pipeline here. This means that we don't care about being able to declare additional queues for example, since the pipeline can only consume from a single queue.
Option 1: custom code
The first option is to pass a generic piece of code that takes the AMQP channel. The API I propose is a new option :rabbitmq_setup_fun
that takes an anonymous function or an MFA tuple.
rabbitmq_setup_fun: (AMQP.Channel.t() -> :ok | {:error, reason}) | {module(), atom(), [term()]}
An example of its usage:
rabbitmq_setup_fun: fn chan ->
with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
:ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: "#"),
do: :ok
end
or with MFA:
rabbitmq_setup_fun: {__MODULE__, :declare_exchange, [_routing_key = "#"]}
# In the module:
def declare_exchange(chan, routing_key) do
with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
:ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: routing_key),
do: :ok
end
Option 2: new options to declare exchanges and do exchange-to-exchange bindings
The alternative to custom code is to provide more options that let us declare arbitrary topologies. The nice thing is that in RabbitMQ you can only have bindings that look like this:
exchange1 -> exchange2 -> ... -> exchangeN -> queue
That is, you can only bind to one queue at the end of the "exchanges pipeline". This means we only need to support two things:
- Declaring an arbitrary number of exchanges
- Declaring an arbitrary number of exchange-to-exchange bindings
We don't need to support exchange-to-queue bindings since we already support that through the :bindings
option.
Declaring exchanges
What I propose is to have a new :declare_exchanges
option to declare exchanges:
declare_exchanges: [{name :: String.t(), type :: :direct | :topic | :headers | :fanout, options :: keyword()}]
For example:
declare_exchanges: [
{"my-exchange", :topic, durable: true},
{"my-other-exchange", :headers, durable: true, internal: true}
]
Binding exchanges to other exchanges
I have two different proposals here.
Option 1: add a new option :exchange_bindings
(to mirror the name of the already existing :bindings
) option.
exchange_bindings: [{source :: String.t(), dest :: String.t(), options :: keyword()}]
Option 2: modify the current :bindings
option to support exchange-to-exchange bindings as well. Right now this option supports a list of {exchange_name :: String.t(), options :: keyword()}
. What I propose is to switch to three-element tuples like the one above: {source :: String.t(), dest :: String.t(), options :: keyword()}
. However, we would have a special possible value for dest
which is the atom :queue
which represents the queue used in the :queue
option.
All thoughts are welcome, excited to have this discussion!
cc @josevalim @msaraiva @wojtekmach