Giter VIP home page Giter VIP logo

timely-dataflow's People

Contributors

aljoscha avatar anonion0 avatar antiguru avatar benesch avatar cecca avatar comnik avatar d-e-s-o avatar frankmcsherry avatar gereeter avatar kachayev avatar kixiron avatar llogiq avatar maddyblue avatar max-sixty avatar milibopp avatar mre avatar petrosagg avatar qiuxiafei avatar qmx avatar reu avatar sga001 avatar shamrin avatar stkfd avatar tisonkun avatar umanwizard avatar utaal avatar victorzz0 avatar vks avatar waywardmonkeys avatar zerebubuth avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

timely-dataflow's Issues

Optimize progress traffic for some operators

Several operators do not require progress traffic.

Operators like map, concat, partition all produce exactly as many output records as they take as input. There should be no harm in immediately advancing all counts of records declared on their input as counts of records on their outputs. Although we don't know where the records are, or exactly when they will be consumed, the operators themselves are agnostic to this information and the only information downstream operators require is whether more records might arrive, not where they are at the moment.

Similarly, the feedback operator produces output or not based on the timestamp in question. If it is less than its upper bound, it produces the same output as input, and if not it does not produce the output. This also seems to require no advanced thinking, and could just be compiled down (although the rule is more complicated: "apply this logic" rather than "forward").

Hypothetically, the queue operator, which batches records by timestamp and releases them not-out-of-order, is an example of an operator that produces its input as output by requires notifications, so not all of these "simple" operators need to be progress-oblivious.

I think in the cases mentioned above the summary returned from get_internal_summary is actually a guarantee, not just a lower bound. It would be interesting if this could be communicated, as our current progress tracking mechanisms could just fold this into the path summaries.

Comparison between Timely-Dataflow and Apache Flink

I have been exploring both Timely-DataFlow (TD) and Apache Flink (AF) for some graph analytics use cases and it seems that both the frameworks offer similar features (e.g. iterations, statefulness, streaming/batch computations etc..). Although Flink seems to be production ready, the performance of TD was impressive - Ran a PR algo on a LiveJournal dataset on macbook pro with 16GB RAM, with worker level aggregation (only one worker). TD finished in about 13 secs where as AF took about 220 secs. These are out of the box numbers.

Are there any features in TD that are missing in AF or vice-versa ? I am trying to figure out the main architectural differences.

Periodic State Saving

So in my example I had a PR for I essentially had a bunch of aggregators, if I have a continuous calculation I'd like to persist my aggregator state every so often in real time (like every few minutes or something) how would you suggest going about doing that?

Is having the same number of workers in each process necessary?

I ran the following commands in two separate shells for the "hello" example.

cargo run --example hello -- -w2 -n2 -p0
cargo run --example hello -- -w3 -n2 -p1

And it stops after printing

worker 0:	hello 0
worker 1:	hello 1

for worker with index 0.

Is this behavior expected and you have some assumption of having the same number of workers in each process? Otherwise, I can fix it if you point me to the place to start looking for the issue.

Dynamic creation of dataflow graphs

I'm back with more questions! I'm going to state the problem/task I'm trying to solve first. I'm curious what's the correct/idiomatic approach.

Consider a stream of records identified by some composite primary key ((foo_id, bar_id, baz_id), value). Now lets say I want to compute aggregates by grouping by bar and baz and summing over foo. Now normally I could just use the aggregate operator, but the extra wrinkle is that foo_ids are related to each other via a hierarchy and I want to compute sums for every node in the tree.

So lets say the hierarchy is like {1: {2: [5,6], 3: [7,8,9]} and my input records are for ids 5,6,7,8,9 and I want to compute 1, 2 and 3.

I've done this using unary_frontier -- take each record, group it by bar/baz correctly, and accumulate it into its parent foo id. Once I get a notification that all foos within a bar/baz group have been input into the dataflow, I aggregate the rest of the way up the hierarchy and emit the aggregates. What I'm curious about is if putting all the logic into the custom operator is the "best" (most idiomatic? Best performance?) way to do this, vs creating more operators and putting more logic in timely control flow.

It's been working great on 1-2 million test records, but the full scale of my problem is ~1-5 billion records/~1-30 tb (each "value" in my case is large-ish array). I'm pretty sure this could be done differently if I was smarter about using timely control flow/ subscoping... I could see adding a nested scope for each level of this hierarchy and then using aggregate. Is that a good way to solve this problem?

What if the hierarchy isn't known at compile time? Can I create multiple subscopes if I don't know how many I'll need until runtime?

A Question About Asynchrony

How are async operations handled?

If a message to a vertex represents a request for an HTTP Request, which causes another messages to be emitted with the returned payload, how would that fit into this model?

I may be misreading things but I don't see how an async operation and the processing of a message that doesn't output in messages can be told apart.

Are async operations exclusively intended to be "outside" the system?

(Admittedly I'm new to rust and may just be misunderstanding things).

Fault tolerance and availability

In the Naiad paper, we learn that:

Naiad has a simple but extensible implementation of fault tolerance: each stateful vertex implements a CHECKPOINT and RESTORE interface, and the system invokes these as appropriate to produce a consistent checkpoint across all workers.

Does Timely Dataflow also have this mechanism? Does it have an alternative? Does it not need it? Might it one day get it? Can it be implemented by users?

Sorry if these are silly questions. I'm only just learning about Naiad and Timely.

Use cases for timely dataflow

I've been working on some data streaming mechanisms from devices (like IoT) into data bases. The on-device part of data generator is implemented in Rust. Currently using MQTT for transport of the data and then held in a queue (rabbitmq or cloud pub/sub).

I'm currently evaluating Apache beam to push data into Google's BigQuery which I've been using already and hugely impressed with. I stumbled across this project as I was looking for a Rust alternative. I haven't fully gone through your documentation or series blog post (apologies!) but before that I have few queries

Am I correct to assume timely is addressing the same use cases as spark or storm or beam like projects?

Also, could you have computations running in separate nodes?

Stream ergonomics vs code complexity

The Stream type presently wraps a G: Graph reference so that it is fully capable of attaching itself to an operator in the graph with no additional help. This introduces some annoyances, in that the Stream type also needs a lifetime, and these are some times mysterious borrows and runtime panics that might not be expected (the programmer needs some discipline to not try and access the graph twice at the same time).

An alternate approach, less appealing from the ergonomics point of view but a bit simpler, is to require that methods on Stream, things like map, filter, concat, each take a &mut G in addition to other arguments, and to keep the Stream itself from managing this reference.

The intent here is to see if it is possible to leverage Rust's own borrowing constraints to statically provide the constraints that RefCell enforces dynamically. If graph is exclusively borrowed, perhaps because a subgraph has borrowed it to do some work, it is unavailable for others to use. This would disable graph manipulation anywhere other than the current subgraph.

Code that previously looked like:

let result = stream.concat(&mut other)
                   .distinct()
                   .map(|x| x + 1);

would instead look like

let result = stream.concat(&mut other, &mut graph)
                   .distinct(&mut graph)
                   .map(|x| x + 1, &mut graph);

Perhaps the ergonomics can be improved if the methods act on pairs (Stream, &mut G):

let result = (stream, &mut graph).concat(&mut other)
                                 .distinct()
                                 .map(|x| x + 1)
                                 .0;

Or slightly differently, pedagogically:

let result = stream.enable(&mut graph)
                   .concat(&mut other)
                   .distinct()
                   .map(|x| x + 1)
                   .disable();

This is basically stating explicitly what we expect to happen with the RefCell<&mut G> in the current implementation, but statically enforced.

Although the code here looks more pained, I would imagine improvements to the respective bits of code related to setting up RefCell<&mut G> guards and ensuring that they live long enough but not too long. However, it may be that zero people spend time setting up subgraphs, and far more want to use the iterator-looking syntax.

Lifetimes

A downside to the proposed approach is that Stream having a lifetime has the potential (I'm not sure about guarantee) to only allow streams to interact with streams from the same subgraph scope. Without the lifetime constraint they would be able to interact with any other stream with the same G: Graph type, which could be a different subgraph (though, with the same timestamp). Needless to say this would cause other runtime issues (panic would be the best case).

It seems like Stream could keep its lifetime independent of the above choice. The lifetime wouldn't be used as part of a reference, just to constrain each Stream to have a common lifetime, presumably set by the subgraph enter function, which would use the lifetime of the subgraph reference. We wouldn't get a simpler Stream with the above proposal in this case, just one that has better static guarantees about graph structure.

Possibility of Timely-dataflow in HPX- C++Runtime system based on PARALLEX!

Hi Guys !
I'm PraveenVelliengiri a student from India. I did Google Summer of Code - 2017 for STE||AR Group on the Project Idea Distributed Component Placement. https://summerofcode.withgoogle.com/projects/#6358041686441984.
I have recently(2 days before) came across the term " Timely Dataflow". I'm not sure what it about, but I found you Guys have docs and blogs I need to look into these. I don't know Rust though, the code base "amaze" me. In the first place I need to ask you Guys what Timely Dataflow all about ?
How it is different from Google Dataflow Model ?
I willing to implement it for the HPX - C++ runtime System https://github.com/STEllAR-GROUP/hpx.
HPX has a implementation of Dataflow- Local Control Object,
It's functionality is to support Async computations.
First Let me explain its use case.

hpx::future - represent the result of async computation, whose result is not known yet !
hpx::future f1 = // some lambda
hpx::future f2 = // some other lambda
......
hpx::future fn = //some mess

hpx::dataflow( associates , func); - Although it's not a correct syntax, it is good for explaining the meaning.
associates - represents the futures the dataflow is associated.
Dataflow will spawn a task (don't wait) once the associated futures(*) became ready and executes the corresponding the function(func) on the worker thread. Unfortunately, HPX has this one implementation for the support for dataflow. So I like to implement things which support to represent dataflow in the Program. I clearly don't know whether timely dataflow is related to this. But I'm just trying to get the essence of timely dataflow.
Whether timely dataflow is only suitable in batch-stream Processing ? whether it makes sense to implement it in some Runtime System which supports Parallel and distributive computing. It generally sits between OS and Application.
What time really represents whether its the time to execute a certain piece of code or whether the time to notify from some node for data, what actually it means ?
Would you please explain some example related to timely ( please in a pseudo-code style ) because I don't know Rust : ) Sorry for Submitting this as an issue, I don't find any communication medium to get you people though...
Please reply
Thanks
Praveen

Progress information flows slowly

Not that slowly, not to worry. ;)

Each round of execution, a Subgraph will do one round of exchange of progress information with other workers. This is sane, and I don't know that we should do it any differently. However, it has the defect that for a single-threaded computation, a subgraph with a sequence of n operators that hold capabilities until their frontiers advance will require n rounds of execution of the hosting Subgraph before moving data through each operator. In each round, progress information makes it to the next operator, but that operator's progress is not reflected until the next round of execution.

Alternately, we could flow worker-local progress information forward after each operator, and accumulate the information for exchange with other workers. This could accelerate single-threaded computations, and perhaps computations where one worker is holding up the show (although the edges to operators requiring progress information are probably exchange edges).

There is the additional limitation that an operator is only informed of progress on its inputs once it has consumed them, which means returning control back to the Subgraph for each operator. Perhaps the progress information supplied to each operator could provide enough information for the operator to locally update the information (e.g. in response to message counts), or the progress information could be indirectly hosted by something that is updated as the messages are drained (currently, the operator itself reports the count back to the parent Subgraph, who does some math and next-time-around informs the operator that all messages are consumed).

Another option is to more aggressively incrementalize the progress flow inside a Subgraph, so that we only encounter and execute child operators who have experienced change in their frontiers. This is probably tricky, in that we should still be scheduling these operators so they can read from their inputs and such, but when there are n-1 operators with nothing to do it feels silly to check with each of them to confirm this, over and over.

Improving documentation

This is a long-term improvement issue, to identify and improve lacking documentation. Although timely has #deny[missing_docs], this doesn't mean that the documentation is uniformly good. Several structs and methods could have clearer explanations of their roles and functionality, and several modules could have better explanations of their organization and principles.

Document improvement PRs are great and loved, especially with an eye towards increased clarity (if not necessarily increased word count).

Progress performance poor with open-loop input

When inputs are supplied with multiple distinct timestamps from workers which do not wait for the system to catch up, we can get into a spot with a large number of distinct timestamps in the system. Having a large number isn't fundamentally bad, but there are certain operations that are currently quadratic in the number of outstanding timestamps, which typically just causes overloaded workers to fall further behind.

The main culprits are

  1. CountMap, which just uses an unordered list for an interface that is more like a HashMap<T, i64>. This could be fixed, perhaps with an enum that switches the internal representation between a Vec and a HashMap based on sizes.

  2. MutableAntichain is a bit more problematic. The "maintain the antichain lower envelope" logic is quadratic as written, and I'm not sure of a better algorithm for general partial orders. This is a problem even if the frontier is compact, as we have a large collection of non-frontier times that we have a hard time organizing (when a frontier element departs, which element next joins the frontier?).

Given these two things, it might be good to come up with a discipline for batching inputs from workers who aren't clear on how many outstanding timestamps they've introduced into the system but not yet ensured have cleared the system.

Each worker knows how many timestamps it has introduced, and by consulting its probes it can determine how many of these it is sure have cleared (others may have cleared, perhaps having been dropped, but could be ahead of the probe frontier). It seems reasonable to have workers limit themselves by their own knowledge, though I am not certain if they should buffer their input indefinitely until timestamps drain, or start geometrically increasing buffer thresholds before shipping their data, or what. I'll have a ponder, unless others have thoughts too!

`Root` probably shouldn't implement `Scope`.

The root of timely dataflow computations currently implements the Scope trait, which gives it access to several convenience methods and simplifies the definition of child scopes (which can always point to a parent "scope" to get fresh identifiers and channels and such). At the same time, the root "scope" doesn't do several things that its contained children do, like track communication between operators it hosts.

At the same time, it would help to have more clarity with the root scope for current issues like attaching resources to contained scopes (e.g. for binding libraries lifetimes to those of running dataflow graphs).

The Root struct should either behave as a full and complete scope, or it should not implement the trait. More likely, it should implement a restricted trait related to its role as a dataflow resource allocator.

Timely communication `from_bytes` cannot re-use allocations

Timely communication has a Serialize trait, with methods

pub trait Serialize {
    /// Append the binary representation of `self` to a vector of bytes. The `&mut self` argument
    /// may be mutated, but the second argument should only be appended to.
    fn into_bytes(&mut self, &mut Vec<u8>);
    /// Recover an instance of Self from its binary representation. The `&mut Vec<u8>` argument may
    /// be taken with `mem::replace` if it is needed.
    fn from_bytes(&mut Vec<u8>) -> Self;
}

The first method seems fine for now, but from_bytes has the issue that it is unable to re-use existing Self allocations, as there is nowhere to grab one from (short of a stashed Rc<RefCell<_>> cache).

Timely's Message type gets around this by using Vec<u8> as its backing storage, which is great except that no one else can rely on this, and so returned Message objects just get dropped on the floor, as we don't know how to destructure them into useful parts.

I think we could take inspiration from Rust's Clone trait, which has a method

fn clone_from(&mut self, source: &Self) { ... }

We could totally change the signature of from_bytes to be

fn from_bytes(&mut self, &mut Vec<u8>);

with the intent that one should feel free to re-use as much of &mut self as you can afford, as long as what results is still valid. If you happen to swap around the &mut Vec<u8> in the process, good for you.

This seems strictly better than requiring a freshly constructed Self, as simply clobbering the &mut self input argument is always an option.

Assess integrating ownership

Timely dataflow exclusively moves owned data around. Although operators are defined on &Stream references, the closures and such they take as arguments can rely on getting buffers of owned data. The map operator will, no matter what, be handed owned data, and if this means that the contents of a stream need to be cloned, timely will do that.

This seems a bit aggressive in cases where you want to determine the length of some strings, or where you have a stream of large structs (e.g. 75 field DB records) and various consumers want to pick out a few fields that interest them.

It seems possible (though not obviously a good idea) that this ownership could be promoted so that timely dataflow programmers can control cloning and such by having

  1. Owned streams, Stream<Data> with methods that consume the stream and provide owned elements of type Data.

  2. Reference streams, &Stream<Data> whose methods provide only access to &Data elements, which the observer could then immediately clone if they want to reconstruct the old behavior. Maybe a .cloned() method analogous to Iterators .cloned() method?

I think this sounds good in principle, but it involves a lot of careful detail in the innards of timely. Disruption is fine if it fixes or improves things, but there are several consequences of this sort of change. For example,

  1. Right now Exchange pacts require shuffling the data, and this essentially requires owned data, when we want to shuffle to other threads in process. It does not require it for single thread execution (because the exchange is a no-op) nor for inter-process exchange (because we serialize the data, which we can do from a reference).

  2. Similarly, exchanged data emerges as "unowned" references when it comes out of Abomonation. Most operators will still hit this with a clone, though for large value types the compiler can in principle optimize this down. To determine the length of a String, the code would almost certainly still clone the string to check its length (it does this already, so this isn't a defect so much as a missable opportunity).

  3. Anything we do with references needs to happen as the data are produced at their source, or involve Rc<_> types wrapping buffers and preventing the transfer of owned data. This is because once we enqueue buffers for other operators, we shift the flow of control upward and lose an understanding of whether the references remain valid. This could be fine, because we can register actions the way we currently register "listeners" with the Tee pusher, but would need some thought.

  4. Operators like concat currently just forward batches of owned data. If instead they wanted to move references around, it seems like they would need to act as a proxy for any interested listeners, forwarding their interests upstream to each of their sources (requiring whatever the closure used to handle data be cloneable or something, if we want this behavior).

  5. Such a "register listeners" approach could work very well for operators like map, filter, flat_map and maybe others, where their behavior would be to wrap a supplied closure with another closure, so that a sequence of such operators turn in to a (complicated) closure that the compiler could at least work to optimize down.

Mostly, shifting around where computation happens, as well as whether parts of the timely dataflow graph are just fake elements that are actually a stack of closures, is a bit of a disruption for timely dataflow, but it could be that doing it sanely ends up with a system where we do less copying, more per-batch work with data in the cache, eager filtering and projection, and lots of other good stuff.

Abomonate with Enum's

I'm working on trying out timely with a more realistic use case I have and I'd like to use structs...

one of my structs looks like this...

enum EventType {
    Hover = 1,
    Click = 2,
}

struct Event {
   event_type: EventType,
   // other simple fields
}

unsafe_abomonate!(Event : event_type, ....);

The macro of course does not like the EventType field since there's no Abomonation trait impl for it. Seems like I could implement it myself though thats highly discouraged. I could just switch to an integer type, but that means rust no longer gives me the nice error when I don't match all possibilities when doing stuff like...

match event.event_type { .... }

I think in general abomonate is cool, just wondering what your thoughts are here

Compilation error

Hi,

I'm not really familiar with Rust so apologies if that is a user error. I am trying to run your pagerank and I am getting this:

Compiling timely v0.2.0
error[E0599]: no method named pop found for type timely_communication::allocator::thread::Pusher<dataflow::channels::message::Message<T, D>> in the current scope
--> /home/.../.cargo/registry/src/github.com-1ecc6299db9ec823/timely-0.2.0/src/dataflow/channels/pact.rs:26:39
|
26 | (Box::new(Pusher::new(pushers.pop().unwrap(), allocator.index(), allocator.index(), identifier)),
| ^^^

error[E0308]: mismatched types
--> /home/.../.cargo/registry/src/github.com-1ecc6299db9ec823/timely-0.2.0/src/dataflow/channels/pact.rs:27:31
|
27 | Box::new(Puller::new(puller, allocator.index(), identifier)))
| ^^^^^^ expected struct std::boxed::Box, found struct timely_communication::allocator::thread::Puller
|
= note: expected type std::boxed::Box<timely_communication::Pull<dataflow::channels::message::Message<_, _>> + 'static>
found type timely_communication::allocator::thread::Puller<dataflow::channels::message::Message<T, D>>

error: aborting due to 2 previous errors

error: Could not compile timely.

Failure building due to name "Timely Dataflow Test"

Build failed due to the spaces in the name of the crate. I expect this is a vestige of an older version of Cargo, but still happens when you import the dependency via crates.io. Seemed to fix itself when I manually compiled the timely-dataflow git repo.

Question about heap allocated objects

Hi! I think timely is really neat, and I've been trying to get a handle on it. I have a question about the correct way to send things like Vecs from operator to operator. I'm new to rust so this is probably a dumb question.

In the example below, the compiler says "cannot move out of borrowed content" on the &datum line. What's the proper way to give the vector to the session? Is it session.give(datum.clone())? When I try to clone the compiler asks for type annotations and I'm not sure if I'm on the right track.

extern crate timely;
use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Inspect, Probe};
use timely::dataflow::operators::generic::unary::Unary;
use timely::dataflow::channels::pact::Pipeline;

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {


        // create input and output handles.
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // build a new dataflow.
        worker.dataflow(|scope| {
            input.to_stream(scope)
                .unary_stream(Pipeline, "identity",
                              move |input, output| {
                                  input.for_each(|time, data| {
                                      let mut session = output.session(&time);
                                      for &datum in data.iter() {
                                            session.give(datum);
                                      }
                                  })
                              })
                .inspect(|x| println!("seen: {:?}", x))
                .probe_with(&mut probe);
        });
// feed the dataflow with data.
        let vals = vec![1.0, 2.0];
        input.send(vals.clone());
        while probe.less_than(input.time()) {
            worker.step();
        }
        input.advance_to(1);
    }).unwrap();
}

Distributed merge sort example

I'm having fun learning timely-dataflow. To do so, I've mainly been reading @frankmcsherry's blog posts and essays.

Repeatedly, I've found myself wanting to know how a distributed merge sort would work in timely-dataflow. In particular, I want to know how to say, "keep reading until RAM is almost full, then sort the batch and emit the tuples". Are there any examples of this?

Investigate unifying `input` with `replay`, and `probe` with `capture`.

We have two general methods for introducing and exfiltrating timely streams, replay and capture respectively. At the same time, we have custom operators for input and probe, which introduce input from and report progress back to external sources. It seems like these could be unified, though perhaps there are unforeseen complications.

Similarly, to_stream seems like it could be unified with either replay or input, or ideally both. I think at the moment there is a blanket impl issue, in that the source for replay implements Iterator with an Event item type, rather than whatever data type we actually want the stream to present.

Scheduler could avoid operators with neither messages nor capabilities

In principle, the computation.step() code, which swings through each Box<Operate> recursively, has some insight into whether the operators it is about to schedule have either pending inbound messages or outstanding capabilities. The scheduler could avoid such operators, under the presumption that they cannot currently influence the rest of the dataflow. This could improve latency by moving only between operators with work to do, avoiding chit-chat with the often large numbers of operators that are currently dry.

There are some important details to double-check. For example, each call to pull_internal_progress reports a bool which means "other than messages and capabilities, do I have any other work?" which the scheduler should probably respect (it is meant for un-apparent work, like pushing data to disk, printing things to the screen, etc). This might also prevent operators from performing opportunistic work to fill empty time, unless we notice when there is nothing to do, and give everyone a slice.

Ideally, we could produce a clear explanation of when and how often operators would be scheduled. At the moment it is "once per call to step(), and if it becomes less than that, we might see some unintended consequences.

Notifications out of order

I was under the assumption that the order in which notifications are delivered to an operator is following the partial order defined on timestamps. At least this seemed to be the case in past versions of timely, since our sessionization code relies on this.

However, the following code (which sometimes requests notification for future times during a notifications, like sessionization) observes notifications in a weird order:

last: (Root, 0), curr: (Root, 0), frontier: [(Root, 4)]
last: (Root, 0), curr: (Root, 1), frontier: [(Root, 4)]
last: (Root, 1), curr: (Root, 2), frontier: [(Root, 4)]
last: (Root, 2), curr: (Root, 3), frontier: [(Root, 4)]
last: (Root, 3), curr: (Root, 2), frontier: [(Root, 4)]
thread 'worker thread 0' panicked at 'assertion failed: last_notification.less_equal(curr.time())', src/bin/bug.rs:22:24

Source code:

extern crate timely; // 0.3.0

use timely::PartialOrder;
use timely::dataflow::operators::{Input, Unary, Probe};
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::timestamp::RootTimestamp;

fn main() {
    timely::execute_from_args(std::env::args(), move |computation| {
        let (mut input, probe) = computation.dataflow(move |scope| {
            let (input, stream) = scope.new_input::<()>();

            let mut last_notification = RootTimestamp::new(0);
            let probe = stream.unary_notify::<(), _, _>(Pipeline, "foo", Vec::new(), 
                move |input, _, notificator| {
                    input.for_each(|time, _| {
                        notificator.notify_at(time);
                    });
                    notificator.for_each(|curr, _, notif| {
                        println!("last: {:?}, curr: {:?}, frontier: {:?}",
                            last_notification, curr.time(), notif.frontier(0));
                        assert!(last_notification.less_equal(curr.time()));
                        last_notification = curr.time().clone();
                        if *curr == RootTimestamp::new(0) {
                            notif.notify_at(curr.delayed(&RootTimestamp::new(2)));
                        }
                   });
               }).probe();
            (input, probe)
        });
    
        for epoch in 0..5 {
            input.advance_to(epoch);
            input.send(());
        }
        
        computation.step_while(|| probe.less_than(input.time()))
    }).unwrap();
}

Capture with `k` workers, replay with `j`, `k โ‰  j`

.capture()-d streams may be sent over sockets to a separate computation.

It's currently unclear how to replay those streams with a different number of workers compared to the source computation.

For example, replaying a 1-worker stream in a j-worker stream, might look like this - but will not work.

                let one_stream: Stream<_, Thing> = if index == 0 {
                    one_stream.lock().unwrap().take().unwrap().replay_into(scope)
                } else {
                    vec![].to_stream(scope)
                };

The broken progress behaviour seems due to .to_stream() expecting to exist on every worker, https://github.com/frankmcsherry/timely-dataflow/blob/eae63eb2063a6e1d3da70c781c3e879a5ccd7d93/src/dataflow/operators/generic/operator.rs#L554

We may just need a source that does not advertise initial capabilities? I'm not sure this checks out with the progress tracking math.

Doesn't build with Rust beta 1.0

There are a few dependencies that might be replaceable, including:

  1. std_misc: Future used to start up networking. This could probably be any thread stuff.
  2. collections: many places use drain, append, and push_all. They don't have to, but...
  3. hash: used in distinct for hash tables and data exchange. It is important at least in exchange that the hash function be deterministic (not re-seeded) to get data to the same place despite starting in different processes. Not sure what the right fix is at the moment.

How do you build while loops?

This is more about understanding than a bug report/feature request.

I have successfully build a little dataflow program that runs a fixed number of iterations of a particular computation. It basically generates a loop_variable, does its thing, and connects the loop at the end. Now I would like to be able to express something like "do this until this particular condition is met" rather than "do this a hundred times".

So far I'm at a loss of how to do this. I suppose there is some complexity in making sure all workers are on the same page about the decision whether to stop or not. But I can't really see, how the API supports this kind of while loop to begin with.

PS: is there a support forum/chat of some kind or is it okay to ask questions like this as a GitHub issue?

Generic aggregate operator

I've been thinking about implementing a generic aggregate operator (e.g., a reduce in MR lingo), since I've found myself writing the same pattern over and over.

The way I currently do aggregation is through a combination of unary_notify + state (let me know
if this is not the way to go!). It usually looks something like this:

let state =  // some collection (perhaps a HashMap<Timestamp, Vec<T>>). 

stream.unary_notify(Pipeline, "aggregate", vec![], |input, output, notificator| {
  input.for_each(|time, data| {
    notificator.notify_at(time.clone());
    state[&time].append(data.drain(..).collect());
  });

  notificator.for_each(|time, _| {
    let aggregate = //perform aggregation logic for all values of state[time]
    output.session(&time).give(aggregate);
   // garbage collect state[&time]
  }
}

Sometimes it is possible to update state as you go instead of appending to a list and updating at the end. So it'll look like this:

let state =  // some collection (perhaps a HashMap<Timestamp, T>). 

stream.unary_notify(Pipeline, "aggregate", vec![], |input, output, notificator| {
  input.for_each(|time, data| {
    notificator.notify_at(time.clone());
    state[&time] = // aggregation_logic(state[&time], data)
  });

  notificator.for_each(|time, _| {
    output.session(&time).give(state[&time]);
    // garbage collect state[&time]
  }
}

I was wondering if it would be easy to generalize these two cases so one can just do:

stream.aggregate(|x: Vec<T>| { aggregation logic that acts on x })  
stream.aggregate_it(|x: T, y: T| { aggregation logic in terms of x and y } )

Does this seem like a good idea? I think that all that would need to be done is something very similar to the way unary_notify is implemented (in unary.rs), but extend the struct Operator to incorporate this extra state (in addition to writing the necessary boilerplate)?

Any thoughts? Apologies if this is already covered by differential_dataflow's group operator (I honestly couldn't make sense of it).

build failure

I get an error when trying to build this. Not sure why it does not compile. This is with 1.6, but I also tried 1.7-beta and got the same error. HEAD is dee0919

 src/dataflow/channels/pact.rs:21:37: 21:65 error: no associated item named `new` found for type `timely_communication::allocator::thread::Thread` in the current scope
 src/dataflow/channels/pact.rs:21         let (mut pushers, puller) = Thread::new::<Message<T, D>>();
                                                                      ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
 src/dataflow/channels/pact.rs:23:31: 23:44 error: the type of this value must be known in this context
 src/dataflow/channels/pact.rs:23         (Box::new(Pusher::new(pushers.pop().unwrap(), allocator.index(), allocator.index(), identifier)),
                                                                ^~~~~~~~~~~~~
 error: aborting due to 2 previous errors
 Could not compile `timely`.

Thanks,

Improving doc_tests

Document tests, examples written into the documentation for structs, methods, and modules, do a great job of explaining how one might use the associated code. They are also somewhat rare in the code at the moment. Occasionally this is because the associated code is terrifying and shouldn't be used, but when this isn't the case (or shouldn't be the case), we can improve the situation with better examples.

Document test PRs are much loved, especially ones that demonstrate the fundamental features of the code with some sort of test at the end showing what should be true about the result or behavior.

Pivot scheduling from polling to event-driven.

At the moment timely workers effectively poll the operators they manage, scheduling each operator once for each call to worker.step(). This works as long as you don't mind the wasted effort, but it is somewhat gross, can tie up time in larger dataflows, and recently has gotten in the way of logging as well (all of the pointless activations gum up the logs).

Instead, we could switch to an event-driven approach, where operators think a bit harder about whether they should be run at all. For example, operators with no incoming messages, no progress changes, and no work performed in their last invocation are probably up for being parked until something changes. Entire subgraphs can similarly be parked, so that we only drill down into parts of the dataflow graph that are currently active, or that need to be woken up for some reason.

I'm not 100% sure what those reasons should be at the moment, but my current thinking is:

Operators are executed when (i) there are inbound messages or progress changes, (ii) the operator has recently performed work (in its last invocation), or (iii) when the operator indicates that despite the absence of these it has some work it would like to do. In the absence of new stimulus, ongoing work, or explicit requests, the operator may be parked and re-executed only when they would receive additional stimulus.

  1. At the moment workers are not explicitly told about incoming messages, from the communication threads, which is part of what would signal "new stimulus" (either in the form of data messages or progress messages). We could totally introduce this. My current thinking is that each worker could have a "todo" queue of operator identifiers, which can be written to by other threads. This would allow the communication thread to tickle operators without requiring them to upcall through owned data, and allow external threads to tickle operators when e.g. callbacks complete (e.g., async io to the filesystem or network). We log (and could surely provide to workers) the map from operator identifier to its "address" in the dataflow (a sequence of integer identifiers indicating its root-to-leaf path), from which the worker can determine which operators it should explore.

  2. At the moment operators do not have a way to indicate that they are comfortable being de-scheduled or not. The pull_internal_progress call returns a boolean, which indicates whether the operator has any non-obvious work, i.e. should be scheduled again despite the absence of inbound frontier and held capabilities. This could be enriched to be an Option<u64> indicating a number of nanoseconds for which the operator is comfortable being descheduled; Some(0) would indicate "please reschedule asap", whereas None would indicate "no requirement to reschedule".

There is probably a bit of additional work ensuring that the Subgraph scheduling logic is comfortable with operators that may run varying numbers of times. Historically, the implementation has reset various buffers under the assumption that each operator has had a chance to react appropriately, and if nothing else some assert!s will probably need to be rethought.

This doesn't seem like something that is going to land in a big hurry, but moving towards it in steps seems like a good idea.

BigData series

Hey,

I wanted to let you know there is an early version of the part 5 of my big data series here:
https://github.com/kali/kali.github.io/blob/master/_drafts/Embrace_the_glow_cloud.md

The results graphs are here:
https://github.com/kali/kali.github.io/blob/master/assets/2016-02-11-m3xl.png
https://github.com/kali/kali.github.io/blob/master/assets/2016-02-11-m32xl.png
https://github.com/kali/kali.github.io/blob/master/assets/2016-02-11-c38xl.png

I am a bit puzzled by the fact the huge c3.8xl instances are performing so badly in network configuration (probing show network is fairly good, 6 or 7 several GB/s whether or not there are started in the same "placement group").

I will also investigate if my use of ansible as a way to start the processes is not offsetting their start time enough to forbid getting under the 7 or 8 second limit. I'll try to find some trick to start all instances more simultaneously.

Anyway. If you want to shed some wisdom, feel free to.

Timely and bigdata

Hi,

I have a few weeks ahead of me to try and implement some bigdata/rust toolbox, and I'm trying to assess how timely could help me.

I am toying right now with https://amplab.cs.berkeley.edu/benchmark/ , have single-host implementation for Query 1 and Query 2 and starting to put some timely in the mix (focusing on query2, query1 is too boring).

I must admit I am struggling a bit. I got it running on small datasets, but so far haven't been able to coerce it into doing things in the right order โ€” not loading everything from disk before starting to reduce the data, in order not to overflow memory. Would you be interested in having a look and give me some insight as to what I should do ? The code is there, https://gitlab.zoy.org/kali/dx16 . The most interesting part would be https://gitlab.zoy.org/kali/dx16/blob/master/src/bin/query2_timely.rs.

I was hoping to grab you on IRC, but haven't seen you there for a while.

Thanks.

Minimize copies

I've been trying to do a bit of review of the copies that go on in the timely and timely communication path. I think several of them can be removed, but first I thought I would try and explain what each of them are.

Let's go in order from a message received in timely communication, in BinaryReceiver::recv_loop().

  1. There is almost certainly a copy in

    let read = self.reader.read(&mut self.buffer[self.length..]).unwrap_or(0);

    where we collect whatever data the kernel has for us. In the absence of some zero-copy interface to the networking, I think this is probably going to stick around. Though, we may have to think a bit harder about where we copy into.

  2. Just a bit below, we have

    target.send(slice[..h_len].to_vec()).unwrap();

    This is where we peel out the bytes destined for a specific (worker, dataflow, channel) tuple and send the bytes along to that destination. Because this is a different thread with no lifetime relationship, we invoke .to_vec() to get an owned allocation.

  3. The byte allocation arrives at communication's binary::Puller, where it is not copied. This is a clever moment where we deserialize into the Message type, whose from_bytes method takes ownership of the Vec<u8> buffer and invokes Abomonation's decode method to get references.

  4. This Message gets handed to operator code, and if the operator only needs a &[Record] then no copy needs to happen. However, if the operator needs a &mut Vec<Record> then the DerefMut implementation will invoke a clone() on the &Vec<Record>, which will surely do some allocations. The byte buffer is dropped at this point.

  5. Operators can supply outputs either record-by-record, or as a ready-to-send batch of records. In either case, if they hit a data exchange channel they will need to be moved. This is essentially a copy, but it seems largely unavoidable if we want to put the records destined for remote workers into contiguous memory. This is where the "shuffle" actually needs to happen, and it seems legit.

  6. If serialization is required, then <Message as Serialize>::into_bytes() is invoked, and it will do an allocation of a Vec<u8> and a copy into it. The only way we know how to turn general Vec<Record> types into bytes is using Abomonation's encode, and this copies. In principle, we could "steal" the allocation of the vector itself, and only serialize subsequent owned data.

  7. The header (fixed sized struct) and bytes are sent to BinarySender::send_loop(), in which we write both to a W: Writer. This happens to be a BufWritter wrapped around a network stream, which mean a copy into the buffer, and probably a copy out of the buffer when it eventually gets around to writing at the network stream in bulk (the second of which is intrinsic in the TcpStream api).

I think three of these are somewhat non-negotiable at the moment: i. the copy from kernel buffers when we read from the network stream (in 1.), ii. the copy as we do the data shuffle (in 5.), and the copy back into kernel buffers (in 7.).

This leaves us with four potential copies that could be superfluous.

  1. This copy could be avoided using something like the bytes crate, where one hands out multiple references to a common allocation, and the API ensures that the references are to disjoint ranges.

    This could also be avoided by doing smaller reads into independently owned allocations; each read pulls down the next payload and the subsequent header, which tells us how much to read for the next allocation (and could tell us a size and alignment). This has the potential risk that if there are many small methods we do many small reads, possibly doing lots of kernel crossings. In that case, it seems like copies are an unavoidable cost of moving many messages using few kernel crossings.

  2. This wasn't actually a copy, but it has a number so we want to put it here.

  3. This copy is self-inflicted, in that one could write operator code that doesn't even need a mutable reference to the source data. It isn't always natural to do this, but if your code insists on owned data with owned allocations then this is non-negotiable, as we don't control the Rust codegen that needs to work correctly with the data it is handed.

    One candidate bit of cuteness is: if we are handed an owned Vec<u8>, in conflict with the optimization for (2.), we could arrange that the data are laid out so that the same allocation can be used as the spine of the Vec<Record>. This could still mean copying if these types have allocations behind them, and it is important that we got the Vec<u8> as a Vec<Record> because the deallocation logic is allowed to explode if we dealloc with a different size or alignment, but it could be possible for something like this to work.

  4. We decided that the shuffle move was non-optional, but I have to put it here to make markdown numbers work out.

  5. When we go from Vec<Record> to Vec<u8> we have relatively few options. We could grab the spine of the vector and only serialize auxiliary data (perhaps to the same allocation, if there is enough space). This would mean no copies here for data without further owned memory, and in the absence of any further information we would have no choice I think (if each Record contains a bunch of String fields and such).

    One alternative is something like the CapnProto builder patterns, where instead of allocating a Rust object for output you directly serialize the result at a byte buffer. This is possible, though I don't know how ergonomic it ends up being (that is, you could write the code by hand, but you probably wouldn't want to).

  6. One of these copies seems unescapable (the kernel buffer copy), but the BufWriter copy seems optional. It does some very good things for us, in terms of minimizing kernel crossings. This could be somewhat avoided if each worker produced a consolidated Vec<u8> to send to each remote process, rather than separate allocations for each channel, and each remote worker. This seems possible, though again awkward. The shuffle that happens means to colocate data for each worker, and we don't know how large each of these will be before sending them. We could commit to certain spacing (e.g. 1024 records for each worker) and start writing each worker at an offset of a common buffer for each process, with some inefficiency if there is skew of any sort. In any case, operator code currently produces output records one at a time, and we need to do something with each of these records.

One meta-point, which I don't really know that I understand, is that we may be able to absolve ourselves of copies that do not leave low level caches. Each copy that remains within the L1 cache could be viewed as pretty cheap, relative to all the other operations going on. So, deserialization code for small buffers might be relatively cheap, as compared to copying each frame into a new allocation (2.). I'm slightly making this up, but understanding "zero copy" and which costs it is really avoiding seems like a good thing to do along the way.

Decoupling progress reasoning from Subgraph structure

The progress/nested directory relates to a specific subgraph construction patten, one which happens to be fairly useful but which is not the only way one might do things. Intermingled with this are the structures and reasoning for establishing the summaries of how ports on operators might reach each other (what must happen to the various timestamps).

This should surely be decoupled, so that others can use the infrastructure if needed (and so that it can be independently documented and tested).

Unify generic operator implementations

We have at least two paths to generic operator implementations at the moment, the unary and binary modules, and the operator module. The first two are "fine", but have several baked in system support, whereas the latter uses capabilities exclusively and has no other secret contracts with the system. With the operator module, for example, notification is a strictly "user-mode" operation.

Each of unary and binary have two variants, for whether the operator should or should not be presented with frontier information on each input. The distinction is minor, but when you give up access to the frontier information you slightly optimize the inner loop of progress tracking (this had noticeable benefits in microbenchmarks back in the day, but could be re-tested now). Each operator required the user to specify what initial capabilities they required, which often lead to Vec::new() arguments "just because" even though no one knew why they needed them.

By contrast, the operate implementations have you provide a closure from an initial capability (wrapped around the default time), to a closure that acts on input and output handles. Each of its unary and binary variants have a with and without frontier, as above. There is also a "zero-ary" method source which allows you to define an input operator with just a closure (and which should probably become how operators like input and replay_into work).

This issue is that we should unify these two implementations, as they have fundamentally overlapping functionality but different syntax, idioms, and secret contracts with the system.

At the same time, I think we should review the operators for ergonomics, and whether there are simple defaults that present an easily understood interface, with more sophisticated options presented as more sophisticated methods (or some other way, but simple defaults at least).

For example, I endorse the idea that unary should ultimately be a method that takes (i) a parallelization contract, (ii) a tasteful name, and (iii) a closure on frontiered input and output handles. Operators that require an initial capability could have a more complex method (e.g. unary_with_capability). Operators that want to discard the ability to consult frontiers in the interest of system performance could have a more complex method (e.g. unary_without_frontier). Operators that want both could have yet another method (or perhaps we always provide a capability to sophisticated methods, as the user has "opted in" to reading the docs and understanding what is going on).

In any event, it would be good to have a sense for ergonomics of these operators. I've written quite a few and am probably the wrong person to ask. I would love to hear observations from others (e.g. "simple would be great; I was often confused" or "it was fine because you could just ignore the argument" or "closures to closures are wicked complicated" or whatever else).

`Input` doesn't support batch loading

The input::Handle type takes record-at-a-time input, but doesn't have support for folks who show up with a pile of pre-batched data. It seems relatively easy to flush the current buffered contents and then ship the provided batch of data.

The advantage here could be much less copying. In particular, several tpch differential dataflow queries do fairly aggressive filtering as the first step, and this would remove a fairly substantial amount of copying from the first step of the dataflow (instead, filter---which uses a Pipeline channel---could receive the provided buffers directly, and read out of them without ever copying).

Reviewing motivation sections in the book

Finally, the following two statements on "When to use Timely Dataflow" section helped me a lot to understand things. Thank you!

Timely dataflow is a dataflow system, and this means that at its core it likes to move data around.

Dataflow systems are also fundamentally about breaking apart the execution of your program into independently operating parts.

Build of frontier.rs fails with 0.13-nighly

I tried the current code with the latest nightlies today (rustc 0.13.0-nightly (42deaa5e4 2014-12-16 17:51:23 +0000), apparently), and I get lots of lifetime-related errors for frontier.rs:

/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:76:80: 76:97 error: the parameter type `T` may not live long enough
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:76         self.update_iter_and(updates.iter().map(|&x| x), |time, val| { results.update(time, val); });
                                                                                                                                                ^~~~~~~~~~~~~~~~~
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:76:80: 76:97 help: consider adding an explicit lifetime bound `T: 'static`...
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:76         self.update_iter_and(updates.iter().map(|&x| x), |time, val| { results.update(time, val); });
                                                                                                                                                ^~~~~~~~~~~~~~~~~
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:76:80: 76:97 note: ...so that the declared lifetime parameter bounds are satisfied
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:76         self.update_iter_and(updates.iter().map(|&x| x), |time, val| { results.update(time, val); });
                                                                                                                                                ^~~~~~~~~~~~~~~~~
[...]

Grepping just for the errors gives:

/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:76:80: 76:97 error: the parameter type `T` may not live long enough
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:81:42: 81:62 error: the parameter type `T` may not live long enough
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:101:33: 101:48 error: the parameter type `T` may not live long enough
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:116:25: 116:41 error: the parameter type `T` may not live long enough
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:132:33: 132:47 error: the parameter type `T` may not live long enough
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:145:25: 145:42 error: the parameter type `T` may not live long enough
/home/malte/Projects/timely-dataflow/src/progress/frontier.rs:166:46: 166:66 error: the parameter type `T` may not live long enough

... following what rustc suggests and adding +'static to the definition of MutableAntichain fixes the problem and things compile. It would seem to me that Rust is a bit worried about the element that is stored in the Vec that frontier deals with going out of scope before we get to the methods here.

I don't really claim to understand what I'm doing here though ;-)

Cheers,
M.

Messages could have multiple capabilities

Right now, messages in timely dataflow carry a single capability. This simplification is helpful in that for several programs the capability's timestamp is semantically meaningful: all data in the message are treated as "arriving" or "occurring" at that timestamp. However, we could enforce single-capability messages as a discipline rather than a requirement, if it was valuable to have multiple capabilities.

I'm currently writing the "high resolution" version of differential dataflow, in which each piece of data also carries its own timestamp. There have been several cases where it would be convenient for a message to support multiple capabilities.

The general situation is: an input frontier advances from one antichain to another, and we would like to commit and transmit all updates whose times lie between the two antichains. The elements of the first frontier are sufficient capabilities to send the set of updates, but no single element is necessarily sufficient. Nor can we mint a capability at the meet of the capabilities we do hold. What happens instead is that we determine for each capability which elements it is responsible for, and commit and send these separately. This results in

  1. more and smaller messages sent around,
  2. for differential dataflow more and smaller batches that cost when we need to traverse a collection trace,
  3. artificial serialization when processing keys (as we must partition times by capability),
  4. increased code complexity as we need to handle all of this segmentation logic ourselves.

It seems reasonable to have messages carry an arbitrary number of capabilities, although it is likely to result in serious breakage of existing code, and potentially annoying ergonomics in the future. We could very easily layer a "single capability" abstraction on top to prevent the breakage and ergonomic horror, but it would be very helpful to be able to send and receive messages with multiple capabilities.

Local validation of progress updates

Ideally the progress API exposed by Operate trait, by which operators report input messages consumed, capabilities dropped and held, and output messages produced, would have some pleasant invariants about batches of updates by which we could increase our confidence in their global properties.

One intended (but not satisfied) invariant is "each newly held capability must be greater or equal to some consumed input message or held capability" and "each send output must have time greater or equal to some consumed input message or held capability".

This invariant is not currently satisfied because subgraphs eagerly report information about the future that are known to be true, but aren't justifiable yet.

For example, consider a subgraph with two inputs, managing an operator that consumes input and may hold capabilities. Imagine this graph is executed across multiple workers. One subgraph instance can receive a report from its managed operator that it has consumed a message and now holds a capability. The subgraph currently chooses to report that information upwards (it now holds a capability) but it is not yet in a position to indicate which of the two inputs the message came in through (that information is perhaps with the other worker, who performed the ingestion and sent a progress update relating this, but it has not yet been received).

Although this information is not incorrect, nor does it lead to errors in the protocol (known errors, at least), it is nonetheless confusing from the point of view of invariants maintained. The subgraph appears to be claiming a capability without consuming any input messages. There is the intent to do so in the future, and in this case we know that the only way the report of a consumed message can arrive is through the subgraph.

Perhaps the subgraph should delay this information, even though it knows it will happen, until the progress update "makes sense". It could wait until it hears about the message that justified the capability, and only then report claiming the capability; as long as the system only needs to know about the frontier, this would be the first moment it could advance to the frontier because until this point the unacknowledged message blocked the frontier.

This would allowed us to get closer to imposing invariants on each batch of updates, which should increase confidence in the protocol, at the expense of complicating the responsibility of the subgraph. On the positive side, this buffering would result in less movement of progress updates, communicating changes less frequently.

Scopes should separate channel allocation and operator ownership

The current scope design has one struct that manages both channel and identifier allocation, and operator ownership. Because of the former, the scopes get cloned all over the place so that folks can create channels and identifiers. Consequently, ownership gets cloned all over the place and we end up with reference cycles, and resources don't get released until operators are tombstoned (which is not guaranteed to happen).

Obviously this is horrible if you rely on releasing resources (or have any standards at all).

The fix doesn't seem too complicated: scopes like Root and Child should clearly distinguish between the cloneable Allocator types they have, and the non-clonable operators they own. When we need to clone a scope, we should determine that we only need the Allocator component (otherwise: weird), and just clone that.

Specifically, where Root currently looks like

pub struct Root<A: Allocate> {
    allocator: Rc<RefCell<A>>,
    graph: Rc<RefCell<Vec<Box<Operate<RootTimestamp>>>>>,
    identifiers: Rc<RefCell<usize>>,
}

calls to allocate channels and identifiers currently resolve to the first and third fields. These two should be bundled up into something called Allocator or so, and made available. At the same time, it would be helpful to remove the Clone implementations from scopes, as well as their Rc wrappers around owned operators, to clarify the single ownership.

Timely remote channels assume strong synchronization

The set-up of timely communication channels, from timely_communication, make strong assumptions about the synchronization of the workers. In particular, they assume that if a message is received for a channel that does not yet exist, it is safe to spin waiting for it to appear (the associated worker is assumed to also be constructing the same graph at the same moment, perhaps just slower).

This has the potential to go wrong if the worker is for whatever reason blocked, for example on the wrong side of a worker.step_while() call. While the workers are expected to be running equivalent code, slight non-determinism could cause some divergence.

Instead, the channels could probably easily rendezvous, with either end-point creating the appropriate (send, recv) pairs in some common location, and extracting their endpoint from the list. The process-local channels look a bit like this.

I haven't actually seen this happen in practice yet, but we haven't exercised dataflow construction at anything other than start of computation, before workers might diverge on their synchrony. If nothing else, it would be valuable to spec out what is expected to work when, for guidance on writing worker code that doesn't diverge.

Product<TOuter, TInner> implements total order

I'm not completely certain if this is a bug, but I found it very confusing that Product<TOuter, TInner> has an (automatically) derived implementation of the Ord trait which is incompatible with its (manually implemented) PartialOrd implementation. Example:

let a = Product::new(0, 1);
let b = Product::new(1, 0);
println!("{:?}", a.cmp(&b)); // A is less than B
println!("{:?}", a.partial_cmp(&b)); // A is incomparable to B

It seems to be the case that Extract is also requiring timestamps to implement Ord, so I'm not sure what this means.

Comparison to Apache Spark (which I believe was inspired by Dryad)

This may be premature, but I wondering if the authors could share a few thoughts for the data engineering laity on the conceptual (and perhaps also practical) difference between Timely Dataflow and Apache Spark.

I believe Apache Spark was inspired by the Dryad research project at Microsoft, which was a predecessor to the Naiad research project, which in turn inspired Timely Dataflow. (Did I get that right?)

timely-0.0.13/src/execute.rs:22:5: 26:7 error: mismatched types:

Hi,
I found an issue.
When I use the example of hello.rs, it shows an error as follow:
image
Cargo.toml as follow:
image
Rust version:
image
I think the version of timely crate from crates.io is old. When I replace the source code using
https://github.com/frankmcsherry/timely-dataflow
in:
/home/hduser/.cargo/registry/src/github.com-88ac128001ac3a9a/timely-0.0.13
it works.
So, it need to update timely crate in crates.io ??

Broadcast data to all workers

Is there an easy way to construct a broadcast operator (similar to exchange) that sends (i.e., copies) all stream data to all workers?

Right now I'm doing the following which sort of works, but I wonder if this is the right way of doing this.

root.scoped(|builder| {
  let index = builder.index();
  let peers = builder.peers();
  let (input, stream) = builder.new_input();
  let mut streams = vec![];

  for i in 0..peers {
      streams.push(stream.exchange(move |_| i as u64));
  }

builder.concatenate(streams).inspect(|x| println!("{}, {:?}", index, x));

Ideally, one could build something like:
stream.broadcast().inspect(|x| println!("{}, "{:?}", index, x);
My guess is that this can be done provided one implements a new ParallelizationContract. I think the only difference between Exchange and this new Broadcast pact would be the implementation of the Push trait in pushers::exchange::Exchange no?

Any thoughts?

Logging and instrumentation overhaul

For various use-cases we're working on overhauling the logging in timely dataflow, with a couple of requirements in mind:

  • instrumentation should be low/zero overhead when it's not in use;
  • we should be able to stream the logs out to a separate computation/system for real-time analysis (this is specifically for some research use-cases and should be discussed);
  • we'd like to be able to replay the instrumentation in the same computation that produced them, if at all possible.

Logging nice-to-haves

We are about to land a PR #92 that overhauls the logging infrastructure. This issue keeps track of desirable features that should be possible with this infrastructure, but aren't yet designed/implemented.

  • re-introduce the log crate
  • allow ad-hoc logging of user code (i.e. allow users of timely to produce logging streams of a type picked by them)
  • support log filtering, and selectively enabling log statements
  • make sure that log streams can be flushed when necessary
  • remove the time crate
  • determine what timestamp resolution scale is appropriate, consider using a synchronisation point between processes as the time base for the logs
  • let users toggle logging for certain dataflows

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.