opentsdb / async
Building blocks for asynchronous Java processing inspired by Twisted's API.
Home Page: http://github.com/stumbleupon/async
License: BSD 3-Clause "New" or "Revised" License
,-----------------------------.
| StumbleUpon's Async Library |
`-----------------------------'

This Java library provides some useful building blocks to build high-performance multi-threaded asynchronous applications in Java. Its implementation was inspired by Twisted's asynchronous library (twisted.internet.defer).

Deferred allows you to easily build asynchronous processing chains that must trigger when an asynchronous event (I/O, RPC and whatnot) completes. It can be used extensively to build an asynchronous API in a multi-threaded server or client library.
I'm having trouble understanding the flow of the Deferred class.
For example, this API is used by the asynchronous Kudu client. How can I implement the following method?
public void foo(){
//verify that the table exists;
...
//if exists, open table;
...
//write on the table;
...
}
More specifically, AsyncKuduClient has this method to check whether a table exists:
Deferred<Boolean> aux = kuduClient.tableExists(tableName);
What do I do after this?
And how can I then call this method asynchronously?
Thank you.
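The usual answer to a question like this is to chain the steps as callbacks rather than wait for each result. The sketch below shows that shape using java.util.concurrent.CompletableFuture from the JDK as a stand-in, so that it runs without extra dependencies. It is not the Kudu or Deferred API: FakeClient and all of its methods are hypothetical. With Deferred, the corresponding calls would, to my understanding, be addCallbackDeferring (for a step that itself returns a Deferred), addCallback, and addErrback.

```java
import java.util.concurrent.CompletableFuture;

/** Stand-in sketch: chains "exists -> open -> write" without blocking the caller. */
final class AsyncFlowSketch {

    /** Hypothetical asynchronous client (NOT the Kudu API); every call returns immediately. */
    static final class FakeClient {
        CompletableFuture<Boolean> tableExists(String table) {
            return CompletableFuture.supplyAsync(() -> table.equals("metrics"));
        }
        CompletableFuture<String> openTable(String table) {
            return CompletableFuture.supplyAsync(() -> "handle:" + table);
        }
        CompletableFuture<String> write(String handle, String row) {
            return CompletableFuture.supplyAsync(() -> "wrote " + row + " via " + handle);
        }
    }

    /** Step 2: open the table only if step 1 reported that it exists. */
    static CompletableFuture<String> openIfExists(FakeClient client, String table, boolean exists) {
        if (exists) {
            return client.openTable(table);
        }
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("no such table: " + table));
        return failed;
    }

    /** Returns a future for the final outcome instead of a value, so the caller never waits. */
    static CompletableFuture<String> foo(FakeClient client, String table, String row) {
        return client.tableExists(table)
            .thenCompose(exists -> openIfExists(client, table, exists)) // runs when step 1 completes
            .thenCompose(handle -> client.write(handle, row))           // runs when step 2 completes
            .exceptionally(err -> "failed: " + rootCause(err).getMessage()); // any earlier failure lands here
    }

    /** Failures may arrive wrapped, so unwrap to the original exception. */
    static Throwable rootCause(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        // join() is used only here, at the very edge of the program, to print the results.
        System.out.println(foo(client, "metrics", "row-1").join()); // wrote row-1 via handle:metrics
        System.out.println(foo(client, "missing", "row-1").join()); // failed: no such table: missing
    }
}
```

The point of the design is that foo returns the chained future itself. Blocking on the result inside foo would tie up the calling thread, which is what the chain is meant to avoid.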
Version: 1.4.1
Call stack:
"OpenTSDB I/O Worker #20" prio=10 tid=0x00007f246c6c6000 nid=0x1d3e in Object.wait() [0x00007f23cdfe1000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
- waiting on <0x00000006c3f705a0> (a com.stumbleupon.async.Deferred$Signal)
at java.lang.Object.wait(Object.java:503)
at com.stumbleupon.async.Deferred.doJoin(Deferred.java:1136)
- locked <0x00000006c3f705a0> (a com.stumbleupon.async.Deferred$Signal)
at com.stumbleupon.async.Deferred.joinUninterruptibly(Deferred.java:1061)
at net.opentsdb.uid.UniqueId.getOrCreateId(UniqueId.java:663)
at net.opentsdb.core.IncomingDataPoints.rowKeyTemplate(IncomingDataPoints.java:132)
at net.opentsdb.core.TSDB.addPointInternal(TSDB.java:785)
at net.opentsdb.core.TSDB.addPoint(TSDB.java:766)
at net.opentsdb.tsd.PutDataPointRpc.execute(PutDataPointRpc.java:210)
at net.opentsdb.tsd.RpcHandler.handleHttpQuery(RpcHandler.java:283)
at net.opentsdb.tsd.RpcHandler.messageReceived(RpcHandler.java:134)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.handler.timeout.IdleStateAwareChannelUpstreamHandler.handleUpstream(IdleStateAwareChannelUpstreamHandler.java:36)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.timeout.IdleStateHandler.messageReceived(IdleStateHandler.java:294)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.codec.http.HttpContentEncoder.messageReceived(HttpContentEncoder.java:82)
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.handler.codec.http.HttpContentDecoder.messageReceived(HttpContentDecoder.java:108)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.http.HttpChunkAggregator.messageReceived(HttpChunkAggregator.java:194)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:452)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.callDecode(ReplayingDecoder.java:536)
at org.jboss.netty.handler.codec.replay.ReplayingDecoder.messageReceived(ReplayingDecoder.java:435)
at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:791)
at org.jboss.netty.channel.SimpleChannelHandler.messageReceived(SimpleChannelHandler.java:142)
at org.jboss.netty.channel.SimpleChannelHandler.handleUpstream(SimpleChannelHandler.java:88)
at net.opentsdb.tsd.ConnectionManager.handleUpstream(ConnectionManager.java:87)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:564)
at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:559)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:88)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Hi, are there any examples showing how to use this library?
Hi,
We are seeing frequent deadlocks. When Deferred runs its callback chain it holds its own lock through synchronized (this) (code-link), and Signal (the callback object inside Deferred) must then acquire the lock on itself (code-link) to run its callback method and notify the waiter. Meanwhile, the doJoin method already holds the lock on the Signal (code-link), and when it throws the TimeoutException it converts the Deferred to a String, which requires the lock (code-link) on the Deferred itself. Each thread therefore holds the lock the other one is waiting for. I hope the logic is clear; I am also attaching the jstack output to make it easier to follow.
Found one Java-level deadlock:
=============================
"pool-88-thread-1":
waiting to lock monitor 0x00007fa088011518 (object 0x00007fa62e0442f0, a com.stumbleupon.async.Deferred$Signal),
which is held by "pool-1-thread-520"
"pool-1-thread-520":
waiting to lock monitor 0x00007fa088011468 (object 0x00007fa62e044240, a com.stumbleupon.async.Deferred),
which is held by "pool-88-thread-1"
Java stack information for the threads listed above:
===================================================
"pool-88-thread-1":
at com.stumbleupon.async.Deferred$Signal.call(Deferred.java:1215)
- waiting to lock <0x00007fa62e0442f0> (a com.stumbleupon.async.Deferred$Signal)
at com.stumbleupon.async.Deferred.doCall(Deferred.java:1278)
at com.stumbleupon.async.Deferred.runCallbacks(Deferred.java:1257)
- locked <0x00007fa62e044240> (a com.stumbleupon.async.Deferred)
at com.stumbleupon.async.Deferred.callback(Deferred.java:1005)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
"pool-1-thread-520":
at com.stumbleupon.async.Deferred.toString(Deferred.java:1416)
- waiting to lock <0x00007fa62e044240> (a com.stumbleupon.async.Deferred)
at java.lang.String.valueOf(String.java:2979)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at com.stumbleupon.async.TimeoutException.<init>(TimeoutException.java:43)
at com.stumbleupon.async.Deferred.doJoin(Deferred.java:1177)
- locked <0x00007fa62e0442f0> (a com.stumbleupon.async.Deferred$Signal)
at com.stumbleupon.async.Deferred.join(Deferred.java:1045)
I am thinking of a couple of ways to fix this and would like your suggestions:
synchronized (signal_cb) {
(line 1133 of Deferred.java). This may require setting a boolean flag when the timeout happens, leaving the synchronized block, checking the status, and then throwing the exception if the Deferred is still pending, or something like that.

There are several shortened links in the documentation referencing su.pr. All of these links appear to have stopped working; they now just redirect to www.stumbleupon.com.
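Returning to the deadlock report above: the idea of recording the timeout in a flag and throwing only after leaving the synchronized block can be sketched roughly as follows. This is a simplified illustration, not the library's code. Signal and Task are hypothetical stand-ins for Deferred$Signal and Deferred, and the JDK's java.util.concurrent.TimeoutException stands in for the library's own exception class.

```java
import java.util.concurrent.TimeoutException;

/** Sketch: decide on the timeout under the lock, build the exception after releasing it. */
final class JoinSketch {

    /** Hypothetical stand-in for Deferred$Signal. */
    static final class Signal {
        private Object result;
        private boolean done;

        /** Called by the thread that completes the work; needs the Signal's monitor. */
        synchronized void call(Object value) {
            result = value;
            done = true;
            notifyAll();
        }
    }

    /** Hypothetical stand-in for a Deferred whose toString() takes its own monitor. */
    static final class Task {
        final Signal signal = new Signal();

        @Override
        public synchronized String toString() {
            return "Task(pending)";
        }
    }

    static Object join(Task task, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        final Signal signal = task.signal;
        final long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        boolean timedOut = false;
        synchronized (signal) {
            while (!signal.done) {
                long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
                if (remainingMs <= 0) {
                    timedOut = true; // only record the fact; do not touch `task` here
                    break;
                }
                signal.wait(remainingMs);
            }
            if (!timedOut) {
                return signal.result;
            }
        }
        // The Signal's monitor is released at this point, so calling task.toString()
        // (which takes the Task's monitor) can no longer form a lock cycle.
        throw new TimeoutException("Timed out after " + timeoutMillis + "ms waiting on " + task);
    }

    public static void main(String[] args) throws Exception {
        Task slow = new Task();
        try {
            join(slow, 50);
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }

        Task fast = new Task();
        new Thread(() -> fast.signal.call("ok")).start();
        System.out.println(join(fast, 1000));
    }
}
```

In this sketch the two monitors are never held at the same time by the joining thread, which removes the cycle shown in the jstack output. A real fix might also re-check the completion status after leaving the block, as the report suggests, since the result can arrive between the timeout decision and the throw.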
The sources JAR in the central Maven repository has a missing package structure.