tokio-rs / tokio-core
I/O primitives and event loop for async I/O in Rust
License: Apache License 2.0
These futures call std::io::Read methods and expect them to internally register the task's interest in an event if it's not immediately ready. This isn't generally true for Read and Write implementations not based on PollEvented. Tokio could introduce new AsyncRead and AsyncWrite traits to make this requirement explicit.
It appears to be idiomatic in tokio to take ownership of IO handles when transforming them, and therefore to split bidirectional channels into two separate objects. The existing monolithic Io abstraction doesn't play well with this, but could be split into separate traits respectively providing poll_read and poll_write, and perhaps need_read and need_write if necessary. These could be the traits suggested in #61.
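A minimal sketch of what such split traits could look like (names and signatures are assumptions, not a settled design):

use futures::Async;
use std::io;

// Hypothetical readiness-aware read trait; not an existing tokio API.
pub trait AsyncRead: io::Read {
    // Ready if a read is likely to succeed; otherwise registers the
    // current task's interest with the reactor and returns NotReady.
    fn poll_read(&mut self) -> Async<()>;
}

// Hypothetical write-side counterpart.
pub trait AsyncWrite: io::Write {
    fn poll_write(&mut self) -> Async<()>;
}

A split Io type could then implement AsyncRead on its read half and AsyncWrite on its write half, making the PollEvented requirement explicit in the type system.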
Currently, when I run my tests on Travis, there is a failing .unwrap() in the timer's heap.rs. The error does not occur on my PC.
This is the backtrace (the error occurs here):
thread 'tests::put' panicked at 'called `Option::unwrap()` on a `None` value', ../src/libcore/option.rs:317
stack backtrace:
1: 0x7f811bcd9423 - std::sys::backtrace::tracing::imp::write::h46f28e67d38b4637
2: 0x7f811bcdd52d - std::panicking::default_hook::{{closure}}::h1d3243f546573ff4
3: 0x7f811bcdc6ce - std::panicking::default_hook::h96c288d728df3ebf
4: 0x7f811bcdcdb8 - std::panicking::rust_panic_with_hook::hb1322e5f2588b4db
5: 0x7f811bcdcc52 - std::panicking::begin_panic::hfbeda5aad583dc32
6: 0x7f811bcdcb90 - std::panicking::begin_panic_fmt::h4fe9fb9d5109c4bf
7: 0x7f811bcdcb11 - rust_begin_unwind
8: 0x7f811bd1741f - core::panicking::panic_fmt::h4395919ece15c671
9: 0x7f811bd1734b - core::panicking::panic::hc74ff52ed78364e1
10: 0x7f811bc2b195 - <core::option::Option<T>>::unwrap::h287745a6792bd9bc
at /buildslave/rust-buildbot/slave/nightly-dist-rustc-linux/build/obj/../src/libcore/macros.rs:21
11: 0x7f811bc41b8c - <tokio_core::heap::Heap<T>>::remove::hf9d52bc2c83ee1dd
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/heap.rs:71
12: 0x7f811bc4b365 - tokio_core::reactor::Inner::cancel_timeout::h7772ee9fb74940ad
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:474
13: 0x7f811bc49676 - tokio_core::reactor::Core::notify::h1adc6d28c9d93874
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:403
14: 0x7f811bc4ce53 - tokio_core::reactor::Remote::send::{{closure}}::hf960237408f0a20d
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:507
15: 0x7f811bc4c90b - tokio_core::reactor::Remote::with_loop::{{closure}}::h1832bb427de4d5ea
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:532
16: 0x7f811bc2e613 - <scoped_tls::ScopedKey<T>>::with::h6e57d5658ba0e9e7
at /home/travis/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:169
17: 0x7f811bc4bcbd - tokio_core::reactor::Remote::with_loop::h86eacd5eb370b9fc
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:529
18: 0x7f811bc4bb25 - tokio_core::reactor::Remote::send::he982eb09318059e2
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:501
19: 0x7f811bc44dbc - tokio_core::reactor::timeout_token::TimeoutToken::cancel_timeout::hd3f8329f407351ea
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/timeout_token.rs:55
20: 0x7f811bc451d7 - <tokio_core::reactor::timeout::Timeout as core::ops::Drop>::drop::h28d6e347297cbdb7
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/timeout.rs:67
21: 0x7f811bbcaa50 - drop::h8fe4978a28fca4b2
22: 0x7f811bbc925c - drop::h3ad133ee1ca32f75
23: 0x7f811bbddeb9 - tokio_curl::imp::Data::check_timeout::h12c04688a420c888
at /home/travis/.cargo/git/checkouts/tokio-curl-54e764422eb98a95/master/src/unix.rs:406
24: 0x7f811bbdf6e5 - <tokio_curl::imp::Data as futures::Future>::poll::{{closure}}::hbf483cb013d7295a
at /home/travis/.cargo/git/checkouts/tokio-curl-54e764422eb98a95/master/src/unix.rs:151
25: 0x7f811bbb33a4 - <scoped_tls::ScopedKey<T>>::set::hddcd333f694d8d33
at /home/travis/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:135
26: 0x7f811bbda5bb - <tokio_curl::imp::Data as futures::Future>::poll::ha5387a9110be3742
at /home/travis/.cargo/git/checkouts/tokio-curl-54e764422eb98a95/master/src/unix.rs:134
27: 0x7f811bbd6327 - <futures::map_err::MapErr<A, F> as futures::Future>::poll::h831b3981182fa89f
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/map_err.rs:29
28: 0x7f811bc329f1 - <Box<F> as futures::Future>::poll::he02a88e98ecfd367
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/lib.rs:227
29: 0x7f811bc4c38c - <futures::task::Spawn<F>>::poll_future::{{closure}}::h4247054fe3ce122d
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:217
30: 0x7f811bc4c3de - <futures::task::Spawn<T>>::enter::{{closure}}::h2ef40e21bf314afa
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:304
31: 0x7f811bc4cb05 - futures::task::set::{{closure}}::h3868fa6304a605e9
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:75
32: 0x7f811bc3161a - <std::thread::local::LocalKey<T>>::with::hba1eb31bc1ccce00
at /buildslave/rust-buildbot/slave/nightly-dist-rustc-linux/build/obj/../src/libstd/thread/local.rs:245
33: 0x7f811bc3eb77 - futures::task::set::hf29ebf4e1badbc96
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:72
34: 0x7f811bc2c56a - <futures::task::Spawn<T>>::enter::h9098896dd8ba81dd
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:304
35: 0x7f811bc2c3ea - <futures::task::Spawn<F>>::poll_future::h224b3d5d722c3423
at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:217
36: 0x7f811bc4c6d6 - tokio_core::reactor::Core::dispatch_task::{{closure}}::h8ba75822601e0ca1
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:337
37: 0x7f811bc2e43e - <scoped_tls::ScopedKey<T>>::set::hb38bd7bc1a40c86b
at /home/travis/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:135
38: 0x7f811bc47dcd - tokio_core::reactor::Core::dispatch_task::h85e5ff0bfe787603
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:337
39: 0x7f811bc47258 - tokio_core::reactor::Core::dispatch::he793b13f3b00750d
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:297
40: 0x7f811bc46e15 - tokio_core::reactor::Core::_run::h3fb411c1d745048e
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:284
41: 0x7f811bb459c6 - tokio_core::reactor::Core::run::h5b1951e578df9f55
at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:217
42: 0x7f811bba2e2e - tokio_request::tests::put::h174ca765f7c4fdae
at /home/travis/build/NeoLegends/tokio-request/src/lib.rs:257
43: 0x7f811bc8007b - <F as alloc::boxed::FnBox<A>>::call_box::h8891a5eabeec8e3a
44: 0x7f811bc7a2f3 - std::panicking::try::do_call::h8b4330db5afa6940
45: 0x7f811bce4ff6 - __rust_maybe_catch_panic
46: 0x7f811bc7fee9 - <F as alloc::boxed::FnBox<A>>::call_box::h4eaa9b5ba8675a97
47: 0x7f811bcdb570 - std::sys::thread::Thread::new::thread_start::h5b631f48cd23f128
48: 0x7f811ac0ae99 - start_thread
49: 0x7f811a72236c - <unknown>
From @nbigaouette in gitter:
I have a client that should attempt to connect to a remote server. How can I add a timeout to the connection attempt? I use loop_handle.tcp_connect(addr), which returns a future, but how can I insert my timeout? Using select()? loop_handle.tcp_connect(addr).select(timeout), where timeout comes from loop_handle.clone().timeout(Duration::from_secs(5)), gives me a type mismatch (tokio::Timeout vs tokio::TcpStream)...
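One way to make select() compile is to first map both futures to a common item type; a sketch against the calls quoted above (untested):

// select() requires matching Item and Error types on both sides, so
// wrap the connect result in Some and map the timeout to None.
let connect = loop_handle.tcp_connect(addr).map(Some);
let timeout = loop_handle.clone().timeout(Duration::from_secs(5)).map(|_| None);
let stream = connect.select(timeout)
    .map(|(winner, _loser)| winner)   // whichever resolved first
    .map_err(|(err, _loser)| err);    // first error wins
// `stream` resolves to Some(stream) on connect, or None on timeout.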
The split method produces a read/write pair, while methods like spsc::channel produce a write/read pair. We should ensure consistency across the stack (and with std).
Provide a consistent story around timers. Currently, there is also the tokio-timer crate.
In general, it is sometimes necessary to create timers without knowing about the tokio-core reactor. For example, anything at the Service middleware layer and above should be decoupled from tokio-core.
Also, a timer system integrated with epoll should be optimized around a hashed wheel strategy, as this is by far the most efficient timer strategy for most network-related cases.
That being said, a misconfigured hashed wheel has much worse worst-case performance characteristics than a heap-based timer. tokio-timer deals with this by not accepting timeout requests that would trigger the worst-case behavior, but I am unsure whether a hashed wheel is a better default than a heap.
I believe that Netty provides a hashed wheel timer by default.
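To make the trade-off concrete, here is a minimal hashed-wheel sketch (illustrative only; the field layout and millisecond math are assumptions, not tokio-timer's implementation):

use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Deadlines hash into one of `slots.len()` buckets of `tick` width, so
// insert and cancel are O(1). A timeout longer than tick * slots.len()
// wraps around into the wrong bucket -- the bad edge case above.
struct Wheel {
    tick: Duration,
    slots: Vec<VecDeque<Instant>>,
    start: Instant,
}

impl Wheel {
    fn new(tick: Duration, nslots: usize) -> Wheel {
        Wheel { tick: tick, slots: vec![VecDeque::new(); nslots], start: Instant::now() }
    }

    fn insert(&mut self, deadline: Instant) {
        let ticks = (deadline - self.start).as_millis() / self.tick.as_millis();
        let idx = (ticks as usize) % self.slots.len();
        self.slots[idx].push_back(deadline);
    }
}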
Need to signal EOF in read, and may want to signal "shutdown" in write.
I think it will be common to want to get the result of a spawned task. We should consider having remote.spawn (and other task-spawning fns) return a future with the result.
The question will then be what happens if the returned future is dropped instead of consumed. As per this comment, I would suggest that, by default, dropping the result future cancels the task, unless result.detach() is called.
There should also be a "spawn" variant that doesn't return a future in order to avoid the allocation. I would suggest remote.fire(_fn_).
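In the meantime, a result-returning spawn can be layered on the existing API with a oneshot. This helper is hypothetical, and unlike the proposal above, dropping the returned future does not cancel the task:

extern crate futures;
extern crate tokio_core;

use futures::{Future, Oneshot};
use tokio_core::reactor::Remote;

// Hypothetical helper: run `f` on the remote's event loop and ship its
// result back over a oneshot.
fn spawn_with_result<F>(remote: &Remote, f: F) -> Oneshot<Result<F::Item, F::Error>>
    where F: Future + Send + 'static,
          F::Item: Send + 'static,
          F::Error: Send + 'static,
{
    let (tx, rx) = futures::oneshot();
    remote.spawn(move |_handle| {
        f.then(move |res| {
            tx.complete(res); // best-effort: the receiver may be gone
            Ok(())
        })
    });
    rx
}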
The timer wheel used for timeouts is suboptimal in a few cases:
In the example https://github.com/tokio-rs/tokio-core/blob/66cff8e84b0bdbdc716fe1cd715363aa7e79aefa/examples/echo.rs there is this line:
let pair = futures::lazy(|| futures::finished(socket.split()));
If I replace it with
let pair = futures::finished(socket.split());
I get this error:
Running `target/debug/examples/echo`
Listening on: 127.0.0.1:8080
thread 'main' panicked at 'TaskRc being accessed on task it does not belong to', /home/grzegorz/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.1/src/lib.rs:206
note: Run with `RUST_BACKTRACE=1` for a backtrace.
error: Process didn't exit successfully: `target/debug/examples/echo` (exit code: 101)
The current state is wrong because the bug only manifests itself at runtime.
src/socket_stream.rs:25:32: 25:53 error: mismatched types [E0308]
src/socket_stream.rs:25 let ready: Async<()> = self.read.poll_read();
^~~~~~~~~~~~~~~~~~~~~
src/socket_stream.rs:25:32: 25:53 help: run `rustc --explain E0308` to see a detailed explanation
src/socket_stream.rs:25:32: 25:53 note: expected type `futures::Async<()>`
src/socket_stream.rs:25:32: 25:53 note: found type `futures::poll::Async<()>`
My file contains a use futures::Async declaration, and I was having trouble pattern matching the ready variable like this:
match ready {
    Async::Ready(_) => ...
    Async::NotReady => ...
}
It turns out that, for some reason (related to this, maybe? http://alexcrichton.com/futures-rs/src/futures/src/poll.rs.html#24), the type returned here is futures::poll::Async. I tried changing the import to use futures::poll::Async; but that resulted in
src/socket_stream.rs:5:5: 5:25 error: module `poll` is private
Is this the correct usage?
As a workaround I'm using if ready.is_ready(), but I think the match might be preferable?
I'm currently working on a project which converts a TcpStream into a Stream of u8s, using poll_read() to implement the poll method of the Stream. The problem is that creating this stream consumes the TcpStream, meaning that once I've read data out of the socket, I'm unable to write things back. If I were able to poll() the ReadHalf and WriteHalf, I'd be able to implement the byte stream in terms of the split() of the socket, and be able to write in the eventual for_each call.
Is this something that can be done? I'd be up for implementing it if it seems tractable.
Thanks!
The term Loop is a bit vague for the role played by the type. The Loop type listens for event notifications, then dispatches them to their targets. It is a "loop" in that it repeats this job until it is shut down.
Three options discussed are:
IoLoop
Pro: It references I/O, which is the primary role of the component.
Con: It does more than I/O, so IoLoop doesn't really match the scope of the component.
EventLoop
Pro: Better describes what it is doing, and the term has been used in other async I/O libraries, so there is prior art for it.
Con: EventLoopHandle is long.
Reactor
Pro: Not too long, and describes what the component is actually doing. There is also plenty of prior art for the term "reactor".
Con: The term comes with some baggage, in that Reactor already has meaning to some people that isn't necessarily related to what our component does.
Please excuse my potential ignorance - I'm just trying to be helpful.
I am investigating tokio/futures, and it seems tokio is very much like the internals of mioco: mioco drives coroutines, tokio drives tasks, both based on readiness, both using mio.
When I stumbled upon https://github.com/tokio-rs/tokio-core/blob/master/src/reactor/io_token.rs#L13 and investigated a bit more, I started wondering why poll_read and poll_write are needed at all.
I have never really seen a need to merely check whether IO is probably ready. In mioco one just attempts the IO, and it works or it doesn't. As everything is driven by the event loop, and spurious notifications are rare, failed attempts are very rare. So it seems to me that poll_read and poll_write only slow down the hot path.
E.g. here (Line 75 in c9554fa): even after poll_write, one still has to check the result of send_to, as it's still possible that it will return WouldBlock (e.g. when the fd was cloned and there are parallel accesses to it). Sometimes the IO can even succeed when it was considered blocked, so why not take a chance? :)
By removing poll_read and poll_write, sharing readiness with the reactor becomes unnecessary. In fact, AFAIR mioco does not keep any per-IO or per-task readiness information at all. I had it at the beginning and then removed it altogether. If there is a notification, mioco just dispatches it to the corresponding coroutine (task in tokio) and that's all. Even checking whether a given token still belongs to a given task is unnecessary: the worst that can happen is a needless task wakeup, and since that happens very, very rarely, it's the hot path that matters. And in futures, polling a future is much cheaper than switching to a coroutine, so the tradeoff is even better, I believe.
It seems to me all these Arcs and atomic accesses are just holding back performance, and considering how similarly mioco's dispatching works, they might be the only reason why mioco still does quite well against tokio in the benchmark tests (sometimes still having better perf).
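The pattern being advocated, as a sketch (std types only; for NotReady to be correct, the reactor must still wake the task when the fd becomes readable):

use std::io::{self, Read};
use futures::{Async, Poll};

// "Optimistic" IO: skip the readiness check, just try the syscall,
// and translate WouldBlock into NotReady.
fn try_read<R: Read>(sock: &mut R, buf: &mut [u8]) -> Poll<usize, io::Error> {
    match sock.read(buf) {
        Ok(n) => Ok(Async::Ready(n)),
        Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
        Err(e) => Err(e),
    }
}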
Right now there's no way to gracefully shut down a Core. There's no API for learning how many tasks are still running, or for receiving a notification when they're all done. Some more discussion can be found in #16.
The current proposal is to add some form of future which receives a notification when there are no tasks left on the event loop. This would allow building a "graceful shutdown". My personal preference would be:
impl Handle {
    pub fn poll_shutdown(&mut self) -> Async<()> {
        // ...
    }
}
This function would return Ready if the core has 0 tasks running on it, or NotReady if there are tasks running. If NotReady is returned, then the current task is scheduled to receive a notification when there are 0 tasks running.
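Built on that, a graceful shutdown becomes a small future; a sketch against the proposed poll_shutdown (which does not exist yet):

use futures::{Future, Poll};
use tokio_core::reactor::Handle;

// Resolves once the core has no tasks left; `core.run(Idle { handle })`
// could then be the last thing main does.
struct Idle {
    handle: Handle,
}

impl Future for Idle {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<(), ()> {
        // NotReady schedules the current task to be notified when the
        // task count reaches zero, per the proposal above.
        Ok(self.handle.poll_shutdown())
    }
}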
I'm working on a library in which I need to use tokio_core::reactor::Remote instead of tokio_core::reactor::Core. Unfortunately, all the examples I can find use tokio_core::reactor::Core. @carllerche was kind enough to point me in the right direction in #75, so I now have a good idea how this works.
I tried to reproduce the tutorial using Remote instead of Core. Here is the code I came up with so far:
extern crate futures;
extern crate tokio_core;
extern crate tokio_tls;

use std::net::ToSocketAddrs;
use std::net::TcpStream as StdTcpStream;

use futures::Future;
use tokio_core::reactor::{Core, Remote};
use tokio_core::net::TcpStream;
use tokio_tls::ClientContext;

fn main() {
    let remote = Core::new().unwrap().remote();
    let (tx, rx) = futures::oneshot();
    let stream = StdTcpStream::connect("www.rust-lang.org:443").unwrap();

    remote.spawn(|handle| {
        let addr = stream.peer_addr().unwrap();
        let socket = TcpStream::connect_stream(stream, &addr, handle);
        tx.complete(socket);
        Ok(())
    });

    let tls_handshake = rx.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });
    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
            GET / HTTP/1.0\r\n\
            Host: www.rust-lang.org\r\n\
            \r\n\
        ".as_bytes())
    });
    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });

    let (_, res) = response.wait().unwrap();
    println!("{}", String::from_utf8_lossy(&res));
}
However, this errors out. The first error is:
error[E0277]: the trait bound `Box<futures::Future<Item=tokio_core::net::TcpStream, Error=std::io::Error> + Send>: tokio_core::io::Io` is not satisfied
--> src/main.rs:29:12
|
29 | cx.handshake("www.rust-lang.org", socket)
| ^^^^^^^^^
I have tried calling as_ref() (among other things) to get out of the box on the socket on line 29, which has the first error. How do I get this to work?
On OS X (haven't tested anything else), as soon as a connection is opened, CPU usage climbs to 100%.
I can reproduce this using the echo server example and opening a connection using netcat.
As soon as the connection is opened, the server uses 100% CPU until it is terminated (even after the netcat connection is closed).
Right now, representation details about EasyBuf are leaked, in particular the fact that it's ultimately backed by a Vec<u8> (which you can get immutable access to). We should probably hide these details.
The more I think about it, given how much industry convention there is around keeping IO on the same thread, would it be preferable to design the API to put the emphasis on Pinned, which keeps execution local, vs. Handle, which allows IO types to be spread across threads?
This issue is currently mostly a placeholder for a discussion and will be filled out more as thoughts evolve.
Relates to #13
General goal: make it trivial to create IO objects using only the Remote type, instead of the full-blown Handle.
One possible use case: building an abstraction that makes TCP connections. Connections are created with TcpStream::new and as such require a reference to Handle. So, as a first approximation, a programmer may also put &Handle in their interface.
If creating a connection is the first thing done in the abstraction, they will proceed without problems. Otherwise, they will probably stash a clone of Handle. If the returned future doesn't have to implement the Send trait, then the work is done. On the other hand, if Send is needed, they have to write some non-trivial and undiscoverable piece of code:
One realization of this approach: https://github.com/sbstp/tokio-dns/blob/master/src/net.rs#L116-128
There are many possible improvements to the current situation, so let me give a few suggestions:
- Remote type in IO constructors. For those of them that already return a Future this seems like a strictly better option, without serious ...
- ... Result, though still worthy ...
- ... a Remote to a Handle, but ...
There is a related discussion in closed issue #6.
This issue currently represents the task of unifying "spawning" functions across Handle, Remote, and potentially other executors (like a CPU pool).
Currently Handle::spawn takes a future, whereas Remote::spawn takes a fn.
Also, there are other questions to consider, such as #77.
It looks like spawn_fn already exists on Handle:
https://tokio-rs.github.io/tokio-core/tokio_core/reactor/struct.Handle.html#method.spawn_fn
IO cancellation is particularly important when dealing with composed network operations, e.g. timeouts and predictive operations. The web is currently dealing with this:
whatwg/fetch#20
whatwg/fetch#27
As far as I can tell, cancellation needs to be a primitive of the IO system if it is to function correctly, so IMO it is timely to consider it now.
There needs to be some way to interoperate here. Something like:

fn current_reactor() -> Option<&Handle> {
    ...
}
Even though in most networking-related cases you want a hashed-wheel-based timer, if the hashed wheel timer is not correctly configured, really bad edge cases can happen. A heap-based timer is O(log n), but when well implemented it has more or less uniform performance characteristics across all workloads.
I propose that we implement a heap-based timer and use it as the default. We should definitely keep the hashed wheel timer around and document when you want to use it (most of the time) as well as how to configure it.
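For reference, the core of a heap-based timer is small; a sketch using std's BinaryHeap (min-heap via Reverse), not tokio code:

use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::time::Instant;

// O(log n) insert/pop with uniform behavior for all workloads.
struct HeapTimer {
    deadlines: BinaryHeap<Reverse<Instant>>, // Reverse makes the max-heap a min-heap
}

impl HeapTimer {
    fn insert(&mut self, at: Instant) {
        self.deadlines.push(Reverse(at));
    }

    // Pop every deadline that has already elapsed.
    fn expired(&mut self, now: Instant) -> Vec<Instant> {
        let mut fired = Vec::new();
        while let Some(&Reverse(at)) = self.deadlines.peek() {
            if at > now {
                break;
            }
            self.deadlines.pop();
            fired.push(at);
        }
        fired
    }
}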
The default config values for the hashed wheel timer should be revisited. There should also be a max timeout duration setting that defaults to tick_size * wheel_size.
Related to #2
It would be helpful to get the handles back out of the reader / writer so that they can be used again later.
As discussed with @carllerche on gitter, Handle::spawn_fn would be nice to have. It allows two futures working with ReadHalf and WriteHalf to be run on the same task without wrapping handle.spawn(futures::lazy(…)) around them.
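For reference, the wrapping pattern it would replace looks like this (sketch; `socket` and the body are placeholders):

// Without spawn_fn: futures::lazy defers the closure so split() runs
// on the spawned task rather than on the caller's task.
handle.spawn(futures::lazy(move || {
    let (read_half, write_half) = socket.split();
    // ... build the two futures and join them ...
    futures::finished::<(), ()>(())
}));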
Currently, whenever accept returns an error, the incoming stream will finish in a failed state. It is not possible to continue accepting further connections without creating a new TcpListener and rebinding it.
This is highly problematic, especially as accept errors are not uncommon and are easily produced by a malicious client.
One solution, which seems to me quite ergonomic, would be to change the item type of Incoming to Result<(TcpStream, SocketAddr), io::Error>. Stream consumers would be free to handle errors exactly as they wish.
The only serious limitation of this interface is the fact that you can't get the TcpListener back. Such a requirement seems uncommon, and supporting it has some non-trivial ergonomic cost. This limitation is of course pre-existing.
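Under that proposal, a consumer could keep accepting through errors; a sketch (handle_client is a hypothetical placeholder returning ()):

// With errors moved into the item type, an accept failure is just
// another item: log it and keep the listener alive.
let server = listener.incoming().for_each(|res| {
    match res {
        Ok((stream, addr)) => handle_client(stream, addr),
        Err(e) => println!("accept error: {}", e),
    }
    Ok(())
});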
I've been stuck on this piece of code for about an hour...
This code is responsible for accepting incoming connections (it does the handshake etc.).
pub fn listen(handle: &Handle, config: Config) -> Result<Listen, io::Error> {
    let listener = try!(TcpListener::bind(&config.local_address, handle));
    let listen = Listen {
        inner: listener.incoming()
            .and_then(move |(stream, address)| accept_connection(stream, &config, address))
            .boxed(),
    };
    Ok(listen)
}

pub struct Listen {
    inner: IoStream<MessageResult<Connection>>,
}
So far, so good; everything works fine. The problem starts when I try to create a timeout inside my accept_connection function (using tokio_core::reactor::Timeout). After the modifications my code looks like this:
pub fn listen(handle: &Handle, config: Config) -> Result<Listen, io::Error> {
    let listener = try!(TcpListener::bind(&config.local_address, handle));
    let handle = handle.clone();
    let listen = Listen {
        inner: listener.incoming()
            .and_then(move |(stream, address)| accept_connection(stream, &handle, &config, address))
            .boxed(),
    };
    Ok(listen)
}

pub struct Listen {
    inner: IoStream<DeadlineStatus<MessageResult<Connection>>>,
}
So I've changed 3 lines:
+ let handle = handle.clone();
- .and_then(move |(stream, address)| accept_connection(stream, &config, address))
+ .and_then(move |(stream, address)| accept_connection(stream, &handle, &config, address))
- inner: IoStream<MessageResult<Connection>>,
+ inner: IoStream<DeadlineStatus<MessageResult<Connection>>>,
Unfortunately, I'm no longer able to compile it:
--> p2p/src/net/listen.rs:18:5
|
18 | .boxed(),
| ^^^^^
|
= note: `std::rc::Weak<std::cell::RefCell<tokio_core::reactor::Inner>>` cannot be sent between threads safely
= note: required because it appears within the type `tokio_core::reactor::Handle`
= note: required because it appears within the type `[closure@p2p/src/net/listen.rs:17:14: 17:91 handle:tokio_core::reactor::Handle, config:net::config::Config]`
= note: required because it appears within the type `futures::stream::AndThen<tokio_core::net::Incoming, [closure@p2p/src/net/listen.rs:17:14: 17:91 handle:tokio_core::reactor::Handle, config:net::config::Config], io::deadline::Deadline<net::listen::AcceptConnection>>`
The error is very clear. I cannot box the stream, because it needs to be Send, while the rc::Weak inside Handle is !Send.
So... how can I use the handle inside a boxed stream to return something synchronously? :D It seems that I cannot :D
After a few minutes of digging I realized that one thing which would help here (a lot!) is an additional (or new version of the) constructor for tokio_core::reactor::Timeout:

pub fn new_at(at: Instant, handle: &Remote) -> io::Result<Timeout> {
It looks like calling PollEvented::need_read will cause the current task to be unparked immediately. This is unexpected behavior and can cause a task to get into an infinite wakeup loop if need_read is called without a poll_read guard before it.
This change would give up a little flexibility (which it's not clear we can actually make use of) for the sake of simplification in code that works generically with FramedIo.
The argument passed to Loop::run adds an extra step to getting an event loop running. I am also not convinced that it provides the easiest way to manage shutting down a loop.
One question that needs to be asked when shutting down an event loop is: what should happen to open sockets? Socket handles may still be around, but they won't be functional anymore since the event loop will be gone.
One reason why I think getting rid of the argument to Loop::run makes sense is that there are going to be two primary shutdown cases:
Do nothing... run the event loop forever and have the process shutdown handle cleaning up.
Do a proper shutdown, the steps will be something like:
Also, for the event loop proper, there are multiple shutdown steps that aren't necessarily well represented by taking a future:
This could be handled by adding functions on LoopHandle:

fn shutdown(&self) -> Shutdown;
fn shutdown_now(&self) -> Shutdown;
where Shutdown is a future that resolves when the loop has successfully shut down.
As discussed in the Chat example PR, it would be nice to have a function in tokio-core which takes a BufRead and returns a Stream<Item=String, Error=io::Error>. This would make it easy to read lines asynchronously from a TcpStream/Read.
On the other hand, there is tokio-line, a repository based on the idea of line-based asynchronous handling. Adding such a method to tokio-core would definitely interfere with the idea of tokio-line.
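A sketch of what such an adapter could look like (the Lines type is hypothetical; a real tokio version would need the underlying IO to surface WouldBlock, which is what maps to NotReady here):

extern crate futures;

use std::io::{self, BufRead};
use futures::{Async, Poll};
use futures::stream::Stream;

// Hypothetical adapter yielding one line at a time from a BufRead.
pub struct Lines<R> {
    io: R,
}

impl<R: BufRead> Stream for Lines<R> {
    type Item = String;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<String>, io::Error> {
        let mut line = String::new();
        match self.io.read_line(&mut line) {
            Ok(0) => Ok(Async::Ready(None)), // EOF ends the stream
            Ok(_) => Ok(Async::Ready(Some(line))),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => Ok(Async::NotReady),
            Err(e) => Err(e),
        }
    }
}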
For example, say we have a Client struct that owns a UnixStream:

pub struct Client {
    stream: UnixStream,
}
We implement a simple hello method on the Client:

impl Client {
    pub fn hello(&self) -> IoFuture<()> {
        write_all(&self.stream, "Hello!\r\n").map(|_| ()).boxed()
    }
}
This won't compile, because the future returned from write_all wants to hold onto the reference to self.stream longer than the reference to self is guaranteed to survive.
Is there a correct way of doing this?
rotor is similar in purpose to this library, except that it uses explicit state machines instead of futures.
What are the advantages and disadvantages of each? Why is this library better?
https://github.com/tokio-rs/tokio-core/blob/master/src/io/mod.rs#L47
ReadToEnd and ReadExact are exposed, for example, but Read is not, making it impossible for code outside of tokio to refer to the return type of tokio_core::io::read.
TaskIoRead and TaskIoWrite should probably be renamed as well, since TaskIo will be gone.
Current naming ideas:
SplitRead / SplitWrite
ReadHalf / WriteHalf
I'm trying to use tokio-core in a real-time (high-frequency trading) environment. I can allocate as much as I want during startup, but would like to run allocation-free once everything is up and running.
I understand that tokio-core allocates once per spawn, per TCP connection, etc. My concern is with mpsc_queue.rs. This queue allocates from the heap on every insert, and it's also the underlying mechanism for channels and Remote.
I want to run one event loop per CPU core and have the loops talk to one another. I also want my own event sources, e.g. for high-precision timers and the like.
Did I miss any other places where tokio-core allocates, and will you accept a PR that changes the mpsc queue to a bounded one?
Code:
extern crate tokio_core;
extern crate futures;

use tokio_core::Loop;
use futures::stream::Stream;

#[test]
fn recv_after_close() {
    let mut lp: Loop = Loop::new().unwrap();
    let (sender, receiver) = lp.handle().channel::<u32>();
    let receiver = lp.run(receiver).unwrap();
    drop(lp);
    drop(sender);
    receiver.wait().next();
}
This test hangs. I cannot tell for sure, but it seems like it worked (i.e. did not hang) two days ago.
It would be very convenient to have a read_until utility method like std::io::BufRead has. Maybe even add a read_line based on it. It would help with the chat example.
As a rule of thumb, unbounded growth is a bad idea. This is a placeholder issue to track the progress of auditing the code and bounding any growth.
When writing loops that will run for a long time on the event loop, it would be nice if we could yield execution to the loop at a certain point, e.g. with a Handle::yield().
There should be some way to re-use the event loop to run additional IO (e.g. from a raw fd).
This seems to be the closest thing to adding a new arbitrary stream to the event loop, but it's private: https://github.com/tokio-rs/tokio-core/blob/master/src/net/tcp.rs#L227
That of course doesn't answer the other side of the coin: some kind of notification mechanism for when that fd is ready for read/write. I would imagine this looks something like the now-deprecated mio event loop API, but that could be debated.
Motivation: you want to build a service with the higher-level primitives, but you also need to integrate with additional libraries (e.g. an ffi library where you run the IO for the socket) or have low-level control over the IO of another socket. More concretely, maybe I want to build an IPC service with tokio-service/proto and need to integrate some library into the event loop somehow.
As it stands you could of course run a second event loop in another thread around mio primitives, but that seems less than ideal, as tokio-core is already running such a loop.
It looks as though mio is moving towards deprecating any event loop abstraction and just providing the common poll/registration interface, which makes a lot of sense. As such, I think this belongs in tokio-core, as it now provides the event loop abstraction.
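For what it's worth, tokio_core::reactor::PollEvented looks like the relevant building block: anything implementing mio::Evented can be registered. A sketch (assuming mio's unix EventedFd wrapper; untested):

extern crate mio;
extern crate tokio_core;

use std::io;
use std::os::unix::io::RawFd;
use mio::unix::EventedFd;
use tokio_core::reactor::{Handle, PollEvented};

// Wire an arbitrary raw fd into the event loop. EventedFd borrows the
// fd, so the fd must outlive the registration. The returned PollEvented
// exposes poll_read/poll_write plus need_read/need_write.
fn register<'a>(fd: &'a RawFd, handle: &Handle) -> io::Result<PollEvented<EventedFd<'a>>> {
    PollEvented::new(EventedFd(fd), handle)
}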
Ideally EasyFramed would contain something like the following:

pub fn into_upstream(self) -> T {
    self.upstream
}

I find myself needing this for a protocol that switches between two different transport formats during a single session (authentication -> main data exchange).
I can submit this as a quick PR if it'd be welcome.
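Usage would look something like this sketch (the parser/serializer types are hypothetical placeholders):

// Recover the raw transport once authentication completes, then
// re-frame it for the main protocol.
let framed = EasyFramed::new(socket, AuthParser, AuthSerializer);
// ... drive the authentication exchange to completion ...
let socket = framed.into_upstream();
let framed = EasyFramed::new(socket, MainParser, MainSerializer);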
Hi, I was playing around with tokio and implemented my goto benchmark for any kind of async distributed language: pass a number around a ring of threads and see how fast it is. In Erlang, with a ring of 100 threads, I can pass the number around +/- 250K times a second. Below is an attempt to implement the same with tokio channels, but I thought the numbers were a bit low. It's possible I have not used your library properly, or there is some hidden lock somewhere, as my CPU hardly goes above 180% no matter how few/many threads I spawn. It takes about 14 seconds to send the number around a loop of 2 threads 1,000,000 times, or around 77k/sec.
Again, I won't be surprised if I have done something wrong. Here is the code snippet:
extern crate futures;
extern crate tokio_core;

use tokio_core::{Sender, Receiver};
use tokio_core::io::IoFuture;
use futures::stream::Stream;
use futures::{Future, finished};
use std::thread::spawn;
use std::io;

fn spawn_aux(trx: Sender<u32>, rx: IoFuture<Receiver<u32>>) {
    spawn(move || {
        let mut aux_loop = tokio_core::Loop::new().unwrap();
        let future = rx.and_then(|s| {
            s.for_each(|num| {
                trx.send(num + 1)
            })
        });
        aux_loop.run(future)
    });
}

fn main() {
    let mut main_loop = tokio_core::Loop::new().unwrap();
    let (first_trx, mut last_rx) = main_loop.handle().channel::<u32>();
    for _ in 1..2 {
        let (next_trx, rx2) = main_loop.handle().channel::<u32>();
        spawn_aux(next_trx, last_rx);
        last_rx = rx2
    }
    first_trx.send(0).unwrap();
    let future = last_rx.and_then(|s| {
        s.take(1_000_000)
            .fold(0, |_, num| {
                let num = num + 1;
                first_trx.send(num).unwrap();
                finished::<u32, io::Error>(num)
            })
    });
    let res = main_loop.run(future).unwrap();
    println!("res {}", res);
}
available here: https://github.com/josephDunne/ring_test.git
I am really excited for this project going forward!
Currently FramedIo::write is defined as follows:

fn write(&mut self, req: Self::In) -> Poll<(), Error>

If you call framed.write(msg) and get back Ok(Async::NotReady), there's no easy way to attempt retransmission: msg has already been moved and discarded by write. The only way I can see to avoid this is to make a wasteful copy of msg before every transmission attempt.
Can req be switched from Self::In to &Self::In?
Hey guys, it is natural to think that timers and cancellation go hand in hand. So if I schedule a tokio_core::reactor::Timeout like

let to = Timeout::new(Duration::from_secs(2), &handle).unwrap();
handle.spawn(to); // spawn into a running event loop

is there a way I can cancel it?
To explain more: I have a Complete stored and will fire it if I get something within a time period. If I don't, I want to drop it, indicating that it was cancelled. So I spawn a corresponding Timeout. However, if I do get that something before the timeout, then I will fire the Complete and would also like to cancel the timeout.
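One pattern that works with the existing pieces is to race the Timeout against a cancellation oneshot: whichever side finishes first drops the other, and dropping a Timeout unregisters it from the reactor (see the Drop impl visible in the backtrace earlier in this document). A sketch:

// Race the timeout against a cancellation signal.
let (cancel_tx, cancel_rx) = futures::oneshot();
let to = Timeout::new(Duration::from_secs(2), &handle).unwrap();

let race = to.map_err(|_| ())
    .select(cancel_rx.map_err(|_| ()))   // same Item/Error on both sides
    .map(|_| ())
    .map_err(|_| ());
handle.spawn(race);

// Later, if the event arrives in time, cancel the pending timeout:
cancel_tx.complete(());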
It's my understanding that only one instance of tokio_core::reactor::Core should be used for the entire lifetime of the program, so I think having this instance in a global variable would be nice. So I tried making this instance thread-safe by wrapping it in a RwLock and storing it using lazy_static!:
extern crate tokio_core;
#[macro_use] extern crate lazy_static;

use tokio_core::reactor::Core;
use std::sync::RwLock;

lazy_static! {
    static ref CORE: RwLock<Core> = RwLock::new(Core::new().unwrap());
}
However, this gives me the following error:
error[E0277]: the trait bound `std::rc::Rc<std::cell::RefCell<tokio_core::reactor::Inner>>: std::marker::Send` is not satisfied
--> <lazy_static macros>:2:32
|
2 | use std :: sync :: ONCE_INIT ; static mut $ NAME : $ crate :: lazy :: Lazy < $
| ^
<lazy_static macros>:21:1: 21:40 note: in this expansion of __lazy_static_create! (defined in <lazy_static macros>)
<lazy_static macros>:9:1: 10:74 note: in this expansion of lazy_static! (defined in <lazy_static macros>)
src/lib.rs:7:1: 9:2 note: in this expansion of lazy_static! (defined in <lazy_static macros>)
|
= note: `std::rc::Rc<std::cell::RefCell<tokio_core::reactor::Inner>>` cannot be sent between threads safely
= note: required because it appears within the type `tokio_core::reactor::Core`
= note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::RwLock<tokio_core::reactor::Core>`
= note: required by `lazy_static::lazy::Lazy`
[truncated]
This traces back to mio, but I figured since @carllerche is also a member of this team it might be better to put it here.
I will use connecting a TcpStream as an example, but the idea applies to a number of other functions on LoopHandle.
The current primary API for connecting a TCP stream is to use LoopHandle::tcp_connect or LoopPin::tcp_connect (which doesn't currently exist, but I believe the current plan is to implement it). There also exists TcpStream::connect_stream, which allows for more advanced socket configuration.
IMO there are two drawbacks to this strategy.
It creates a scattered API. When a user wants to discover how to establish a TCP stream, they will most likely first look at TcpStream. They will see TcpStream::connect_stream, and one conclusion is that this is the way to connect a TcpStream. They would have to know to look at either LoopHandle or LoopPin, depending on their case.
It is not an extensible pattern. The goal of tokio-core is to allow any mio::Evented type to work with tokio_core::Loop. Imagine struct TimerFd, a wrapper around Linux's timerfd feature. Such a library would not be able to easily implement LoopHandle::create_timer(), and would most likely fall back on a TimerFd::create(&loop_handle) API.
I propose to keep the functions on LoopHandle minimal in favor of keeping constructor functions on the types themselves:

impl TcpStream {
    fn connect(addr: &SocketAddr, event_loop: &LoopHandle) -> IoFuture<TcpStream> {
        // ...
    }
}
This leaves open the question of how to differentiate between LoopHandle, which must return an IoFuture, and LoopPin, which can complete the operation immediately.
To handle this, two functions are needed:

impl TcpStream {
    fn connect(addr: &SocketAddr, event_loop: &LoopPin) -> io::Result<TcpStream> {
        // ...
    }

    fn connect_on(addr: &SocketAddr, event_loop: &LoopHandle) -> IoFuture<TcpStream> {
        // ...
    }
}
However, LoopPin refers to the currently running Loop (due to the fact that it cannot be sent). So, one possible simplification would be to omit LoopPin in the first function in favor of having a way to retrieve it from a thread local:

impl TcpStream {
    fn connect(addr: &SocketAddr) -> io::Result<TcpStream> {
        // ...
    }

    fn connect_on(addr: &SocketAddr, event_loop: &LoopHandle) -> IoFuture<TcpStream> {
        // ...
    }
}
In this case, if TcpStream::connect is called off of an event loop, the function would either return an error, or it could always return NotReady until it is operated on from within the context of a running Loop.