Giter VIP home page Giter VIP logo

tokio-core's People

Contributors

3hren avatar alexcrichton avatar alreece45 avatar asomers avatar aturon avatar azdle avatar bkchr avatar byron avatar carllerche avatar cramertj avatar danburkert avatar dpc avatar frewsxcv avatar johncf avatar k4rtik avatar king6cong avatar manuel-woelker avatar oberien avatar oconnor663 avatar rotty avatar rrichardson avatar sbstp avatar seanmonstar avatar seeekr avatar sfackler avatar sinkuu avatar srijs avatar tailhook avatar twmb avatar vorner avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

tokio-core's Issues

tokio::io::* futures depend on details of std::io::Read and Write implementations

These futures call std::io::Read methods and expect them to internally register the task's interest in an event if it's not immediately ready. This isn't generally true for implementations of Read and Write implementations not based on PollEvented. Tokio could introduce a new AsyncRead and AsyncWrite traits to make this requirement explicit.

tokio_core::io::Io should be split into async read/write traits

It appears to be idiomatic in tokio to take ownership of IO handles when transforming them, and therefore to split bidirectional channels into two separate objects. The existing monolithic Io abstraction doesn't play well with this, but could be split into separate traits respectively providing poll_read and poll_write, and perhaps need_read and need_write if necessary. These could be the traits suggested in #61.

Timer heap seems to fail on travis

Currently when I'm running my tests on travis there is a failing .unwrap() in the timers heap.rs. The error does not occur on my PC.

This is the backtrace (the error occurs here):

thread 'tests::put' panicked at 'called `Option::unwrap()` on a `None` value', ../src/libcore/option.rs:317
stack backtrace:
   1:     0x7f811bcd9423 - std::sys::backtrace::tracing::imp::write::h46f28e67d38b4637
   2:     0x7f811bcdd52d - std::panicking::default_hook::{{closure}}::h1d3243f546573ff4
   3:     0x7f811bcdc6ce - std::panicking::default_hook::h96c288d728df3ebf
   4:     0x7f811bcdcdb8 - std::panicking::rust_panic_with_hook::hb1322e5f2588b4db
   5:     0x7f811bcdcc52 - std::panicking::begin_panic::hfbeda5aad583dc32
   6:     0x7f811bcdcb90 - std::panicking::begin_panic_fmt::h4fe9fb9d5109c4bf
   7:     0x7f811bcdcb11 - rust_begin_unwind
   8:     0x7f811bd1741f - core::panicking::panic_fmt::h4395919ece15c671
   9:     0x7f811bd1734b - core::panicking::panic::hc74ff52ed78364e1
  10:     0x7f811bc2b195 - <core::option::Option<T>>::unwrap::h287745a6792bd9bc
                        at /buildslave/rust-buildbot/slave/nightly-dist-rustc-linux/build/obj/../src/libcore/macros.rs:21
  11:     0x7f811bc41b8c - <tokio_core::heap::Heap<T>>::remove::hf9d52bc2c83ee1dd
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/heap.rs:71
  12:     0x7f811bc4b365 - tokio_core::reactor::Inner::cancel_timeout::h7772ee9fb74940ad
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:474
  13:     0x7f811bc49676 - tokio_core::reactor::Core::notify::h1adc6d28c9d93874
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:403
  14:     0x7f811bc4ce53 - tokio_core::reactor::Remote::send::{{closure}}::hf960237408f0a20d
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:507
  15:     0x7f811bc4c90b - tokio_core::reactor::Remote::with_loop::{{closure}}::h1832bb427de4d5ea
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:532
  16:     0x7f811bc2e613 - <scoped_tls::ScopedKey<T>>::with::h6e57d5658ba0e9e7
                        at /home/travis/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:169
  17:     0x7f811bc4bcbd - tokio_core::reactor::Remote::with_loop::h86eacd5eb370b9fc
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:529
  18:     0x7f811bc4bb25 - tokio_core::reactor::Remote::send::he982eb09318059e2
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:501
  19:     0x7f811bc44dbc - tokio_core::reactor::timeout_token::TimeoutToken::cancel_timeout::hd3f8329f407351ea
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/timeout_token.rs:55
  20:     0x7f811bc451d7 - <tokio_core::reactor::timeout::Timeout as core::ops::Drop>::drop::h28d6e347297cbdb7
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/timeout.rs:67
  21:     0x7f811bbcaa50 - drop::h8fe4978a28fca4b2
  22:     0x7f811bbc925c - drop::h3ad133ee1ca32f75
  23:     0x7f811bbddeb9 - tokio_curl::imp::Data::check_timeout::h12c04688a420c888
                        at /home/travis/.cargo/git/checkouts/tokio-curl-54e764422eb98a95/master/src/unix.rs:406
  24:     0x7f811bbdf6e5 - <tokio_curl::imp::Data as futures::Future>::poll::{{closure}}::hbf483cb013d7295a
                        at /home/travis/.cargo/git/checkouts/tokio-curl-54e764422eb98a95/master/src/unix.rs:151
  25:     0x7f811bbb33a4 - <scoped_tls::ScopedKey<T>>::set::hddcd333f694d8d33
                        at /home/travis/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:135
  26:     0x7f811bbda5bb - <tokio_curl::imp::Data as futures::Future>::poll::ha5387a9110be3742
                        at /home/travis/.cargo/git/checkouts/tokio-curl-54e764422eb98a95/master/src/unix.rs:134
  27:     0x7f811bbd6327 - <futures::map_err::MapErr<A, F> as futures::Future>::poll::h831b3981182fa89f
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/map_err.rs:29
  28:     0x7f811bc329f1 - <Box<F> as futures::Future>::poll::he02a88e98ecfd367
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/lib.rs:227
  29:     0x7f811bc4c38c - <futures::task::Spawn<F>>::poll_future::{{closure}}::h4247054fe3ce122d
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:217
  30:     0x7f811bc4c3de - <futures::task::Spawn<T>>::enter::{{closure}}::h2ef40e21bf314afa
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:304
  31:     0x7f811bc4cb05 - futures::task::set::{{closure}}::h3868fa6304a605e9
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:75
  32:     0x7f811bc3161a - <std::thread::local::LocalKey<T>>::with::hba1eb31bc1ccce00
                        at /buildslave/rust-buildbot/slave/nightly-dist-rustc-linux/build/obj/../src/libstd/thread/local.rs:245
  33:     0x7f811bc3eb77 - futures::task::set::hf29ebf4e1badbc96
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:72
  34:     0x7f811bc2c56a - <futures::task::Spawn<T>>::enter::h9098896dd8ba81dd
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:304
  35:     0x7f811bc2c3ea - <futures::task::Spawn<F>>::poll_future::h224b3d5d722c3423
                        at /home/travis/.cargo/git/checkouts/futures-rs-a4f11d094efefb0a/master/src/task/mod.rs:217
  36:     0x7f811bc4c6d6 - tokio_core::reactor::Core::dispatch_task::{{closure}}::h8ba75822601e0ca1
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:337
  37:     0x7f811bc2e43e - <scoped_tls::ScopedKey<T>>::set::hb38bd7bc1a40c86b
                        at /home/travis/.cargo/registry/src/github.com-1ecc6299db9ec823/scoped-tls-0.1.0/src/lib.rs:135
  38:     0x7f811bc47dcd - tokio_core::reactor::Core::dispatch_task::h85e5ff0bfe787603
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:337
  39:     0x7f811bc47258 - tokio_core::reactor::Core::dispatch::he793b13f3b00750d
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:297
  40:     0x7f811bc46e15 - tokio_core::reactor::Core::_run::h3fb411c1d745048e
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:284
  41:     0x7f811bb459c6 - tokio_core::reactor::Core::run::h5b1951e578df9f55
                        at /home/travis/.cargo/git/checkouts/tokio-core-ec1bf80948dd483b/master/src/reactor/mod.rs:217
  42:     0x7f811bba2e2e - tokio_request::tests::put::h174ca765f7c4fdae
                        at /home/travis/build/NeoLegends/tokio-request/src/lib.rs:257
  43:     0x7f811bc8007b - <F as alloc::boxed::FnBox<A>>::call_box::h8891a5eabeec8e3a
  44:     0x7f811bc7a2f3 - std::panicking::try::do_call::h8b4330db5afa6940
  45:     0x7f811bce4ff6 - __rust_maybe_catch_panic
  46:     0x7f811bc7fee9 - <F as alloc::boxed::FnBox<A>>::call_box::h4eaa9b5ba8675a97
  47:     0x7f811bcdb570 - std::sys::thread::Thread::new::thread_start::h5b631f48cd23f128
  48:     0x7f811ac0ae99 - start_thread
  49:     0x7f811a72236c - <unknown>

Setting timeouts with futures is not currently ergonomic

From @nbigaouette in gitter

I have a client that should attempt to connect to a remote server. How can I add a timeout to the connection attempt? I use loop_handle.tcp_connect(addr) which returns a future, but how can I insert my timeout? Using select()? loop_handle.tcp_connect(addr).select(timeout) where timeout comes from loop_handle.clone().timeout(Duration::from_secs(5)) gives me a type mismatch (tokio::Timeout vs tokio::TcpStream)...

Unify timer story

Provide a consistent story around timers. Currently, there is also the tokio-timer crate.

In general, sometimes it is necessary to create timers without knowing about the tokio-core reactor. For example, anything at the Service middleware layer and above should be decoupled from tokio-core.

Also, timer system integrated with epoll should be optimized for usage with a hashed wheel strategy as this is by far the most efficient timer strategy for most network related cases.

That being said, a misconfigured hash wheel has much worse "worse case" performance characteristics than a heap based timer. tokio-timer deals with this by not accepting timeout requests that would trigger this worst case behavior, but I am unsure if a hashed wheel is better as a default than a heap.

I believe that netty provides a hashed wheel timer by default.

Consider having `remote.spawn` return a future w/ the result

I think that is will be common to want to get the result of a of a spawned task. We should consder having remote.spawn (and other task spawning fns) return a future w/ the result.

The question will then be what happens if the returned future is dropped instead of consumed. As per this comment I would suggest that, by default, dropping the result future cancels the task, unless a result.detach() is called.

There should also be a "spawn" variant that doesn't return a future in order to avoid the allocation. I would suggest remote.fire(_fn_)

Related: #75, #57

Timer wheel needs optimizing

The timer wheel used for timeouts is suboptimal in a few cases:

  • If a server is alseep for more than the duration of the entire wheel it'll scan the wheel buckets more than once. Should ensure that "one round of polling" only looks at each bucket at most once.
  • Figuring out the next timeout is currently a scan-the-whole-world operation, which can be slow.
  • Scanning forward involves traversing the entire list of timeouts up to the point you're going towards, but that seems... bad?

TaskRc being accessed on task it does not belong to

In the example : https://github.com/tokio-rs/tokio-core/blob/66cff8e84b0bdbdc716fe1cd715363aa7e79aefa/examples/echo.rs

is there line :

let pair = futures::lazy(|| futures::finished(socket.split()));

I replace it on

let pair = futures::finished(socket.split());

And I get error :

     Running `target/debug/examples/echo`
Listening on: 127.0.0.1:8080
thread 'main' panicked at 'TaskRc being accessed on task it does not belong to', /home/grzegorz/.cargo/registry/src/github.com-1ecc6299db9ec823/futures-0.1.1/src/lib.rs:206
note: Run with `RUST_BACKTRACE=1` for a backtrace.
error: Process didn't exit successfully: `target/debug/examples/echo` (exit code: 101)

The current state is wrong because the bug manifests itself in runtime.

`poll_read()` returns futures::poll::Async which is a private type?

src/socket_stream.rs:25:32: 25:53 error: mismatched types [E0308]
src/socket_stream.rs:25         let ready: Async<()> = self.read.poll_read();
                                                       ^~~~~~~~~~~~~~~~~~~~~
src/socket_stream.rs:25:32: 25:53 help: run `rustc --explain E0308` to see a detailed explanation
src/socket_stream.rs:25:32: 25:53 note: expected type `futures::Async<()>`
src/socket_stream.rs:25:32: 25:53 note:    found type `futures::poll::Async<()>`

My file contains a use futures::Async declaration, and I was having trouble pattern matching the ready variable like this:

match ready {
  Async::Ready(_) => ...
  Async::NotReady => ...
}

it turns out that for some reason (related to this maybe? http://alexcrichton.com/futures-rs/src/futures/src/poll.rs.html#24) that the type returned here is futures::poll::Async. I tried changing the type to use futures::poll::Async; but that resulted in

src/socket_stream.rs:5:5: 5:25 error: module `poll` is private

Is this the correct usage?

As a work around I'm using if ready.is_ready(), but I think the match might be preferable?

Consider making ReadHalf and WriteHalf implement poll(_read) and poll(_write)

I'm currently working on a project which converts a TcpStream into a Stream of u8s via using poll_read() to implement the poll method of the Stream. The problem being that creating this stream consumes the TcpStream, meaning that once I've read data out of the socket, I'm unable to write things back. If I were able to poll() the readhalf and writehalf I'd be able to implement the byte stream in terms of the split() of the socket, and be able to write in the eventual for_each call.

Is this something that can be done? I'd be up for implementing it if it seems tractable.

Thanks ๐Ÿ‘

Rename `Loop`

The term Loop is a bit vague for role played by the type. The Loop type listens for event notifications, then dispatches the event notifications to their targets. It is a Loop in that it repeats this job until it is shutdown.

Three options discussed are:

IoLoop

Pro: It references I/O, which is the primary role of the component.
Con: It does more than I/O, so IoLoop doesn't really match the scope of the component.

EventLoop

Pro: Better describes that it is doing and the term has been used in other async I/O libraries, so there is prior art for it.
Con: EventLoopHandle is long

Reactor

Pro: Not too long and describes what the component is actually doing. There is also plenty of prior art for the term "reactor"
Con: The term comes with some baggage in that Reactor already has meaning to some people that isn't necessarily related to what our component does.

Just trying io instead of having `poll_read` and `poll_write`?

Please excuse my potential ignorance - I'm just trying to be helpful.

I am investigating tokio/futures , and it seems tokio is very much like internals of mioco - mioco is driving coroutines, tokio is driving tasks, both based on readiness, both using mio.

When I stumbled upon: https://github.com/tokio-rs/tokio-core/blob/master/src/reactor/io_token.rs#L13 , and investigated a bit more, I am wondering why are poll_read and poll_write needed at all.

I have never really seen a need to just check if IO is probably ready. In mioco one just attempt to do IO, and it works or not. As everything is driven by event loop, and spurious notifications are rare, failed attempt are very rare. So it seems to me that poll_read and poll_write are only slowing down the hot-path.

Eg. here:

pub fn send_to(&self, buf: &[u8], target: &SocketAddr) -> io::Result<usize> {
- after poll_write one still has to check result of send_to as it's still possible that it will return an WouldBlock (eg. when fd was cloned etc. and there are parallel accesses to it). Sometimes io can even succeed, when it was considered to be blocked, so why not take a chance? :)

By removing poll_read and poll_write, sharing readiness with reactor is unnecessary. In fact, AFAIR mioco does not keep any per-IO or per-task readiness information at all. I had it at the beginning and then I just removed it all together. If there is a notification, mioco just dispatches it to corresponding coroutine (task in tokio) and that's all. Even checking if given token still belongs to a given task is unnecessary : worst thing will happen is that task will be waken up needlessly, but since it happens very, very rarely, it's the hotpath that is important. And in futures, polling a future is much cheaper than switching to coroutine so it's even better tradeoff, I believe.

It seems to me all these Arcs, and atomic accesses are just holding back the performance, and considering how similar mioco dispatching works, might be the only reason why in the benchmark tests mioco is still doing quite well against tokio (sometimes still having better perf.).

Support graceful shutdown

Right now there's no way to gracefully shut down a Core. There's no API for learning how many tasks are still running or receiving a notification when they're all done. Some more discussion can be found on #16.

The current proposal is to add some form of future which receives a notification when there are no tasks on the event loop any more. This would allow building up a "graceful shutdown". My personal preference would be:

impl Handle {
    pub fn poll_shutdown(&mut self) -> Async<()> {
        // ...
    }
}

This function would return Ready if the core has 0 tasks running on it, or NotReady if there are tasks running. If NotReady is returned, then the current task is scheduled to receive a notification when there are 0 tasks running.

How do I use a remote reactor?

I'm working on a library in which I need to use tokio_core::reactor::Remote instead of tokio_core::reactor::Core. Unfortunately, all the examples I can find are using tokio_core::reactor::Core. @carllerche was kind enough to point me in the right direction in #75, so I now have a good idea how this works.

I tried to reproduce the tutorial using Remote instead of Core. Here is the code I came up with so far:-

extern crate futures;
extern crate tokio_core;
extern crate tokio_tls;

use std::net::ToSocketAddrs;
use std::net::TcpStream as StdTcpStream;

use futures::Future;
use tokio_core::reactor::{Core, Remote};
use tokio_core::net::TcpStream;
use tokio_tls::ClientContext;

fn main() {
    let remote = Core::new().unwrap().remote();

    let (tx, rx) = futures::oneshot();

    let stream = StdTcpStream::connect("www.rust-lang.org:443").unwrap();

    remote.spawn(|handle| {
        let addr = stream.peer_addr().unwrap();
        let socket = TcpStream::connect_stream(stream, &addr, handle);
        tx.complete(socket);
        Ok(())
    });

    let tls_handshake = rx.and_then(|socket| {
        let cx = ClientContext::new().unwrap();
        cx.handshake("www.rust-lang.org", socket)
    });

    let request = tls_handshake.and_then(|socket| {
        tokio_core::io::write_all(socket, "\
                GET / HTTP/1.0\r\n\
                Host: www.rust-lang.org\r\n\
                \r\n\
            ".as_bytes())
    });

    let response = request.and_then(|(socket, _)| {
        tokio_core::io::read_to_end(socket, Vec::new())
    });

    let (_, res) = response.wait().unwrap();
    println!("{}", String::from_utf8_lossy(&res));
}

However, this errors out. The first error being:-

error[E0277]: the trait bound `Box<futures::Future<Item=tokio_core::net::TcpStream, Error=std::io::Error> + Send>: tokio_core::io::Io` is not satisfied
  --> src/main.rs:29:12
   |
29 |         cx.handshake("www.rust-lang.org", socket)
   |            ^^^^^^^^^

I have tried calling as_ref() (among other things) to get out of the box on the socket on that line 29 which has the first error. How do I get this to work?

High CPU usage after first connection

On OS X (haven't tested anything else), as soon as a connection is opened, CPU usage climbs to 100%.

I can reproduce this using the echo server example and opening a connection using netcat.
As soon as the connection is opened, the server uses 100% CPU until it is terminated (even after the netcat connection is closed).

Reconsider exposing `Vec<u8>` in `easy`

Right now, representation details about EasyBuf are leaked, in particular the fact that it's ultimately backed by a Vec<u8> (which you can get immutable access to). We should probably hide these details.

Figure out if emphasis should be put on `reactor::Handle` or `reactor::Pinned`

The more I think about, given how industry convention there is around trying to keep IO on the same thread, would it be preferable to design the API to put emphasis on Pinned, which keeps execution local vs. Handle which allows IO types to get spread around threads.

This issue is currently mostly a placeholder for a discussion and will be filled out more as thoughts evolve.

Relates to #13

Handle vs Remote when constructing IO objects.

General goal: Make it trivial to create IO objects using only Remote type,
instead of full-blown Handle.

One possible use case: Building an abstraction that makes TCP connections.
Connections are created with TcpStream::new and as such require a reference to
Handle. So, as a first approximation programmer may also put &Handle in
their interface.

If creating a connection is first thing done in the abstraction, they will
proceed without problems. Otherwise, they will probably stash a clone of
Handle. If returned future doesn't have to implement Send trait, then the
work is done. On the other hand, if Send is needed, they have to write some
non-trivial and undiscoverable piece of code:

  1. Convert a handle to a remote and stash it instead.
  2. At point when new connection is needed:
    • Create a new channel.
    • Spawn a new future using Remote::spawn that actually creates a connection.
    • Send connection through the channel.
    • Receive connection from the channel.

One realization of this approach: https://github.com/sbstp/tokio-dns/blob/master/src/net.rs#L116-128

There are many possible improvements to current situation, so let me give a few
suggestions:

  • Use Remote type in IO constructors. For those of them that already return
    a Future this seems like strictly better option, without serious
    drawbacks. Additionally Tokio could avoid some of work involved by checking
    if it is already running on desired event loop. The situation is not that
    obvious for those IO constructor that return a Result, though still worthy
    of consideration.
  • Provide utility function that not only upgrades Remote to a Handle, but
    also transfers result of computation out as a future.
  • Provide utility function to obtain a handle to the current event loop.

There is a related discussion in closed issue #6.

Switch default timer to a heap based one + other timer improvements

Even though in most networking related cases, you want a hash wheel based timer, if the hash wheel timer is not correctly configured, really bad edge cases can happen. A heap based timer is O(log n) but when well implemented has more or less uniform performance characteristics for all work loads.

I propose that we implement a heap based timer and use it as the default. We definitely should keep the hash wheel timer around and document when you want to use it (most of the time) as well as how to configure it.

The defaults config values for the hash wheel timer should be revisited. There should also be a max timeout duration setting that defaults to tick_size * wheel_size

Related to #2

Add Handle::spawn_fn

As discussed with @carllerche on gitter, Handle::spawn_fn would be nice to have. It allows two futures working with ReadHalf and WriteHalf to be run on the same task without wrapping handle.spawn(futures::lazy(โ€ฆ)) around.

Support recovery from accept errors in TcpListener::incoming()

Currently whenever accept returns error the incoming stream will finish in a
failed state. It is not possible to continue accepting further connections
without creating a new TcpListener and rebinding it.

This is highly problematic especially that accept errors are not uncommon, and
easily produced by malicious client.

One solution, which seems to me quite ergonomic, would be to change the item
type of Incoming to Result<(TcpStream, SocketAddr), io::Error>. Stream
consumers would be free to handle errors exactly as they wish.

The only serious limitation of this interface is the fact that you can't get
TcpListener back. Such requirement seems uncommon, and supporting it has some
non-trivial ergonomic cost. This limitation is of course pre-existing.

Using handle and boxed stream together

I've been stuck with this piece of code for about an hour...

This code is responsible for accepting incoming connections (it does handshake etc).

pub fn listen(handle: &Handle, config: Config) -> Result<Listen, io::Error> {
    let listener = try!(TcpListener::bind(&config.local_address, handle));
    let listen = Listen {
        inner: listener.incoming()
            .and_then(move |(stream, address)| accept_connection(stream, &config, address))
            .boxed(),
    };
    Ok(listen)
}

pub struct Listen {
    inner: IoStream<MessageResult<Connection>>,
}

So far, so good, everything works fine. The problem starts when I try to create a timeout inisde my accept_connection function (using tokio_core::reactor::Timeout). After modifications my code looks like this:

pub fn listen(handle: &Handle, config: Config) -> Result<Listen, io::Error> {
    let listener = try!(TcpListener::bind(&config.local_address, handle));
    let handle = handle.clone();
    let listen = Listen {
        inner: listener.incoming()
            .and_then(move |(stream, address)| accept_connection(stream, &handle, &config, address))
            .boxed(),
    };
    Ok(listen)
}

pub struct Listen {
    inner: IoStream<DeadlineStatus<MessageResult<Connection>>>,
}

So I've changed 3 lines:

+   let handle = handle.clone();
-           .and_then(move |(stream, address)| accept_connection(stream, &config, address))
+           .and_then(move |(stream, address)| accept_connection(stream, &handle, &config, address))
-   inner: IoStream<MessageResult<Connection>>,
+   inner: IoStream<DeadlineStatus<MessageResult<Connection>>>,

Unfortunately, I'm no longer able to compile it:

  --> p2p/src/net/listen.rs:18:5
   |
18 |            .boxed(),
   |             ^^^^^
   |
   = note: `std::rc::Weak<std::cell::RefCell<tokio_core::reactor::Inner>>` cannot be sent between threads safely
   = note: required because it appears within the type `tokio_core::reactor::Handle`
   = note: required because it appears within the type `[closure@p2p/src/net/listen.rs:17:14: 17:91 handle:tokio_core::reactor::Handle, config:net::config::Config]`
   = note: required because it appears within the type `futures::stream::AndThen<tokio_core::net::Incoming, [closure@p2p/src/net/listen.rs:17:14: 17:91 handle:tokio_core::reactor::Handle, config:net::config::Config], io::deadline::Deadline<net::listen::AcceptConnection>>`

The error is very clear. I cannot box the stream, cause it needs to be Send while rc::Weak inside Handle is !Send.

So... how can I use handle iniside boxed stream to return something synchronously? :D It seems that I cannot :D

After few minutes of digging I realized that one thing which would help here (a lot!) is additional (or new version of) constructor for tokio_core::reactor::Timeout

    pub fn new_at(at: Instant, handle: &Remote) -> io::Result<Timeout> {

PollEvented::need_read wakes up the task immediately

It looks like calling PollEvented::need_read will cause the current task to be unparked immediately. This is unexpected behavior and can cause a task to get into an infinite wakeup loop if need_read is called without a poll_read guard before it.

Consider removing the argument to `Loop::run`

The argument passed to Loop::run adds an extra step to getting an event loop running. I also am not convinced that it provides the easiest way to manage shutting down a loop.

One question that needs to be asked when shutting down an event loop is what should happen with open sockets? There will still be the ability to have socket handles around but they won't be functional anymore since the event loop will go away.

One reason why I think getting rid of the argument to Loop::run is that there are going to be two primary shutdown cases:

  1. Do nothing... run the event loop forever and have the process shutdown handle cleaning up.

  2. Do a proper shutdown, the steps will be something like:

  • Wait for a shutdown event (ctrl-c, or something else)
  • Stop accepting new sockets
  • send a shutdown signal to all open sockets
  • Wait a grace period to allow open sockets to correctly shutdown
  • Hard shutdown any lingering connections
  • Shutdown the loop

Also, for the event loop proper, there are multiple shutdown steps that aren't necessarily well represented by taking a future:

  • Stop accepting new tasks (spawn fails) and shutdown after all existing sources close
  • Stop accepting new tasks and shut the loop down immediately

This could be handled by adding functions on LoopHandle:

fn shutdown(&self) -> Shutdown;

fn shutdown_now(&self) -> Shutdown;

where Shutdown is a future that resolves when the loop is successfully shutdown.

Add io::lines

As discussed in the Chat example PR it would be nice to have a function in tokio-core, which takes a BufRead and returns Stream<Item=String, Error=io::Error>. This would make it easy to read lines asynchronously from a TcpStream/Read.

On the other hand side, there is tokio-line, a repository based on the idea of line-based asynchronous handling. Adding such a method to tokio-core would definitely interfere with the idea of tokio-line.

/cc @alexcrichton @carllerche

How to return futures generated from operating on a stream field?

For example, say we have a Client struct that owns a UnixStream:

pub struct Client {
    stream: UnixStream,
}

We implement a simple hello method on the Client:

impl Client { 
   pub fn hello(&self) -> IoFuture<()> {
       write_all(&self.stream, "Hello!\r\n").map(|_| ()).boxed()
    }
}

This won't compile, because the future returned from write_all wants to hold onto the reference to self.stream longer than the reference to self is guaranteed to survive.

Is there a correct way of doing this?

Document comparison with rotor

rotor is similar in purpose to this library, except that it uses explicit state machines instead of futures.

What are the advantages and disadvantages of each? Why is this library better?

Removing heap allocations

I'm trying to use tokio-core in a real-time (high-frequency trading) environment. I can allocate as much as I want during startup but would like to run allocation free once everything is up and running.

I understand that tokio-core allocates once per spawn, per TCP connection, etc. My concern is with mpsc_queue.rs. This queue allocates from the heap for every insert and it's also the underlying mechanism for channels and Remote.

I want to run one event loop per CPU core and have the loops talk to one another. I also want my own event sources, e.g. for high-precision timers and the like.

Did I miss any other places where tokio-core allocates and will you accept a PR that changes the mpsc queue to a bounded one?

Recv from channel hangs forever if other party is gone

Code:

extern crate tokio_core;
extern crate futures;

use tokio_core::Loop;

use futures::stream::Stream;


#[test]
fn recv_after_close() {
    let mut lp: Loop = Loop::new().unwrap();

    let (sender, receiver) = lp.handle().channel::<u32>();

    let receiver = lp.run(receiver).unwrap();

    drop(lp);
    drop(sender);

    receiver.wait().next();
}

This test hangs. I cannot tell for sure, but seems like it worked (i. e. did not hang) two days ago.

Yielding execution on the loop

When writing loops that shall be run for a long time on the event loop, it would be nice if we could yield execution to the loop at a certain point. E.g. with a using Handle::yield().

Allow RawFd Registration and Notification

There should be some way to re-use the event loop to run additional IO (e.g. from a raw fd).

This seems to be the closest thing to adding a new arbitrary stream to the event loop but it's private: https://github.com/tokio-rs/tokio-core/blob/master/src/net/tcp.rs#L227

That of course doesn't answer the other side of the coin of some kind of notification mechanism for when that fd is ready for read/write. I would imagine this looks something like the now deprecated mio event loop API but that could be debated.

Motivation: You want to build a service with the higher level primitives but you also need to integrate with additional libraries (e.g. an ffi library that you run the IO for the socket) or have low level control over the IO of another socket. More concretely maybe I want to build an IPC service with tokio-service/proto and need to integrate some library into the event loop somehow.

As it stands you could of course run a second event loop in another thread around mio primitives but that seems less than ideal as tokio-core is already running such a loop.

It looks as though mio is moving towards deprecating any event loop abstraction and just providing the common poll/registration interface which makes a lot of sense. As such I think this would belong in tokio-core as it now provides the event loop abstraction.

No way to get upstream back out of EasyFramed

Ideally EasyFramed would contain something like the following:

    pub fn into_upstream(self) -> T {
        self.upstream
    }

I find myself needing this for a protocol that switches between two different transport formats during a single session (authentication -> main data exchange).

I can submit this as a quick PR if it'd be welcome.

Possibly slow channel

Hi, I was playing around with tokio and implemented my goto benchmark for any kind of async distributed language: Pass a number around a ring of threads and see how how fast it is. In erlang with ring of 100 threads I get can pass the number around +/- 250K times a second. Below is an attempt to implement the same with Tokio channels but I thought the numbers are a bit low. Its possible I have not used you library properly or there is some hidden lock somewhere as my CPU hardly goes above 180% no matter how few/many threads I spawn. It takes about 14 seconds to send the number around a loop of 2 threads 1000000 times or around 77k sec.

Again I won't be surprised if I have done something wrong. Here is the code snippet:

extern crate futures;
extern crate tokio_core;

use tokio_core::{Sender, Receiver};
use tokio_core::io::IoFuture;
use futures::stream::Stream;
use futures::{Future,finished};
use std::thread::spawn;
use std::io;

fn spawn_aux(trx: Sender<u32>, rx:  IoFuture<Receiver<u32>>) {
    spawn(move || {
        let mut aux_loop = tokio_core::Loop::new().unwrap();
        let future = rx.and_then(|s| {
            s.for_each(|num| {
                trx.send(num + 1)
            })
        });
        aux_loop.run(future)
    });
}

fn main() {
    let mut main_loop = tokio_core::Loop::new().unwrap();
    let (first_trx, mut last_rx) = main_loop.handle().channel::<u32>();
    for _ in  1..2 {
        let (next_trx, rx2) = main_loop.handle().channel::<u32>();
        spawn_aux(next_trx, last_rx);
        last_rx = rx2
    }
    first_trx.send(0).unwrap();
    let future = last_rx.and_then(|s| {
        s.take(1_000_000)
         .fold(0, |_, num|{
             let num = num + 1;
             first_trx.send(num).unwrap();
             finished::<u32, io::Error>(num)
         })
    });
    let res = main_loop.run(future).unwrap();
    println!("res {}", res);
}

available here: https://github.com/josephDunne/ring_test.git

I am really excited for this project going forward!

FramedIo::write should borrow, not move, the input message

Currently FramedIo::write is defined as follows:

fn write(&mut self, req: Self::In) -> Poll<(), Error>

If you call framed.write(msg) and get back Ok(Async::NotReady), there's no easy way to attempt retransmission: msg has already been moved and discarded by write. The only way I can see to avoid this is to make a wasteful copy of msg before every transmission attempt.

Can req be switched from Self::In to &Self::In?

Cancel for timeouts

Hey guys, it is natural to think that timers and cancellation go hand in hand. So if i schedule a tokio_core::reactor::Timeout like

let to = Timeout::new(Duration::from_secs(2), &handle).unwrap();
handle.spawn(to); // spawn into a running event loop

is there a way i can cancel this ?

To explain more - i have a Complete stored and will fire it if i get something within a time period. If i don't I want to drop it, indicating that it was cancelled. So i spawn a corresponding Timeout. However if i do get that something before the timeout then i will fire Complete and would also to cancel the timeout.

tokio_core::reactor::Core can't be stored in a global variable

It's my understanding that only one instance of tokio_core::reactor::Core should be used for the entire lifetime of the program so I think having this instance in a global variable would be nice. So I tried making this instance thread safe by wrapping it in a RwLock and storing it using lazy_static!:-

extern crate tokio_core;
#[macro_use] extern crate lazy_static;

use tokio_core::reactor::Core;
use std::sync::RwLock;

lazy_static! {
    static ref CORE: RwLock<Core> = RwLock::new(Core::new().unwrap());
}

However, this gives me the following error:-

error[E0277]: the trait bound `std::rc::Rc<std::cell::RefCell<tokio_core::reactor::Inner>>: std::marker::Send` is not satisfied
 --> <lazy_static macros>:2:32
  |
2 | use std :: sync :: ONCE_INIT ; static mut $ NAME : $ crate :: lazy :: Lazy < $
  |                                ^
<lazy_static macros>:21:1: 21:40 note: in this expansion of __lazy_static_create! (defined in <lazy_static macros>)
<lazy_static macros>:9:1: 10:74 note: in this expansion of lazy_static! (defined in <lazy_static macros>)
src/lib.rs:7:1: 9:2 note: in this expansion of lazy_static! (defined in <lazy_static macros>)
  |
  = note: `std::rc::Rc<std::cell::RefCell<tokio_core::reactor::Inner>>` cannot be sent between threads safely
  = note: required because it appears within the type `tokio_core::reactor::Core`
  = note: required because of the requirements on the impl of `std::marker::Sync` for `std::sync::RwLock<tokio_core::reactor::Core>`
  = note: required by `lazy_static::lazy::Lazy`

[truncated]

This traces back to mio but I figured since @carllerche is also a member of this team it might be better to put it here.

Consider moving functions off of `LoopHandle` & `LoopPin`

I will use connecting a TcpStream as an example, but the idea applies to a number of other functions on LoopHandle.

The current primary API for connecting a TCP stream is to use LoopHandle::tcp_connect or LoopPin::tcp_connect (which doesn't currently exist, but I believe the current plan is to implement it). There also exists TcpStream::connect_stream which allows for more advanced socket configuration.

IMO there are two drawbacks to this strategy.

  1. It creates a scattered API. When a user wants to discover how to establish a TCP stream, they will most likely first look at TcpStream. They will see TcpStream::connect_stream and one conclusion is that this is the way to connect a TcpStream. They will have to know to look at either LoopHandle or LoopPin depending on their case.

  2. It is not an extensible pattern. The goal of tokio-core is to allow any mio::Evented type to work with tokio_core::Loop. Imagine struct TimerFd, a wrapper around Linux's timerfd feature. This library would not be able to easily implement LoopHandle::create_timer(), and will most likely fallback on the TimerFd::create(&loop_handle) API.

I propose to keep the functions on LoopHandle minimal in favor of keeping constructor functions on the types themselves:

struct TcpStream {
    fn connect(addr: &SocketAddr, event_loop: &LoopHandle) -> IoFuture<TcpStream> {
        // ...
    }
}

This leaves open the question of how to differentiate between LoopHandle, which must return an IoFuture and LoopPin which can complete the operation immediately.

To handle this, two functions are needed:

struct TcpStream {
    fn connect(addr: &SocketAddr, event_loop: &LoopPin) -> io::Result<TcpStream> {
        // ...
    }

    fn connect_on(addr: &SocketAddr, event_loop: &LoopHandle) -> IoFuture<TcpStream> {
        // ...
    }
}

However, LoopPin refers to the currently running Loop (this is due to the fact that it cannot be sent). So, one possible simplification would be to omit LoopPin in the first function in favor of having a way to retrieve it from a thread local:

struct TcpStream {
    fn connect(addr: &SocketAddr) -> io::Result<TcpStream> {
        // ...
    }

    fn connect_on(addr: &SocketAddr, event_loop: &LoopHandle) -> IoFuture<TcpStream> {
        // ...
    }
}

In this case, if TcpStream::connect is called off of an event loop, the function would either return an error or it could always return NotReady until it is operated on from within the context of a running Loop.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.