rabbitmq-mock's Introduction

RabbitMQ-mock

Mock for RabbitMQ Java amqp-client.

Compatible with versions 4.0.0 to 5+ of com.rabbitmq:amqp-client

Compatible with versions 3.6.3 to 4.0.0 with the com.github.fridujo.rabbitmq.mock.compatibility package.

Motivation

This project aims to emulate RabbitMQ behavior for test purposes, through com.rabbitmq.client.ConnectionFactory with MockConnectionFactory.

Today, however, you will get more robust results by testing against a real RabbitMQ instance with Testcontainers.

If Docker is not an acceptable option, you can still rely on RabbitMQ-mock.

Example Use

Replace com.rabbitmq.client.ConnectionFactory with MockConnectionFactory:

ConnectionFactory factory = new MockConnectionFactory();
try (Connection conn = factory.newConnection()) {
    try (Channel channel = conn.createChannel()) {
        GetResponse response = channel.basicGet(queueName, autoAck);
        byte[] body = response.getBody();
        long deliveryTag = response.getEnvelope().getDeliveryTag();

        // Do what you need with the body

        channel.basicAck(deliveryTag, false);
    }
}

More details in integration-test

With Spring

Replace the underlying RabbitMQ ConnectionFactory with MockConnectionFactory:

@Configuration
@Import(AppConfiguration.class)
class TestConfiguration {
    @Bean
    ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(new MockConnectionFactory());
    }
}

More details in integration-test

Contribute

Any contribution is greatly appreciated. Please check out the guide for more details.

Getting Started

Maven

Add the following dependency to your pom.xml

<dependency>
    <groupId>com.github.fridujo</groupId>
    <artifactId>rabbitmq-mock</artifactId>
    <version>${rabbitmq-mock.version}</version>
    <scope>test</scope>
</dependency>

Gradle

Add the following dependency to your build.gradle

repositories {
	mavenCentral()
}

// ...

dependencies {
	// ...
	testImplementation("com.github.fridujo:rabbitmq-mock:$rabbitmqMockVersion")
	// ...
}

Building from Source

You need JDK 17 (https://adoptium.net/temurin/releases/?version=17&package=jdk) to build RabbitMQ-Mock. The project can be built with Maven using the following command.

./mvnw install

Tests are split into:

  • unit tests covering features and borderline cases: mvn test
  • integration tests, seatbelts for integration with Spring and Spring-Boot. These tests use the maven-invoker-plugin to launch the same project (in src/it/spring_boot) with different versions of the dependencies: mvn integration-test
  • mutation tests, to help understand what is missing in test assertions: mvn org.pitest:pitest-maven:mutationCoverage

Using the latest SNAPSHOT

The master branch of the project publishes SNAPSHOT builds to Sonatype's repository.

To use the latest master build, add the Sonatype OSS snapshot repository. For Maven:

<repositories>
    ...
    <repository>
        <id>sonatype-oss-snapshots</id>
        <url>https://oss.sonatype.org/content/repositories/snapshots</url>
    </repository>
</repositories>

For Gradle:

repositories {
    // ...
    maven {
        url "https://oss.sonatype.org/content/repositories/snapshots"
    }
}

rabbitmq-mock's People

Contributors

asomov, codacy-badger, dependabot-preview[bot], dependabot[bot], fossabot, hjohn, kaywu, ledoyen, lessonz, madmuffin1, pawelebe, redmu, snyk-bot, thorin, tobilarscheid

rabbitmq-mock's Issues

MockConnection#close closes all queues

We are using multiple connections from the connection factory. In some cases, when we have overlapping connections we don't receive messages on the consumers anymore.

In the test singleConnection we only have one connection and our reads work as expected.

In the test multiConnection_sequential we have multiple connections but the first one gets closed before the second one is initialized. This works fine too.

In the test multiConnection_overlapping we start the second connection before we close the first one. With this setup, the consumer doesn't receive messages.

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

public class MockConnectionTest {

    private static final String EXCHANGE = "exchange";
    private static final String QUEUE = "queue";

    @Test
    void multiConnection_overlapping() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        var conn2 = factory.newConnection();

        createQueueAndPublish(conn);
        conn.close();

        var future = new CompletableFuture<Void>();
        readFromQueue(conn2, future);
        future.get(1000, TimeUnit.MILLISECONDS);
    }

    @Test
    void multiConnection_sequential() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        createQueueAndPublish(conn);
        conn.close();

        var conn2 = factory.newConnection();
        var future = new CompletableFuture<Void>();
        readFromQueue(conn2, future);
        future.get(1000, TimeUnit.MILLISECONDS);
    }

    @Test
    void singleConnection() throws Exception {
        var factory = new MockConnectionFactory();
        var future = new CompletableFuture<Void>();

        var conn = factory.newConnection();
        createQueueAndPublish(conn);

        readFromQueue(conn, future);
        future.get(1000, TimeUnit.MILLISECONDS);
    }


    void createQueueAndPublish(Connection conn) throws IOException {
        var channel = conn.createChannel();

        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, false, true, true, Map.of());
        channel.queueBind(QUEUE, EXCHANGE, "");

        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "hello".getBytes());
    }

    void readFromQueue(Connection conn, CompletableFuture<Void> future) throws IOException {
        var channel = conn.createChannel();

        var consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                future.complete(null);
                super.handleDelivery(consumerTag, envelope, properties, body);
            }
        };

        channel.basicConsume(QUEUE, true, consumer);
    }
}

Deleting a nonexistent queue causes NPE

Deleting a queue that does not exist throws a NullPointerException, whereas the method should be idempotent.
Stacktrace:

Caused by: java.lang.NullPointerException: null
	at com.github.fridujo.rabbitmq.mock.MockNode.queueDelete(MockNode.java:90)
	at com.github.fridujo.rabbitmq.mock.MockChannel.queueDelete(MockChannel.java:323)
	at com.github.fridujo.rabbitmq.mock.MockChannel.queueDelete(MockChannel.java:318)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1140)
	at com.sun.proxy.$Proxy86.queueDelete(Unknown Source)
	at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$deleteQueue$3(RabbitAdmin.java:315)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2135)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2094)
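
The idempotent behavior requested here can be illustrated with a minimal, self-contained sketch. QueueRegistrySketch and its methods are hypothetical names and not part of rabbitmq-mock; the only point is that deleting an unknown queue returns normally (reporting zero purged messages) instead of dereferencing a missing entry.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for a node's queue registry; not rabbitmq-mock code.
final class QueueRegistrySketch {
    private final Map<String, List<String>> queues = new ConcurrentHashMap<>();

    // Declaring twice keeps the existing queue and its messages.
    void declare(String name) {
        queues.computeIfAbsent(name, ignored -> new CopyOnWriteArrayList<>());
    }

    void publish(String name, String body) {
        List<String> queue = queues.get(name);
        if (queue == null) {
            throw new IllegalArgumentException("No queue named " + name);
        }
        queue.add(body);
    }

    // Returns the number of messages dropped with the queue, or 0 when the
    // queue does not exist. No exception is thrown for an unknown name.
    int delete(String name) {
        List<String> removed = queues.remove(name);
        return removed == null ? 0 : removed.size();
    }
}
```

Calling delete twice in a row, or on a name that was never declared, simply yields 0.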

Messages must be consumed before another Message is published

For a Spring Boot test case I send multiple messages to an exchange routing to a single queue in a test setup.
While verifying the messages in the queue, only the first message can be consumed. Any further rabbitTemplate.receive() calls for the remaining messages return null (even with a sufficient timeout). The tests do work, though, if I consume each message immediately after sending it, before publishing the next one. Regardless of whether this behavior is intended, it would be useful to be able to queue multiple messages at once.

INFO REQ: How does one test the shutdown listener

I am looking for advice on how to test a shutdown listener. For example:

Connection connection = rabbitConnectionFactory.getNewConnection(rabbitQueueConfiguration);

// Auto reconnect on service failure
connection.addShutdownListener(cause -> {
    log.error("Shutdown signal received from RabbitMQ", cause);
    // ...
});

Is it possible to mock such behavior during test?

MockFanoutExchange constructor accessibility

Hello everyone,

First of all thank you to the maintainers for your amazing piece of software - it has saved us tons of time and seems to be quite nicely extensible.

Is there a reason why com.github.fridujo.rabbitmq.mock.exchange.MockFanoutExchange.MockFanoutExchange() is protected?

I only want to implement a delegating "x-delayed-message" mock where the underlying exchange type is defined by "x-delayed-type" AmqArgument.

Best regards,
Vasil Stoyanov

default exchange name is "default" instead of ""

When publishing a message to the default exchange using

@Override
public void publish(String previousExchangeName, String routingKey, AMQP.BasicProperties props, byte[] body) {
    node.getQueue(routingKey).ifPresent(q -> q.publish("default", routingKey, props, body));
}

the exchangeName is set to "default". According to the documentation the name of the default exchange should be "".

A problem arose while setting up a test in one of our modules, where certain scenarios cause a message to be re-published and acked. In this case, we use the Envelope.exchange property to determine which exchange to publish to.

rabbitmq-mock will then complain with a java.lang.IllegalArgumentException: No exchange named default error, since the default exchange key is correctly set to "".

public MockNode() {
    exchanges.put("", defaultExchange);
}

I suggest changing the static "default" in MockDefaultExchange to "".

Can't Receive Messages When Custom MessageConverter In Use

Perhaps I'm misattributing my failures, but when we moved to using a Jackson2JsonMessageConverter, we're seeing some failures that would seem to indicate our listeners aren't using the configured MessageConverter. I looked through the source, but it's not readily apparent to me where that would happen. Did I miss something?

New RabbitMQ Streams Feature

As of RabbitMQ 3.9 there is a new feature called Streams.

I was testing a small setup that I have with Apache Camel and RabbitMQ Streams until I found a configuration that works. I then chose to swap the ConnectionFactory with the Mock version. Almost everything works as expected but I will leave the few details that this mocking tool might want to add.

Regarding declaration, there is a new queue type: stream (the mock still works but it will become important for the behavior under testing). There are many other details but those are policy driven. I don't know if these details would be interesting for this mock tool. Binding works exactly the same as for any classic queue and the exchanges.

As far as the declaration goes, Streams

  • must have autoAck set to false,
  • must store all its messages in an addressable fashion
    • by index (for offsets)
    • by timestamp (for dates)

Regarding the message persistence, Streams are hardcoded to persist everything and consumers cannot cause messages to disappear from the Stream. RabbitMQ does have a mechanism to shorten the Stream itself by way of policies (max-age, etc). Again, I don't know if these features are supposed to come in handy for a mock tool.

The queued messages must be accessible by a few methods

  • "first" starts at the first available message;
  • "last" starts at last written chunk (chunking is the storage segmentation mechanism, which would have to be incorporated for this feature to make sense, which I don't know if it is overkill for this tool);
  • "next" starts at the end and will consume the first new offset written to the stream, this is the default if the offset is not declared;
  • numerical offset (long) is self explanatory (0 based indexing);
  • timestamp (java.util.Date) will start consuming from the closest message timestamp;
  • an interval string (e.g.: "1D 20h 15m 22s") will start consuming from the date resulting in the subtraction of interval from the current date.

Regarding consumers, they will have to define some form of Stream offset to start reading from (the option x-stream-offset or "next" if undeclared) which can be populated by the different kinds of values described above.

In addition, Stream consumers

  • must have qos (a prefetch size) defined on the channel,
  • must manually acknowledge the delivery,
  • must somehow store the offset in a durable medium to survive a reboot (this feature is supposedly handled by RabbitMQ but the AMQP0.9.1 spec and clients do not make use of this),
  • must be provided the current offset of the message in the message.properties.headers["x-stream-offset"] so that they may store the offset to pick up after a reboot.

The RabbitMQ team is already working on a RabbitMQ Java Stream client library but the amqp-0.9.1 client is supposed to be able to provide the low level features so implementers can choose their own method.

In summary, a healthy minimum set of features could be

  • a new queue type: stream
    • Streams keep all messages stored
    • Streams keep a numerical index
    • Streams keep a timestamp index
  • stream consumers
    • must not autoAck (consumer should fail to set up),
    • must define qos (prefetch) (consumer should fail to set up),
    • should provide the option "x-stream-offset" ("first", "next", offset, timestamp)
    • must be provided the current message offset in the headers under "x-stream-offset" of each message.
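
The minimum feature set above can be pictured with a small append-only log. This is only a sketch under stated assumptions: StreamLogSketch, append, resolveOffset and readFrom are hypothetical names, timestamps and interval strings are left out, and nothing here reflects how rabbitmq-mock or the broker stores streams. It shows the two properties the request insists on: reading never removes messages, and a consumer starts from "first", "next" (the default) or a numeric, 0-based offset.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical append-only log illustrating x-stream-offset semantics.
final class StreamLogSketch {
    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns its 0-based offset.
    synchronized long append(String body) {
        messages.add(body);
        return messages.size() - 1;
    }

    // Resolves an offset specification to a numeric offset.
    // null or "next" means "only messages written from now on".
    synchronized long resolveOffset(Object spec) {
        if (spec == null || "next".equals(spec)) {
            return messages.size();
        }
        if ("first".equals(spec)) {
            return 0;
        }
        if (spec instanceof Number) {
            return ((Number) spec).longValue();
        }
        throw new IllegalArgumentException("Unsupported offset specification: " + spec);
    }

    // Returns a copy of everything from the given offset; the log itself is untouched.
    synchronized List<String> readFrom(long offset) {
        int from = (int) Math.max(0, Math.min(offset, messages.size()));
        return new ArrayList<>(messages.subList(from, messages.size()));
    }
}
```

A consumer that resolves "next" before a publish sees only the messages appended afterwards, while "first" always replays the whole log.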

More Info at the source

Message expiration after dead lettering

Assuming system has the following configuration.
T1 --B1--> Q1 ==> T2 --B2--> Q2

  • T1 - topic exchange
  • Q1 - queue that has dead letter exchange configured to T2
  • B1 - binding between T1 and Q1
  • T2 - another topic exchange that is a dead-letter exchange for Q1
  • Q2 - another queue
  • B2 - binding between T2 and Q2

Steps:

  1. message with some TTL is posted to topic exchange T1
  2. message is delivered to queue Q1 because of binding B1
  3. message expires in Q1 and is delivered to T2 (dead-lettered)
  4. T2 via binding B2 delivers message to Q2
  5. message is still considered expired and it is going to be dead-lettered again (https://github.com/fridujo/rabbitmq-mock/blob/master/src/main/java/com/github/fridujo/rabbitmq/mock/MockQueue.java#L78)
  6. As Q2 doesn't have dead-letter exchange configured message is lost.

Expected behavior:
Message is delivered to Q2 and could be consumed by queue listener. During the same flow on real RabbitMQ message is consumed by the queue listener and not dead-lettered again.
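
One way to obtain the expected behavior is to drop the per-message TTL at the moment a message is dead-lettered because it expired, so that it cannot expire a second time in Q2. The sketch below only illustrates that idea; DeadLetterSketch, Message and both methods are hypothetical and do not mirror MockQueue.

```java
// Hypothetical illustration of clearing the TTL when dead-lettering on expiry.
final class DeadLetterSketch {
    static final class Message {
        final String body;
        final Long ttlMillis; // null means no per-message TTL

        Message(String body, Long ttlMillis) {
            this.body = body;
            this.ttlMillis = ttlMillis;
        }
    }

    // A message is expired once its age reaches its TTL; no TTL means never.
    static boolean isExpired(Message message, long ageMillis) {
        return message.ttlMillis != null && ageMillis >= message.ttlMillis;
    }

    // The copy handed to the dead-letter exchange carries no TTL,
    // so the target queue will not treat it as expired again.
    static Message deadLetterOnExpiry(Message expired) {
        return new Message(expired.body, null);
    }
}
```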

Issue Mocking Delayed Message Exchange

I am using a delayed exchange and have been trying to mock it. I am getting the following:

Caused by: java.lang.IllegalArgumentException: No exchange type x-delayed-message
	at com.github.fridujo.rabbitmq.mock.exchange.MockExchangeFactory.build(MockExchangeFactory.java:31)
	at com.github.fridujo.rabbitmq.mock.MockNode.exchangeDeclare(MockNode.java:58)
	at com.github.fridujo.rabbitmq.mock.MockChannel.exchangeDeclare(MockChannel.java:244)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1184)
	at com.sun.proxy.$Proxy176.exchangeDeclare(Unknown Source)
	at org.springframework.amqp.rabbit.core.RabbitAdmin.declareExchanges(RabbitAdmin.java:688)
	at org.springframework.amqp.rabbit.core.RabbitAdmin.lambda$declareExchange$0(RabbitAdmin.java:222)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.invokeAction(RabbitTemplate.java:2151)
	at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:2110)
	... 96 more

Has anyone run into this before?

rabbitTemplate receive() does not dequeue message

I have this simple test. As you can see, calling rabbitTemplate.receive() does not remove the message from the queue: the same message is received every time I call this method. This issue does not happen if I connect to a real RabbitMQ.

@TestConfiguration
public class TestConfig {

    @Bean
    @Primary
    ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(new MockConnectionFactory());
    }

    @Bean
    @Primary
    public RabbitAdmin getTestRabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    @Bean
    @Primary
    public RabbitTemplate getTestRabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}


@SpringBootTest
@Import(TestConfig.class)
class SampleTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RabbitAdmin rabbitAdmin;

    @Test
    void test() throws Exception {
        rabbitTemplate.send("test-exchange", "test-key", new Message("test".getBytes(StandardCharsets.UTF_8)));

        Thread.sleep(500);
        QueueInformation info0 = rabbitAdmin.getQueueInfo("test-queue");  // QueueInformation [name=test-queue, messageCount=1, consumerCount=0]

        Message message1 = rabbitTemplate.receive("test-queue", -1);
        Thread.sleep(500);
        QueueInformation info1 = rabbitAdmin.getQueueInfo("test-queue");  // QueueInformation [name=test-queue, messageCount=1, consumerCount=1]

        Message message2 = rabbitTemplate.receive("test-queue", -1);
        Thread.sleep(500);
        QueueInformation info2 = rabbitAdmin.getQueueInfo("test-queue");  // QueueInformation [name=test-queue, messageCount=1, consumerCount=1]
    }
}

MockChannel: Overridden methods do not declare exceptions

Issue summary:

Methods in MockChannel which override Channel interface methods do not declare the same exceptions. For example, basicAck does not declare an IOException:

Channel:

    void basicAck(long deliveryTag, boolean multiple) throws IOException;

MockChannel:

    @Override
    public void basicAck(long deliveryTag, boolean multiple) {
        getTransactionOrNode().basicAck(deliveryTag, multiple);
        metricsCollectorWrapper.basicAck(this, deliveryTag, multiple);
    }

Why it matters:

Without the declared exceptions, it is not possible to mock throwing of the checked exception. Example / use-case:

try(Channel channel = Mockito.spy(conn.createChannel())) {
  assertThat(channel).isInstanceOf(MockChannel.class);
  
  Mockito.doThrow(new IOException("ACK failed"))
    .when(channel)
    .basicAck(anyLong(), anyBoolean());
  
  // If the ACK fails (i.e. throws an IOException), my app will try 3 times.
  // This is the behaviour I want to test
  verify(channel, times(3)).basicAck(anyLong(), anyBoolean());
}

Due to the undeclared exception, this test results in an error: Checked exception is invalid for this method!

Is this a valid use case?

Is this a valid use-case, or am I using rabbitmq-mock incorrectly?

If it's a valid issue, I'm happy to create a PR to fix it, if that's OK.

Unstable delivery when more than one consumer is used

In my case I have 5 consumers registered on a queue, and everything works fine in 90% of cases.
Occasionally it crashes in MockQueue::deliverToConsumerIfPossible with an NPE.
1/ The NPE is not caught, so the whole delivery is broken.
2/ The map from

private final Map<String, ConsumerAndTag> consumersByTag = new LinkedHashMap<>();
is unstable: it reports a size of 5 but in fact holds fewer elements.

I think (2) could be fixed more easily with a concurrent version of the map.
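
A minimal sketch of suggestion (2), assuming insertion order of consumers should be kept: wrapping the LinkedHashMap with Collections.synchronizedMap makes put and size safe across threads while preserving order, which a plain ConcurrentHashMap would not. ConsumerRegistrySketch and its methods are hypothetical names, not the fix applied in rabbitmq-mock. Note that iterating over a synchronized map still requires holding the map's own lock.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical registry showing a thread-safe, insertion-ordered consumer map.
final class ConsumerRegistrySketch {
    private final Map<String, Runnable> consumersByTag =
        Collections.synchronizedMap(new LinkedHashMap<>());

    void register(String tag, Runnable consumer) {
        consumersByTag.put(tag, consumer);
    }

    int size() {
        return consumersByTag.size();
    }

    // Registers consumers from several threads at once and returns the final size.
    static int registerConcurrently(int threads, int perThread) {
        ConsumerRegistrySketch registry = new ConsumerRegistrySketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    registry.register("consumer-" + id + "-" + i, () -> { });
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for workers", e);
        }
        return registry.size();
    }
}
```

With the synchronized wrapper, the reported size always equals the number of distinct tags registered.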

MetricCollector.consumedMessage is called incorrectly.

In rabbitmq-java-client, consumedMessage is called before handleDelivery.

https://github.com/rabbitmq/rabbitmq-java-client/blob/v5.9.0/src/main/java/com/rabbitmq/client/impl/ChannelN.java#L452-L460

But in mock, consumedMessage is called after it.

nextConsumer.consumer.handleDelivery(nextConsumer.tag, envelope, message.props, message.body);
mockChannel.getMetricsCollector().consumedMessage(mockChannel, deliveryTag, nextConsumer.tag);

Multiple reject/ack call should throw precondition failed

I deployed our application to a staging environment and assumed it would work without exceptions or warnings, because I had also tested some error cases around message rejection in ITs. Unfortunately, I got this warning:

"Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)"

The root cause of this warning is that:

"Client Errors: Double Acking and Unknown Tags
Should a client acknowledge the same delivery tag more than once, RabbitMQ will result a channel error such as PRECONDITION_FAILED - unknown delivery tag 100. The same channel exception will be thrown if an unknown delivery tag is used." https://www.rabbitmq.com/confirms.html

I've tried several combinations of rejecting and acknowledging, but I can't get a warning in any of these cases:

      channel.basicReject(original.getMessageProperties().getDeliveryTag(), false); // successful
      channel.basicReject(original.getMessageProperties().getDeliveryTag(), false); // already rejected, should have a warning
      channel.basicReject(original.getMessageProperties().getDeliveryTag(), false); // successful
      channel.basicAck(original.getEnvelope().getDeliveryTag(), false); // already rejected, should have a warning
      channel.basicAck(original.getEnvelope().getDeliveryTag(), false); // successful
      channel.basicReject(original.getMessageProperties().getDeliveryTag(), false); // already acknowledged, should have a warning
      channel.basicAck(original.getEnvelope().getDeliveryTag(), false); // successful
      channel.basicAck(original.getEnvelope().getDeliveryTag(), false); // already acknowledged, should have a warning
      channel.basicReject(15, false); // unknown tag, should have a warning too
      channel.basicAck(15, false); // unknown tag, should have a warning too
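
The check being asked for amounts to tracking which delivery tags are still outstanding and refusing to settle a tag twice. The following sketch is an assumption-laden illustration: DeliveryTagTrackerSketch is a hypothetical class, and the IllegalStateException stands in for the channel error a real broker raises (reply-code 406 in the warning quoted above).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical tracker of unacknowledged delivery tags on one channel.
final class DeliveryTagTrackerSketch {
    private final Set<Long> unsettled = new HashSet<>();
    private long nextTag = 1;

    // Records a new delivery and returns its tag.
    synchronized long deliver() {
        long tag = nextTag++;
        unsettled.add(tag);
        return tag;
    }

    // Used for both ack and reject: a tag can be settled exactly once.
    synchronized void settle(long deliveryTag) {
        if (!unsettled.remove(deliveryTag)) {
            throw new IllegalStateException(
                "PRECONDITION_FAILED - unknown delivery tag " + deliveryTag);
        }
    }
}
```

Under this model, every second call in the list above and both calls with tag 15 would fail.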

RoutingKey does not follow spec for topic exchanges

When using a topic exchange, the official RMQ tutorial states that:

# (hash) can substitute for zero or more words. (https://www.rabbitmq.com/tutorials/tutorial-five-python.html)

This means that a Queue bound to an Exchange with the RoutingKey 'something.#' should be able to match a message with a RoutingKey 'something'. This is, as expected, also the behavior on a real RMQ system. On the mock however, the hash matches against 1 or more words (instead of 0 or more). I believe the culprit is MockTopicExchange.java:19.
The correct behavior is useful because it allows for publishing routing keys to be modified with additional information in the future without breaking existing bindings.
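
For reference, the matching rule quoted above can be sketched as a word-by-word comparison in which # may absorb zero or more words and * stands for exactly one. TopicPatternSketch is a hypothetical helper written for this illustration; it is not the code in MockTopicExchange.

```java
// Hypothetical matcher for topic binding keys ('#' = zero or more words, '*' = one word).
final class TopicPatternSketch {
    private TopicPatternSketch() { }

    static boolean matches(String bindingKey, String routingKey) {
        String[] pattern = bindingKey.isEmpty() ? new String[0] : bindingKey.split("\\.", -1);
        String[] words = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return matches(pattern, 0, words, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            // Pattern exhausted: match only if every word was consumed too.
            return w == words.length;
        }
        if (pattern[p].equals("#")) {
            // Either '#' matches nothing more, or it absorbs the current word and stays active.
            return matches(pattern, p + 1, words, w)
                || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;
        }
        if (pattern[p].equals("*") || pattern[p].equals(words[w])) {
            return matches(pattern, p + 1, words, w + 1);
        }
        return false;
    }
}
```

With this rule, the binding key something.# matches the routing key something as well as something.a.b, whereas something.* matches neither of those two.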

Spring Cloud Stream not Working

Your mock example works like a charm, but I have an issue about it. Maybe you can help me.

I have one output channel configured using spring cloud stream.

When the output is called, the message counter isn't called.

Basically, I have the AMQPMocked and the Spring Cloud Stream isn't using it.

How can I configure this correctly?

declaring exchange multiple times resets it

RMQ allows multiple parties to declare the same exchange.

'exchange.declare' documentation in the AMQP spec:

"This method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class."

The mock implementation recreates the exchange on each declare, losing all bindings.
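
The create-or-verify semantics quoted from the AMQP spec can be sketched with computeIfAbsent: the exchange is created only on the first declare, and later declares check the type and leave bindings alone. ExchangeRegistrySketch and Exchange are hypothetical names for this illustration, and the IllegalStateException is a placeholder for whatever error the mock would raise on a type mismatch.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry showing an idempotent exchange declaration.
final class ExchangeRegistrySketch {
    static final class Exchange {
        final String type;
        final Set<String> boundQueues = ConcurrentHashMap.newKeySet();

        Exchange(String type) {
            this.type = type;
        }
    }

    private final Map<String, Exchange> exchanges = new ConcurrentHashMap<>();

    // Creates the exchange if absent; otherwise verifies the type and returns the existing one.
    Exchange declare(String name, String type) {
        Exchange exchange = exchanges.computeIfAbsent(name, ignored -> new Exchange(type));
        if (!exchange.type.equals(type)) {
            throw new IllegalStateException(
                "Exchange '" + name + "' is already declared with type " + exchange.type);
        }
        return exchange;
    }
}
```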

RPC support

Do you plan to support RPC exchanges?

Currently, in the code:

    @Override
    public CompletableFuture<Command> asyncCompletableRpc(Method method) {
        throw new UnsupportedOperationException();
    }

MockChannel doesn't clear the transaction on rollback

MockChannel reuses the transaction after a rollback, which leaves the rolled-back published messages in the transaction.

// modify origin commit_or_rollback_can_be_called_multiple_times_after_a_single_select
String thirdMsg = "third message";
channel.basicPublish("", queue, null, thirdMsg.getBytes());
assertThat(channel.basicGet(queue, true)).isNull();
channel.txRollback();
assertThat(channel.basicGet(queue, true)).isNull();
                                                                                    
String fourthMsg = "fourth message";
channel.basicPublish("", queue, null, fourthMsg.getBytes());
assertThat(channel.basicGet(queue, true)).isNull();
channel.txCommit();
assertThat(channel.basicGet(queue, true).getBody()).isEqualTo(fourthMsg.getBytes());
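
The behavior the test above expects can be modeled with a buffer of pending publishes that is emptied on rollback as well as on commit. TxBufferSketch is a hypothetical model and not MockChannel's transaction class; it only shows why a message published before txRollback must not reappear at the next txCommit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a channel transaction buffer.
final class TxBufferSketch {
    private final List<String> pending = new ArrayList<>();
    private final List<String> queue = new ArrayList<>();

    // Publishes are held back until the transaction is committed.
    void publish(String body) {
        pending.add(body);
    }

    // Commit moves pending messages to the queue and starts with a clean buffer.
    void commit() {
        queue.addAll(pending);
        pending.clear();
    }

    // Rollback must discard pending messages, otherwise a later commit would publish them.
    void rollback() {
        pending.clear();
    }

    List<String> queued() {
        return new ArrayList<>(queue);
    }
}
```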

Backdoor to queues

The test examples send and receive messages from the same queue. In practice often an application receives messages from one queue and sends them to another.
Are there plans to implement direct access to the MockQueue and its messages?
Use case: application only sends messages to queue1. In the test run the code and check that the messages in MockQueue which mocks queue1 contain the expected content.
(Scala, for instance, has a special backdoor TestActorRef which gives direct access to the underlying actor and its mailbox.)

`MockQueue` uses unsynchronized `LinkedHashMap`s which can cause `ConcurrentModificationException` or halt message consumption

The MockQueue class has many unsynchronized LinkedHashMaps. Any access that may occur on different threads, even read access like calling size(), must be synchronized, or the JVM is free to cache these values for the current thread. This can cause, for example, a message that a queue receives almost simultaneously with a consumer being added to not be delivered (and delivery can stop altogether even if more messages are sent). It can also cause ConcurrentModificationExceptions; here's an example that I've seen:

java.util.ConcurrentModificationException: null
at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:756)
at java.base/java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:778)
at com.github.fridujo.rabbitmq.mock.MockQueue.doWithUnackedUntil(MockQueue.java:323)
at com.github.fridujo.rabbitmq.mock.MockQueue.basicAck(MockQueue.java:200)
at com.github.fridujo.rabbitmq.mock.MockNode.lambda$basicAck$0(MockNode.java:120)
at java.base/java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4780)
at com.github.fridujo.rabbitmq.mock.MockNode.basicAck(MockNode.java:120)
at com.github.fridujo.rabbitmq.mock.MockChannel.basicAck(MockChannel.java:407)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1162)
at jdk.proxy2/jdk.proxy2.$Proxy181.basicAck(Unknown Source)
at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.commitIfNecessary(BlockingQueueConsumer.java:876)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1048)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:940)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:84)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1317)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1223)
at java.base/java.lang.Thread.run(Thread.java:833)

I've fixed these problems locally, but I already have two Pull Requests open on this project that have unfortunately not had any response, and I'm not sure if it is worth my time to make a third PR to get this fixed.

The project seems somewhat abandoned. Are you looking for help? I wouldn't mind integrating the fix above and my other two PRs, and cutting a new release.

If it is abandoned, would you have any objection to my forking this project and releasing it to Maven Central under my own groupId?

Policies

Implement CRUD operations on policies, as defined here https://www.rabbitmq.com/parameters.html#policies

CRUD methods of Policies must be available on MockConnectionFactory as it makes no sense to have the opened/closed semantic of a Connection or even a Channel when it comes to Policy definition.

Policies will affect only the following extensions (the other being not implemented):

  • alternate exchanges
  • dead lettering
  • per-queue TTLs
  • maximum queue length

Failed to declare queue

Hello,

I'm using Spring Boot version 2.1.2.RELEASE, amqp-client:5.4.3 and rabbitmq-mock:1.0.9.
When I try to mock the RabbitMQ connection factory as in the example shown:

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(MockConnectionFactoryFactory.build());
    }

I receive the error org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[****(name of queue)]

When I was using this library with Spring Boot version 2.0.6.RELEASE, it was working perfectly.

Can you please check it :)

MockQueue should not use declaring channel for metrics

When trying to use the MockConnectionFactory for integration tests for a Spring Boot application I noticed that metrics were not complete when using a transaction manager. Upon further investigation I noticed MockQueue stores the MockChannel used to create it. It subsequently uses this in deliverToConsumerIfPossible to register consumption.

This means that acknowledgements of single messages are not recorded in AbstractMetricsCollector.updateChannelStateAfterAckReject, because channelState.unackedMessageDeliveryTags does not contain the consumption. Instead, the consumption was recorded against the channel used to declare the queue.

I've provided a PR with two commits. The first provides a failing test, the second commit introduces a fix which stores the MockChannel in the ConsumerAndTag when basicConsume is called so it can be used when delivering to the consumer.

Exception in Consumer breaks subscription

When an exception is thrown in the com.rabbitmq.client.Consumer#handleDelivery method, the consumer doesn't receive messages afterwards because the delivery thread is dead.

The test throwKillsConsumer throws an exception when receiving the first message. The second message is never delivered. This was hard to debug because nothing is logged about the dying thread.

The test noThrowIsFine, delivers all messages as expected.

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.github.fridujo.rabbitmq.mock.MockConnectionFactory;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.Test;

public class MockConnectionTest3 {

    private static final String EXCHANGE = "exchange";
    private static final String QUEUE = "queue";

    @Test
    void throwKillsConsumer() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        var channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, false, true, true, Map.of());
        channel.queueBind(QUEUE, EXCHANGE, "");

        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "throw".getBytes());
        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());

        var queue = readFromQueue(conn);
        assertEquals("throw", queue.poll(1, TimeUnit.SECONDS));
        assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
    }

    @Test
    void noThrowIsFine() throws Exception {
        var factory = new MockConnectionFactory();

        var conn = factory.newConnection();
        var channel = conn.createChannel();
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        channel.queueDeclare(QUEUE, false, true, true, Map.of());
        channel.queueBind(QUEUE, EXCHANGE, "");

        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "first".getBytes());
        channel.basicPublish(EXCHANGE, "", new AMQP.BasicProperties(), "complete".getBytes());

        var queue = readFromQueue(conn);
        assertEquals("first", queue.poll(1, TimeUnit.SECONDS));
        assertEquals("complete", queue.poll(1, TimeUnit.SECONDS));
    }

    ArrayBlockingQueue<String> readFromQueue(Connection conn) throws IOException {
        var channel = conn.createChannel();
        var queue = new ArrayBlockingQueue<String>(3);

        var consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
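                try {
                    queue.put(new String(body));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag instead of silently swallowing it
                    Thread.currentThread().interrupt();
                }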

                if (new String(body).equals("throw")) {
                    throw new RuntimeException("oops");
                }
            }
        };

        channel.basicConsume(QUEUE, true, consumer);
        return queue;
    }
}
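
One way a delivery loop could keep a subscription alive is to catch exceptions per message instead of letting them end the delivery thread. The sketch below uses only the JDK. `SafeDeliveryLoop` and its members are hypothetical names for illustration and are not rabbitmq-mock classes, and whether failures should be logged, collected, or trigger a consumer cancellation is left as an open design choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a mock's delivery loop; not part of rabbitmq-mock. */
final class SafeDeliveryLoop {

    /** Failures captured during delivery, kept so they can be logged or asserted on. */
    final List<RuntimeException> failures = new ArrayList<>();

    /** Delivers every message; a throwing handler is recorded instead of stopping the loop. */
    void deliverAll(List<String> messages, Consumer<String> handler) {
        for (String message : messages) {
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                // Record the failure and carry on with the next message
                failures.add(e);
            }
        }
    }
}
```

With a handler that throws on "throw", as in throwKillsConsumer, this loop would still hand "complete" to the handler and leave one entry in `failures`.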

Channels can be committed only once.

I'm using the RabbitMQ mock with transactions through Spring. Spring reuses channels, even across transactions, but MockChannel fails the second time a reused channel is committed. See below.

AFAIK, a channel can be committed multiple times without requiring another txSelect.

#[101219 23:21:50,734 | vm010124 | ERROR |http-nio-9090-exec-5|org.springframework.transaction.support.TransactionSynchronizationUtils | TransactionSynchronization.afterCompletion threw exception]#
java.lang.IllegalStateException: No started transaction (make sure you called txSelect before txCommit
	at com.github.fridujo.rabbitmq.mock.MockChannel.txCommit(MockChannel.java:521)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:1140)
	at com.sun.proxy.$Proxy86.txCommit(Unknown Source)
	at org.springframework.amqp.rabbit.connection.RabbitResourceHolder.commitAll(RabbitResourceHolder.java:171)
	at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$RabbitResourceSynchronization.afterCompletion(ConnectionFactoryUtils.java:332)
	at org.springframework.transaction.support.TransactionSynchronizationUtils.invokeAfterCompletion(TransactionSynchronizationUtils.java:171)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.invokeAfterCompletion(AbstractPlatformTransactionManager.java:992)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.triggerAfterCompletion(AbstractPlatformTransactionManager.java:967)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:788)
	at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:714)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:534)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:305)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
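
The behavior the reporter expects matches AMQP 0-9-1, where tx.select puts the channel into transactional mode once and a new transaction starts immediately after each commit or rollback. A minimal model of that state, using only the JDK, might look like the following. `TxChannelModel` is a hypothetical illustration and is not the MockChannel implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of AMQP 0-9-1 transaction state on a channel; not the MockChannel code. */
final class TxChannelModel {

    private boolean transactional;                         // set by txSelect and never cleared
    private final List<String> pending = new ArrayList<>();
    private final List<String> committed = new ArrayList<>();

    void txSelect() {
        transactional = true;
    }

    void publish(String message) {
        if (transactional) {
            pending.add(message);      // held back until the next commit
        } else {
            committed.add(message);    // non-transactional channels publish immediately
        }
    }

    /** Commits pending messages; the channel stays transactional afterwards. */
    void txCommit() {
        requireTransactional("txCommit");
        committed.addAll(pending);
        pending.clear();
    }

    /** Discards pending messages; the channel stays transactional afterwards. */
    void txRollback() {
        requireTransactional("txRollback");
        pending.clear();
    }

    List<String> committed() {
        return List.copyOf(committed);
    }

    private void requireTransactional(String operation) {
        if (!transactional) {
            throw new IllegalStateException("txSelect must be called before " + operation);
        }
    }
}
```

The design point is that commit and rollback clear only the pending work, not the transactional flag, so a channel cached and reused by Spring can be committed repeatedly.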

Add support for mandatory flag

It would be nice if the mandatory flag were respected. Right now (1.0.14-SNAPSHOT) it is simply ignored.
When the recently added direct-reply is combined with 'mandatory', an exception is even thrown:

[main] WARN com.nokia.impact.adaptation.model.Result - Unexpected exception
java.lang.UnsupportedOperationException
	at com.github.fridujo.rabbitmq.mock.MockChannel.addReturnListener(MockChannel.java:111)
	at com.rabbitmq.client.RpcClient.<init>(RpcClient.java:116)
	at com.nokia.impact.broker.RabbitMQWritableChannel.lambda$null$2(RabbitMQWritableChannel.java:133)
	at com.nokia.impact.broker.RabbitTemplate.execute(RabbitTemplate.java:55)
	at com.nokia.impact.broker.RabbitMQWritableChannel.lambda$call$3(RabbitMQWritableChannel.java:120)
	at com.nokia.impact.adaptation.model.Result.execute(Result.java:34)
	at com.nokia.impact.broker.RabbitMQWritableChannel.call(RabbitMQWritableChannel.java:118)
	at com.nokia.impact.broker.RmqControllerChannels.callAdapter(RmqControllerChannels.java:75)
	at com.nokia.impact.broker.RmqControllerChannelsTest.call(RmqControllerChannelsTest.java:110)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
	at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Multiple bindings which only differ in binding arguments not working correctly

We discovered that when two queue bindings on the same exchange differ only in their binding arguments, rabbitmq-mock does not deliver messages to the queue for all of the bindings. One of the bindings simply does not work, even though a message with the correct arguments is delivered to the exchange.

This is because BindConfiguration instances are compared at some point, and their equals/hashCode do not take the binding arguments into account.

PR #228 shows a potential solution which fixes our test cases.
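
The effect of leaving arguments out of equals/hashCode can be shown with a stand-in class. `BindingKey` below is hypothetical and its fields are illustrative; it is not the library's BindConfiguration and not necessarily the approach taken in the PR. Because the arguments take part in equality here, two bindings that differ only in arguments stay distinct in a hash-based collection.

```java
import java.util.Map;
import java.util.Objects;

/** Hypothetical binding key; field names are illustrative, not the library's BindConfiguration. */
final class BindingKey {
    private final String queue;
    private final String routingKey;
    private final Map<String, Object> arguments;

    BindingKey(String queue, String routingKey, Map<String, Object> arguments) {
        this.queue = queue;
        this.routingKey = routingKey;
        this.arguments = Map.copyOf(arguments);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof BindingKey)) {
            return false;
        }
        BindingKey other = (BindingKey) o;
        return queue.equals(other.queue)
            && routingKey.equals(other.routingKey)
            && arguments.equals(other.arguments);   // arguments are part of the identity
    }

    @Override
    public int hashCode() {
        // Must cover the same fields as equals, including the arguments
        return Objects.hash(queue, routingKey, arguments);
    }
}
```

If the `arguments` comparison were dropped, a `HashSet<BindingKey>` would keep only one of two such bindings, which is consistent with the symptom described above.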

Is this mock supported for Spring Boot 1.5.x projects?

I'm trying rabbitmq-mock in a Spring Boot 1.5.12 project built with Apache Camel.
The amqp-client version is 4.0.3, which is a supported version according to your documentation.
However, when I tried to use the mock in my project (I tried 1.0.10, 1.0.0 and 1.0.7), I got a ClassNotFoundException in every case. The missing classes all come from different versions of com.rabbitmq:amqp-client.
Hence I would like to ask whether this mock supports lower versions of amqp-client,
because I saw that this library has been using spring-amqp 2.x.x right from the beginning.

Errors:
java.lang.ClassNotFoundException: com.rabbitmq.client.DeliverCallback
java.lang.ClassNotFoundException: com.rabbitmq.client.QueueingConsumer

This is what I'm trying

    @Bean
    public Channel amqpChannel() {
        Connection connection = new MockConnectionFactory().newConnection();
        Channel channel = null;
        try {
            channel = connection.createChannel();
            channel.exchangeDeclare("xyz", "direct", true);
        } catch (Exception e) {
            log.info("Error declaring xyz exchange");
        }

        return channel;
    }

On stream restart, the mock stops working

I have the following setup:
Producer actor -> Akka stream -> alpakka-amqp -> Consumer Akka stream

When I restart either stream (which means draining the stream, cancelling upstream and completing downstream, and then creating a new stream with the same AmqpConnectionProvider and configuration), the mock stops working with the new stream.

Calling messageCount and consumerCount on the channel in use shows the correct number of messages waiting to be read and the new consumer (for example, the consumer count starts at 1, goes to 0 when the stream is shut down, and returns to 1 when it is recreated), but nothing is delivered to the stream.

The same setup seems to work against a real RabbitMQ server instead of the mock. Any idea?

No started transaction using Spring

I added the dependency to my pom.xml and created a connectionFactory() bean, but I get this error: No started transaction (make sure you called txSelect before txRollback.
The problem only occurs when the second test using RabbitMQ is executed.

I'm using Spring and JUnit.

Bean:

@Configuration
class RabbitmqConfig {

    @Bean
    ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory(new MockConnectionFactory());
    }

}

The check that raises the error:

public RollbackOk txRollback() {
    if (this.transaction == null) {
        throw new IllegalStateException("No started transaction (make sure you called txSelect before txRollback");
    } else {
        this.transaction = null;
        return new com.rabbitmq.client.impl.AMQImpl.Tx.RollbackOk();
    }
}

Any help?
