Giter VIP home page Giter VIP logo

javansqclient's Introduction

JavaNSQClient

A (fast?) netty-based Java8 client for NSQ heavily forked of TrendrrNSQClient.

Artifact

<dependency>
  <groupId>com.github.brainlag</groupId>
  <artifactId>nsq-client</artifactId>
  <version>1.0.0.RC4</version>
</dependency>

Consumer

Example usage:

NSQLookup lookup = new DefaultNSQLookup();
lookup.addLookupAddress("localhost", 4161);
NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", (message) -> {
        System.out.println("received: " + message);            
        //now mark the message as finished.
        message.finished();
        
        //or you could requeue it, which indicates a failure and puts it back on the queue.
        //message.requeue();
});
        
consumer.start();

Producer

Example usage:

NSQProducer producer = new NSQProducer().addAddress("localhost", 4150).start();            
producer.produce("TestTopic", ("this is a message").getBytes());

Backoff

By default Backoff does not kick in and a consumer will eat all your memory and CPU. To enable Backoff you have to set your own thread pool executer with:

consumer.setExecutor(...);

javansqclient's People

Contributors

brainlag avatar dpapworth-qc avatar dustismo avatar icanfly avatar klucar avatar kulikov avatar loveflowers avatar moonpolysoft avatar mreiferson avatar pfeairheller avatar rukyzhc avatar sundy-li avatar vikash-tiwari avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

javansqclient's Issues

Incorrect usage of Netty's ByteBuf class in JavaNSQClient library

The following error happens simply when reading messages for a while from NSQConsumer.

[nioEventLoopGroup-2-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetection.level=advanced' or call ResourceLeakDetector.setLevel() See http://netty.io/wiki/reference-counted-objects.html for more information.

Is it possible implement a RMI call style in NSQ client?

Hi,
As the title said, I wanna use NSQ to implement a simple service bus, it is very easy ti find that the pub/sub and queue way have already implemented in NSQ, but how to get return object in a NSQProducer? Or do you have any kindly subjection?

Best regards.

Nullpointer Exception while connecting consumer.

JsonNode jsonNode = mapper.readTree(new URL(addr + "/lookup?topic=" + topicEncoded)); LogManager.getLogger(this).debug("Server connection information: {}", jsonNode); JsonNode producers = jsonNode.get("data").get("producers");

jsonNode value is not having "data" key but rather "producers" is in the same level:
{"channels":["test1"],"producers":[{"remote_address":"127.0.0.1:52354","hostname":"vikash.tiwari","broadcast_address":"vikash.tiwari","tcp_port":4150,"http_port":4151,"version":"1.0.0-compat"}]}

Producer and Consumer connection code is same as in instructions.

java.lang.IllegalStateException: Queue full

java.lang.IllegalStateException: Queue full
    at java.util.AbstractQueue.add(AbstractQueue.java:98)
    at com.github.brainlag.nsq.Connection.incoming(Connection.java:115)
    at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:358)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:112)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:748)

Went through the code and found the following line in Connection.java :

private final LinkedBlockingQueue<NSQFrame> responses = new LinkedBlockingQueue<>(1);

Wanted to understand why this is initialized with capacity 1 and what could potentially be causing this error. Could it be related to high rate of polling NSQ? We deal with a lot of data.

Release of the version 1.0.0

Can I ask about when version 1.0.0 will be released ? I am planning to use this project with the one of the project I am contributing and I'm uneasy at the idea of having an RC as dependency.

Upgrading netty to 4.1.x

There are breaking changes between 4.1 and 4.0 Most of the project start to use 4.1 and this creates classpath errors. It'll be good upgrading netty to 4.1.x

Use sf4j instead of log4j2

I'd like to sf4j-log4j to instead of log4j2, cause sf4j is a simple Logging Facade and could be used in various logging frameworks (e.g. java.util.logging, logback, log4j) .
If you think that's needed, I will open a merge request to you.

fail to run message.requeue();

nsq version:1.0
os:centos 7.4

codes:
public static void main( String[] args )
{
NSQLookup lookup = new DefaultNSQLookup();
lookup.addLookupAddress("192.168.1.228", 4161);
NSQConsumer consumer = new NSQConsumer(lookup, "TestTopic", "dusti", (message) -> {
System.out.println("received: " + new String(message.getMessage()));
message.finished();
message.requeue();
});

    consumer.start();
}

below is run time messages:
17:00:37.080 INFO Created connection: knowledgebase:4150 - Connection.
17:00:37.100 INFO IdentifyResponse: {"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - NSQFeatureDetectionHandler.channelRead0
17:00:37.101 INFO reinstall LengthFieldBasedFrameDecoder - NSQFeatureDetectionHandler.eject
17:00:37.102 INFO Server identification: {"max_rdy_count":2500,"version":"1.0.0-compat","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - Connection.
received: test8
received: test9
十二月 28, 2017 5:00:53 下午 io.netty.util.concurrent.SingleThreadEventExecutor runAllTasks
警告: A task raised an exception.
java.lang.IllegalStateException: Queue full
at java.util.AbstractQueue.add(AbstractQueue.java:98)
at com.github.brainlag.nsq.Connection.incoming(Connection.java:129)
at com.github.brainlag.nsq.netty.NSQHandler.lambda$channelRead0$3(NSQHandler.java:41)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402)
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:748)

Bug while connection close gives an exception

While doing a connection close, there is an Channel Inactive which is un-caught. Hence, the connection is not removed.
So I'm left with a stale connection, and not able to reconnect due to an existing entry to that address.

Logs:
16:14:47.245 INFO Remove connection server:4150 - NSQConsumer.connect
16:14:47.245 INFO Closing connection: com.github.brainlag.nsq.Connection@188fe003 - Connection.close
16:14:47.246 INFO Channel disconnected! com.github.brainlag.nsq.Connection@188fe003 - NSQHandler.channelInactive

Fix could be to put in try catch

        for (final ServerAddress server : Sets.difference(oldAddresses, newAddresses)) {
            LogManager.getLogger(this).info("Remove connection " + server.toString());                
           try {
                   connections.get(server).close();
          } catch (Exception e) {
                LogManager.getLogger(this).error("Close connection failed: " + e.getMessage(), e);
           } finally {
            connections.remove(server);
           }
        }

How to switch jdk1.7 version ?

Because the use of jdk1.7 so I want to use RPC framework(JDK1.7), switching effects will be great? If it is how to do it?

ConcurrentModificationException in connect

It looks like this code from NSQConsumer.java (around line 202) will sometimes throw a ConcurrentModificationException:

        for (final ServerAddress server : Sets.difference(oldAddresses, newAddresses)) {
            LogManager.getLogger(this).info("Remove connection " + server.toString());
            connections.get(server).close();
            connections.remove(server);
        }

Since the exception is uncaught, the scheduler.scheduleAtFixedRate then silently fails, which is bad.

Failed when i try to set timeout

I try to set the timeout of NSQConsumer but this throw a Cast Exception

NSQConfig config= new NSQConfig();
config.setMsgTimeout(Util.NSQ_MSG_TIMEOUT);
 NSQConsumer consumer = new NSQConsumer(lookup, topic, NsqChannelConst.REPORTS_CHANNEL, (message) -> {

//......

},config);

this is the error:

Caused by: java.lang.ClassCastException: class com.github.brainlag.nsq.frames.ErrorFrame cannot be cast to class com.github.brainlag.nsq.frames.ResponseFrame (com.github.brainlag.nsq.frames.ErrorFrame and com.github.brainlag.nsq.frames.ResponseFrame are in unnamed module of loader 'app')
at com.github.brainlag.nsq.Connection.(Connection.java:77) ~[nsq-client-1.0.0.RC4.jar:na]
at com.github.brainlag.nsq.NSQConsumer.createConnection(NSQConsumer.java:80) ~[nsq-client-1.0.0.RC4.jar:na]
at com.github.brainlag.nsq.NSQConsumer.connect(NSQConsumer.java:210) ~[nsq-client-1.0.0.RC4.jar:na]
at com.github.brainlag.nsq.NSQConsumer.start(NSQConsumer.java:72) ~[nsq-client-1.0.0.RC4.jar:na]

Cannot easily end the process

I've created a consumer and a producer, and had to jump through some hoops to get the process to close. I ended up having to:

  • Create a NIOEventLoopGroup, and add it to an NSQConfig that was passed to each producer/consumer, and had to pass it a thread factory that would create daemon threads (shutdown on the object didn't seem to work)
  • Create an executor service with a similar thread factory that creates daemon threads, and pass that to consumers only (producers shut theirs down properly)

It would be nice if this were either easier to do, or documented.

Creating new connection causes NPE instead of graceful fail.

Connection.java, lines 70, 71

final NSQFrame response = commandAndWait(ident);
LogManager.getLogger(this).info("Server identification: " + ((ResponseFrame) response).getMessage());

commandAndWait returns null on an InterruptedException, which causes the log line to throw an NPE on response.getMessage()

I haven't been able to reproduce this error yet, but I saw it while testing.

Consuming with snappy only receives one message

            //curl -d 'message 1' 'http://localhost:4151/put?topic=test_snappy'
            //curl -d 'message 2' 'http://localhost:4151/put?topic=test_snappy'
            //curl -d 'message 3' 'http://localhost:4151/put?topic=test_snappy'

            NSQLookup lookup = new DefaultNSQLookup();
            lookup.addLookupAddress("localhost", 4161);
            NSQConfig config = new NSQConfig();
            config.setCompression(NSQConfig.Compression.SNAPPY);
            NSQConsumer consumer = new NSQConsumer(lookup, "test_snappy", "test_consume",
                    (msg) -> {
                        System.out.println(new String(msg.getMessage()));
                        msg.finished();
                    }, config);
            consumer.start();
            Thread.sleep(20000);
            System.out.println("done");

10:10:01.997 INFO  Created connection: Rob-Seeds-MacBook-Pro.local:4150 - Connection.<init> 
10:10:02.021 INFO  IdentifyResponse: {"max_rdy_count":2500,"version":"0.3.2","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":true,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - NSQFeatureDetectionHandler.channelRead0 
10:10:02.025 INFO  Adding snappy to pipline - NSQFeatureDetectionHandler.installSnappyDecoder 
10:10:02.027 INFO  IdentifyResponse: OK - NSQFeatureDetectionHandler.channelRead0 
10:10:02.029 INFO  Server identification: {"max_rdy_count":2500,"version":"0.3.2","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":true,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250} - Connection.<init> 
message 1
done

Works as expected when not using snappy compression. Publishing with snappy enabled works fine.

Unexpected network errors will cause consuming stop

Consumer never be triggered about network error. For example, unplug the network cable, disable computer networks. so connections in consumer never be closed and remove.

I think should add heartbeat interval expire for connection. If last heartbeat interval exceeded, close and reconnection.

I will post my code improvements soon.

Why Producer not lookup lookupd for produce connection like Consumer ?

Why the implementation of producer not apply consumer model, get nsqd server address by lookup lookupd servers, then establish a connection for message sending, and decide which nsqd to send a message by scheduling lookup lookupd servers

like this:
`
public NSQConsumer(final NSQLookup lookup, final String topic, final String channel, final NSQMessageCallback callback) {
this(lookup, topic, channel, callback, new NSQConfig());
}

public NSQConsumer(final NSQLookup lookup, final String topic, final String channel, final NSQMessageCallback callback,
                   final NSQConfig config) {
    this(lookup, topic, channel, callback, config, null);
}

public NSQConsumer(final NSQLookup lookup, final String topic, final String channel, final NSQMessageCallback callback,
                   final NSQConfig config, final NSQErrorCallback errCallback) {
    this.lookup = lookup;
    this.topic = topic;
    this.channel = channel;
    this.config = config;
    this.callback = callback;
    this.errorCallback = errCallback;

}

`
but Producer has initialized hardly like this:

`
public class NSQProducer {
private Set addresses = Sets.newConcurrentHashSet();
...

protected Connection getConnection() throws NoConnectionsException {
int c = 0;
while (c < connectionRetries) {
ServerAddress[] serverAddresses = addresses.toArray(new ServerAddress[addresses.size()]);
if (serverAddresses.length != 0) {
try {
return pool.borrowObject(serverAddresses[roundRobinCount++ % serverAddresses.length]);
} catch (NoSuchElementException e) {
try {
Thread.sleep(1000);
} catch (InterruptedException ix) {
throw new NoConnectionsException("Could not acquire a connection to a server", ix);
}
} catch (Exception ex) {
throw new NoConnectionsException("Could not acquire a connection to a server", ex);
}
}
}
throw new IllegalStateException("No server configured for producer");
}

`

I think it's good for Increase/Decrease number of nsqd servers.

Unable to connect lookup

Hi!brainlag.
I use your 'nsq-client' in my clojure-project. But I can not connect to lookup. The code is as follow:
(import com.github.brainlag.nsq.lookup.DefaultNSQLookup)
=> com.github.brainlag.nsq.lookup.DefaultNSQLookup
(def nsq-lookup (DefaultNSQLookup.))
=> #'nsq-tester.core/nsq-lookup
(.addLookupAddress nsq-lookup "127.0.0.1" 4161)
=> nil
(.lookup nsq-lookup "testss")
17:28:42.004 WARN Unable to connect to address http://127.0.0.1:4161 - DefaultNSQLookup.lookup
17:28:42.004 WARN Unable to connect to any NSQ Lookup servers, servers tried: [http://127.0.0.1:4161] - DefaultNSQLookup.lookup
=> #{}

I run this code on the repl.
the nsq is deployed on my PC.

How to connect to nsqd without lookup

Hi, I am wondering how to connect to nsqd without lookup. The consumer samples online always use nsqdlookup firstly then connect and I've failed to find a way to set connection in NSQConsumer. Thank you.

Could not create connection to server xxxxxxx - NSQConsumer.createConnection

Producer works fine, but consumer can not start . Stacktrace may like this:

com.github.brainlag.nsq.exceptions.NoConnectionsException: Could not connect to server
at com.github.brainlag.nsq.Connection.(Connection.java:62) ~[nsq-client-1.0.0.RC4.jar:?]
at com.github.brainlag.nsq.NSQConsumer.createConnection(NSQConsumer.java:80) ~[nsq-client-1.0.0.RC4.jar:?]
at com.github.brainlag.nsq.NSQConsumer.connect(NSQConsumer.java:210) ~[nsq-client-1.0.0.RC4.jar:?]
at com.github.brainlag.nsq.NSQConsumer.lambda$start$0(NSQConsumer.java:73) ~[nsq-client-1.0.0.RC4.jar:?]
at java.util.concurrent.Executors$RunnableAdapter.call$$$capture(Executors.java:511) [?:1.8.0_144]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java) [?:1.8.0_144]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) [?:1.8.0_144]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_144]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_144]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_144]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: java.nio.channels.UnresolvedAddressException
at sun.nio.ch.Net.checkAddress(Net.java:101) ~[?:1.8.0_144]
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) ~[?:1.8.0_144]
at io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:208) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:203) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1226) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:549) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:534) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelOutboundHandlerAdapter.java:47) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:549) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:534) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:516) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:970) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:215) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.bootstrap.Bootstrap$2.run(Bootstrap.java:166) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:408) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:402) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:140) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144) ~[netty-all-4.0.39.Final.jar:4.0.39.Final]
... 1 more

How can I fix this?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.