differential-dataflow's Introduction

Differential Dataflow

An implementation of differential dataflow over timely dataflow in Rust.

Background

Differential dataflow is a data-parallel programming framework designed to efficiently process large volumes of data and to quickly respond to arbitrary changes in input collections. You can read more in the differential dataflow mdbook and in the differential dataflow documentation.

Differential dataflow programs are written as functional transformations of collections of data, using familiar operators like map, filter, join, and reduce. Differential dataflow also includes more exotic operators such as iterate, which repeatedly applies a differential dataflow fragment to a collection. The programs are compiled down to timely dataflow computations.

For example, here is a differential dataflow fragment to compute the out-degree distribution of a directed graph (for each degree, the number of nodes with that many outgoing edges):

let out_degr_dist =
edges.map(|(src, _dst)| src)    // extract source
     .count()                   // count occurrences of source
     .map(|(_src, deg)| deg)    // extract degree
     .count();                  // count occurrences of degree

Alternately, here is a fragment that computes the set of nodes reachable from a set roots of starting nodes:

let reachable =
roots.iterate(|reach|
    edges.enter(&reach.scope())
         .semijoin(reach)
         .map(|(_src, dst)| dst)
         .concat(reach)
         .distinct()
)
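For readers who want to see what the iterate fragment converges to, here is a minimal, non-incremental sketch in plain Rust that repeats the same step (follow edges out of reached nodes, keep what was already reached, deduplicate) until nothing changes. It does not use differential dataflow at all; the function name `reachable` and the use of `BTreeSet` are assumptions made for illustration.

```rust
use std::collections::BTreeSet;

/// Nodes reachable from `roots` by following `edges`, found by repeating one
/// expansion step until the set stops changing. (Hypothetical helper, not
/// part of differential dataflow.)
fn reachable(edges: &[(u32, u32)], roots: &BTreeSet<u32>) -> BTreeSet<u32> {
    let mut reach = roots.clone();
    loop {
        // One round: keep everything reached so far, and add the destination
        // of every edge whose source is already reached.
        let mut next = reach.clone();
        for &(src, dst) in edges {
            if reach.contains(&src) {
                next.insert(dst);
            }
        }
        // A fixed point: another round would add nothing.
        if next == reach {
            return reach;
        }
        reach = next;
    }
}

fn main() {
    let edges: Vec<(u32, u32)> = vec![(0, 1), (1, 2), (2, 0), (3, 4)];
    let roots: BTreeSet<u32> = vec![0].into_iter().collect();
    println!("{:?}", reachable(&edges, &roots));
}
```

The sketch recomputes everything from scratch on each call, which is exactly the work the differential version avoids when only a few edges or roots change.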

Once written, a differential dataflow responds to arbitrary changes to its initially empty input collections, reporting the corresponding changes to each of its output collections. Differential dataflow can react quickly because it only acts where changes in collections occur, and does no work elsewhere.

In the examples above, we can add to and remove from edges, dynamically altering the graph, and get immediate feedback on how the results change: if the degree distribution shifts we'll see the changes, and if nodes are now (or no longer) reachable we'll hear about that too. We could also add to and remove from roots, more fundamentally altering the reachability query itself.

Be sure to check out the differential dataflow documentation, which is continually improving.

An example: counting degrees in a graph.

Let's check out that out-degree distribution computation, to get a sense for how differential dataflow actually works. This example is examples/hello.rs in this repository, if you'd like to follow along.

A graph is a collection of pairs (Node, Node), and one standard analysis is to determine the number of times each Node occurs in the first position, its "degree". The number of nodes with each degree is a helpful graph statistic.

To determine the out-degree distribution, we create a new timely dataflow scope in which we describe our computation and how we plan to interact with it.

// create a degree counting differential dataflow
let (mut input, probe) = worker.dataflow(|scope| {

    // create edge input, count a few ways.
    let (input, edges) = scope.new_collection();

    let out_degr_distr =
    edges.map(|(src, _dst)| src)    // extract source
         .count()                   // count occurrences of source
         .map(|(_src, deg)| deg)    // extract degree
         .count();                  // count occurrences of degree

    // show us something about the collection, notice when done.
    let probe =
    out_degr_distr
        .inspect(|x| println!("observed: {:?}", x))
        .probe();

    (input, probe)
});

The input and probe we return are how we get data into the dataflow, and how we notice when some amount of computation is complete. These are timely dataflow idioms, and we won't get into them in more detail here (check out the timely dataflow repository).

If we feed this computation with some random graph data, say fifty random edges among ten nodes, we get output like

Echidnatron% cargo run --release --example hello -- 10 50 1 inspect
    Finished release [optimized + debuginfo] target(s) in 0.05s
    Running `target/release/examples/hello 10 50 1 inspect`
observed: ((3, 1), 0, 1)
observed: ((4, 2), 0, 1)
observed: ((5, 4), 0, 1)
observed: ((6, 2), 0, 1)
observed: ((7, 1), 0, 1)
round 0 finished after 772.464µs (loading)

This shows us the records that passed the inspect operator, revealing the contents of the collection: there are five distinct degrees, three through seven. The records have the form ((degree, count), time, delta), where the time field says this is the first round of data, and the delta field tells us that each record is coming into existence. If a record were instead departing the collection, its delta would be negative.

Let's update the input by removing one edge and adding a new random edge:

observed: ((2, 1), 1, 1)
observed: ((3, 1), 1, -1)
observed: ((7, 1), 1, -1)
observed: ((8, 1), 1, 1)
round 1 finished after 149.701µs

We see here some changes! Those degree three and seven nodes have been replaced by degree two and eight nodes; looks like one node lost an edge and gave it to the other!

How about a few more changes?

round 2 finished after 127.444µs
round 3 finished after 100.628µs
round 4 finished after 130.609µs
observed: ((5, 3), 5, 1)
observed: ((5, 4), 5, -1)
observed: ((6, 2), 5, -1)
observed: ((6, 3), 5, 1)
observed: ((7, 1), 5, 1)
observed: ((8, 1), 5, -1)
round 5 finished after 161.82µs

Well, a few weird things happen here. First, rounds 2, 3, and 4 don't print anything. Seriously? It turns out that the random changes we made didn't affect any of the degree counts; we moved edges between nodes, preserving degrees. It can happen.

The second weird thing is that in round 5, with only two edge changes we have six changes in the output! It turns out we can have up to eight. The degree eight gets turned back into a seven, and a five gets turned into a six. But: going from five to six changes the count for each, and each change requires two record differences. Eight and seven were more concise because their counts were only one, meaning just arrival and departure of records rather than changes.
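To make the "two record differences per changed count" point concrete, here is a small sketch in plain Rust that computes a degree distribution from scratch, twice, and reports the differences as ((degree, count), delta) records. It is only an illustration of the output format: differential dataflow does not recompute and compare like this, and the names `degree_distribution` and `changes` are made up for the example.

```rust
use std::collections::BTreeMap;

/// For each out-degree, the number of nodes with that many outgoing edges.
fn degree_distribution(edges: &[(u32, u32)]) -> BTreeMap<usize, usize> {
    // Count occurrences of each source.
    let mut degrees: BTreeMap<u32, usize> = BTreeMap::new();
    for &(src, _dst) in edges {
        *degrees.entry(src).or_insert(0) += 1;
    }
    // Count occurrences of each degree.
    let mut distribution = BTreeMap::new();
    for &degree in degrees.values() {
        *distribution.entry(degree).or_insert(0) += 1;
    }
    distribution
}

/// Differences between two distributions as ((degree, count), delta) records:
/// -1 for a record that departs, +1 for a record that arrives.
fn changes(
    old: &BTreeMap<usize, usize>,
    new: &BTreeMap<usize, usize>,
) -> Vec<((usize, usize), isize)> {
    let mut out = Vec::new();
    for (&degree, &count) in old {
        if new.get(&degree) != Some(&count) {
            out.push(((degree, count), -1));
        }
    }
    for (&degree, &count) in new {
        if old.get(&degree) != Some(&count) {
            out.push(((degree, count), 1));
        }
    }
    out.sort();
    out
}

fn main() {
    // Replace the edge (0, 1) with the edge (1, 1): one removal, one addition.
    let before: Vec<(u32, u32)> = vec![(0, 1), (1, 0), (1, 2), (2, 0), (2, 1)];
    let after: Vec<(u32, u32)> = vec![(1, 1), (1, 0), (1, 2), (2, 0), (2, 1)];
    for record in changes(&degree_distribution(&before), &degree_distribution(&after)) {
        println!("observed: {:?}", record);
    }
}
```

In this tiny graph the two edge changes produce four output records: the count for degree two changes from two to one (a departure and an arrival), while degree one disappears and degree three appears (one record each).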

Scaling up

The appealing thing about differential dataflow is that it only does work where changes occur, so even if there is a lot of data, if not much changes it can still go quite fast. Let's scale our 10 nodes and 50 edges up by a factor of one million:

Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 1 inspect`
observed: ((1, 336908), 0, 1)
observed: ((2, 843854), 0, 1)
observed: ((3, 1404462), 0, 1)
observed: ((4, 1751921), 0, 1)
observed: ((5, 1757099), 0, 1)
observed: ((6, 1459805), 0, 1)
observed: ((7, 1042894), 0, 1)
observed: ((8, 653178), 0, 1)
observed: ((9, 363983), 0, 1)
observed: ((10, 181423), 0, 1)
observed: ((11, 82478), 0, 1)
observed: ((12, 34407), 0, 1)
observed: ((13, 13216), 0, 1)
observed: ((14, 4842), 0, 1)
observed: ((15, 1561), 0, 1)
observed: ((16, 483), 0, 1)
observed: ((17, 143), 0, 1)
observed: ((18, 38), 0, 1)
observed: ((19, 8), 0, 1)
observed: ((20, 3), 0, 1)
observed: ((22, 1), 0, 1)
round 0 finished after 15.470465014s (loading)

There are a lot more distinct degrees here. I sorted them because it was too painful to look at the unsorted data. You would normally get to see the output unsorted, because the records are just changes to values in a collection.

Let's perform a single change again.

observed: ((5, 1757098), 1, 1)
observed: ((5, 1757099), 1, -1)
observed: ((6, 1459805), 1, -1)
observed: ((6, 1459807), 1, 1)
observed: ((7, 1042893), 1, 1)
observed: ((7, 1042894), 1, -1)
round 1 finished after 228.451µs

Although the initial computation took about fifteen seconds, we get our changes in about 230 microseconds; that's roughly seventy thousand times faster than re-running the computation. That's pretty nice. Actually, it is small enough that the time to print things to the screen is a bit expensive, so let's stop doing that.

Now we can just watch as changes roll past and look at the times.

Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 1 no_inspect`
round 0 finished after 15.586969662s (loading)
round 1 finished after 1.070239ms
round 2 finished after 2.303187ms
round 3 finished after 208.45µs
round 4 finished after 163.224µs
round 5 finished after 118.792µs
...

Nice. This is some hundreds of microseconds per update, which means maybe ten thousand updates per second. It's not a horrible number for my laptop, but it isn't the right answer yet.

Scaling .. "along"?

Differential dataflow is designed for throughput in addition to latency. We can increase the number of rounds of updates it works on concurrently, which can increase its effective throughput. This does not change the output of the computation, except that we see larger batches of output changes at once.

Notice that those times above are a few hundred microseconds for each single update. If we work on ten rounds of updates at once, we get times that look like this:

Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 10 no_inspect`
round 0 finished after 15.556475008s (loading)
round 10 finished after 421.219µs
round 20 finished after 1.56369ms
round 30 finished after 338.54µs
round 40 finished after 351.843µs
round 50 finished after 339.608µs
...

This is appealing in that rounds of ten aren't much more expensive than single updates, and we finish the first ten rounds in much less time than it takes to perform the first ten updates one at a time. Every round after that is just bonus time.

As we turn up the batching, performance improves. Here we work on one hundred rounds of updates at once:

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100 no_inspect`
round 0 finished after 15.528724145s (loading)
round 100 finished after 2.567577ms
round 200 finished after 1.861168ms
round 300 finished after 1.753794ms
round 400 finished after 1.528285ms
round 500 finished after 1.416605ms
...

We are still improving, and continue to do so as we increase the batch sizes. When processing 100,000 updates at a time we take about half a second for each batch. This is less "interactive" but a higher throughput.

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100000 no_inspect`
round 0 finished after 15.65053789s (loading)
round 100000 finished after 505.210924ms
round 200000 finished after 524.069497ms
round 300000 finished after 470.77752ms
round 400000 finished after 621.325393ms
round 500000 finished after 472.791742ms
...

This averages to about five microseconds per update; a fair bit faster than the hundred microseconds for individual updates! And now that I think about it each update was actually two changes, wasn't it. Good for you, differential dataflow!
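The per-update figures above are just the batch time divided by the number of rounds in the batch. As a small sketch of that arithmetic, the following Rust program applies it to one reported batch time from each of the single-worker runs; the helper names `per_round` and `rounds_per_second` are made up for the example.

```rust
use std::time::Duration;

/// Average time per round of updates when `rounds` rounds are processed as one batch.
fn per_round(batch: Duration, rounds: u32) -> Duration {
    batch / rounds
}

/// Rounds of updates absorbed per second at that rate.
fn rounds_per_second(batch: Duration, rounds: u32) -> f64 {
    f64::from(rounds) / batch.as_secs_f64()
}

fn main() {
    // (rounds per batch, time for one batch), each taken from the last
    // reported line of the corresponding single-worker run above.
    let runs = [
        (1, Duration::from_nanos(118_792)),
        (10, Duration::from_nanos(339_608)),
        (100, Duration::from_nanos(1_416_605)),
        (100_000, Duration::from_nanos(472_791_742)),
    ];
    for &(rounds, batch) in runs.iter() {
        println!(
            "{:>6} rounds/batch: {:?} per round, {:.0} rounds/s",
            rounds,
            per_round(batch, rounds),
            rounds_per_second(batch, rounds)
        );
    }
}
```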

Scaling out

Differential dataflow is built on top of timely dataflow, a distributed data-parallel runtime. Timely dataflow scales out to multiple independent workers, increasing the capacity of the system (at the cost of some coordination that cuts into latency).

If we bring two workers to bear, our 10 million node, 50 million edge computation drops down from fifteen seconds to just over eight seconds.

Echidnatron% cargo run --release --example hello -- 10000000 50000000 1 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 1 no_inspect -w2`
round 0 finished after 8.065386177s (loading)
round 1 finished after 275.373µs
round 2 finished after 759.632µs
round 3 finished after 171.671µs
round 4 finished after 745.078µs
round 5 finished after 213.146µs
...

That is a so-so reduction. You might notice that the times increased for the subsequent rounds. It turns out that multiple workers just get in each other's way when there isn't much work to do.

Fortunately, as we work on more and more rounds of updates at the same time, the benefit of multiple workers increases. Here are the numbers for ten rounds at a time:

Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 10 no_inspect -w2`
round 0 finished after 8.083000954s (loading)
round 10 finished after 1.901946ms
round 20 finished after 3.092976ms
round 30 finished after 889.63µs
round 40 finished after 409.001µs
round 50 finished after 320.248µs
...

One hundred rounds at a time:

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100 no_inspect -w2`
round 0 finished after 8.121800831s (loading)
round 100 finished after 2.52821ms
round 200 finished after 3.119036ms
round 300 finished after 1.63147ms
round 400 finished after 1.008668ms
round 500 finished after 941.426µs
...

One hundred thousand rounds at a time:

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100000 no_inspect -w2`
round 0 finished after 8.200755198s (loading)
round 100000 finished after 275.262419ms
round 200000 finished after 279.291957ms
round 300000 finished after 259.137138ms
round 400000 finished after 340.624124ms
round 500000 finished after 259.870938ms
...

These last numbers were about half a second with one worker, and are decently improved with the second worker.

Going even faster

There are several performance optimizations in differential dataflow designed to make the underlying operators as close to what you would expect to write, when possible. Additionally, by building on timely dataflow, you can drop in your own implementations a la carte where you know best.

For example, we also know in this case that the underlying collections go through a sequence of changes, meaning their timestamps are totally ordered. In this case we can use a much simpler implementation, count_total. This reduces the update times substantially, for each batch size:

Echidnatron% cargo run --release --example hello -- 10000000 50000000 10 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 10 no_inspect -w2`
round 0 finished after 5.985084002s (loading)
round 10 finished after 1.802729ms
round 20 finished after 2.202838ms
round 30 finished after 192.902µs
round 40 finished after 198.342µs
round 50 finished after 187.725µs
...

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100 no_inspect -w2`
round 0 finished after 5.588270073s (loading)
round 100 finished after 3.114716ms
round 200 finished after 2.657691ms
round 300 finished after 890.972µs
round 400 finished after 448.537µs
round 500 finished after 384.565µs
...

Echidnatron% cargo run --release --example hello -- 10000000 50000000 100000 no_inspect -w2
    Finished release [optimized + debuginfo] target(s) in 0.04s
    Running `target/release/examples/hello 10000000 50000000 100000 no_inspect -w2`
round 0 finished after 6.486550581s (loading)
round 100000 finished after 89.096615ms
round 200000 finished after 79.469464ms
round 300000 finished after 72.568018ms
round 400000 finished after 93.456272ms
round 500000 finished after 73.954886ms
...

These times have now dropped quite a bit from where we started; we now absorb over one million rounds of updates per second, and produce correct (not just consistent) answers even while distributed across multiple workers.
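To give a feel for why totally ordered timestamps allow a simpler counting operator, here is a rough sketch in plain Rust. When updates arrive in time order, a count can be maintained in place: each input change retracts the old (key, count) record and asserts the new one. This is not the library's count_total implementation; `InPlaceCount` and `update` are hypothetical names, and the sketch assumes updates are presented in nondecreasing time order.

```rust
use std::collections::HashMap;

/// Output updates of the form ((key, count), time, delta).
type Output = ((u32, isize), u64, isize);

/// A counting operator that assumes updates arrive in nondecreasing time
/// order, so the current count per key can simply be overwritten in place.
/// (Hypothetical type for illustration only.)
struct InPlaceCount {
    counts: HashMap<u32, isize>,
}

impl InPlaceCount {
    fn new() -> Self {
        InPlaceCount { counts: HashMap::new() }
    }

    /// Applies one input change `(key, time, delta)` and returns the output
    /// changes: a retraction of the old count and an assertion of the new one.
    fn update(&mut self, key: u32, time: u64, delta: isize) -> Vec<Output> {
        let old = self.counts.get(&key).cloned().unwrap_or(0);
        let new = old + delta;
        let mut output = Vec::new();
        if new == old {
            return output;
        }
        if old != 0 {
            output.push(((key, old), time, -1));
        }
        if new != 0 {
            output.push(((key, new), time, 1));
            self.counts.insert(key, new);
        } else {
            self.counts.remove(&key);
        }
        output
    }
}

fn main() {
    let mut count = InPlaceCount::new();
    println!("{:?}", count.update(7, 0, 1));
    println!("{:?}", count.update(7, 1, 1));
    println!("{:?}", count.update(7, 2, -2));
}
```

With partially ordered times there is no single "current" count to overwrite, which is where the more general implementation has to do extra work.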

A second example: k-core computation

The k-core of a graph is the largest subset of its edges so that all vertices with any incident edges have degree at least k. One way to find the k-core is to repeatedly delete all edges incident on vertices with degree less than k. Those edges going away might lower the degrees of other vertices, so we need to iteratively throw away edges on vertices with degree less than k until we stop. Maybe we throw away all the edges, maybe we stop with some left over.
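As a reference point, the peeling procedure just described can be sketched in plain, non-incremental Rust. This is only meant to pin down what is being computed; the function name `k_core` is an assumption for the example, and degree here counts both incoming and outgoing edges.

```rust
use std::collections::HashMap;

/// Repeatedly removes edges that touch a vertex of degree less than `k`,
/// until no more edges can be removed. (Hypothetical helper, recomputes
/// everything from scratch.)
fn k_core(mut edges: Vec<(u32, u32)>, k: usize) -> Vec<(u32, u32)> {
    loop {
        // Degree of each vertex, counting edges pointing to or from it.
        let mut degree: HashMap<u32, usize> = HashMap::new();
        for &(src, dst) in &edges {
            *degree.entry(src).or_insert(0) += 1;
            *degree.entry(dst).or_insert(0) += 1;
        }
        // Keep only edges whose endpoints both have degree at least k.
        let before = edges.len();
        edges.retain(|(src, dst)| degree[src] >= k && degree[dst] >= k);
        if edges.len() == before {
            return edges;
        }
    }
}

fn main() {
    // A triangle with one pendant edge hanging off vertex 2.
    let edges: Vec<(u32, u32)> = vec![(0, 1), (1, 2), (2, 0), (2, 3)];
    println!("2-core: {:?}", k_core(edges.clone(), 2));
    println!("3-core: {:?}", k_core(edges, 3));
}
```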

Here is a direct implementation, in which we repeatedly determine the set of active nodes (those with at least k edges pointing to or from them), and restrict the set of edges to those with both src and dst present in active.

let k = 5;

// iteratively thin edges.
edges.iterate(|inner| {

    // determine the active vertices        /-- this is a lie --\
    let active = inner.flat_map(|(src,dst)| [src,dst].into_iter())
                      .map(|node| (node, ()))
                      .group(|_node, s, t| if s[0].1 >= k { t.push(((), 1)); })
                      .map(|(node,_)| node);

    // keep edges between active vertices
    edges.enter(&inner.scope())
         .semijoin(active)
         .map(|(src,dst)| (dst,src))
         .semijoin(active)
         .map(|(dst,src)| (src,dst))
});

To be totally clear, the syntax with into_iter() doesn't work, because Rust, and instead there is a more horrible syntax needed to get a non-heap allocated iterator over two elements. But, it works, and

Running `target/release/examples/degrees 10000000 50000000 1 5 kcore1`
Loading finished after 72204416910

Well that is a thing. Who knows if 72 seconds is any good? (ed: it is worse than the numbers in the previous version of this readme).

The amazing thing, though is what happens next:

worker 0, round 1 finished after Duration { secs: 0, nanos: 567171 }
worker 0, round 2 finished after Duration { secs: 0, nanos: 449687 }
worker 0, round 3 finished after Duration { secs: 0, nanos: 467143 }
worker 0, round 4 finished after Duration { secs: 0, nanos: 480019 }
worker 0, round 5 finished after Duration { secs: 0, nanos: 404831 }

We are taking about half a millisecond to update the k-core computation. Each edge addition and deletion could cause other edges to drop out of, or more confusingly return to, the k-core, and differential dataflow is correctly updating all of that for you. And it is doing it in sub-millisecond timescales.

If we crank the batching up by one thousand, we improve the throughput a fair bit:

Running `target/release/examples/degrees 10000000 50000000 1000 5 kcore1`
Loading finished after Duration { secs: 73, nanos: 507094824 }
worker 0, round 1000 finished after Duration { secs: 0, nanos: 55649900 }
worker 0, round 2000 finished after Duration { secs: 0, nanos: 51793416 }
worker 0, round 3000 finished after Duration { secs: 0, nanos: 57733231 }
worker 0, round 4000 finished after Duration { secs: 0, nanos: 50438934 }
worker 0, round 5000 finished after Duration { secs: 0, nanos: 55020469 }

Each batch is doing one thousand rounds of updates in just over 50 milliseconds, averaging out to about 50 microseconds for each update, and corresponding to roughly 20,000 distinct updates per second.

I think this is all great, both that it works at all and that it even seems to work pretty well.

Roadmap

The issue tracker has several open issues relating to current performance defects or missing features. If you are interested in contributing, that would be great! If you have other questions, don't hesitate to get in touch.

Acknowledgements

In addition to contributions to this repository, differential dataflow is based on work at the now defunct Microsoft Research lab in Silicon Valley, and continued at the Systems Group of ETH Zürich. Numerous collaborators at each institution (among others) have contributed both ideas and implementations.

differential-dataflow's Issues

Many batch descriptions have empty frontiers

At many points in the code the descriptions assigned to batches (indicating lower and upper frontiers for the times of updates) are simply (and incorrectly) set to &[]. This was because I was lazy and a fundamentally bad person. There should be none of these, except possibly for the very last batch as the collection is sealed (as there are no future times).

We could make the Description constructor test for non-emptiness and assert otherwise. Until this is cleared up, the descriptions cannot be relied on for neat and interesting optimizations.

Questions on memory profiling of differential dataflow

I would like to understand the memory usage of my differential-dataflow-based program, including the size of each individual collection, arrangement, history, etc., and how these sizes change over time (basically, whatever it takes to correlate the overall memory footprint of the program to specific nodes in the dataflow graph). Is there currently an API/example code I could use to accomplish this?

A related question: I understand that differential's memory footprint grows with the number of trace entries it stores, and that it now has the ability to compact the trace. When is trace compaction triggered, and is there a way to control it directly or indirectly?

Use of `isize` for count changes should be generic

Currently differential dataflow uses isize to represent changes to counts. This allows it to handle a broad range of changes, but it is limiting in a few ways and could fairly easily become generic.

  1. For many applications, an i32 is sufficient. In fact, for many applications an i8 is sufficient. Given the pervasive role of these differences, the size they take can matter. Several graph processing examples would prefer something like i32 for their edge collections, as each edge does not change its frequency so much. Even an i8 would be useful if it was stored separately from edge data (otherwise the (edge, diff) pair would remain the same size).

  2. Several applications can make use of more sophisticated counts. For example, to compute a sum or average instead of isize one could use (isize, isize) where one field is the count and the other field is the sum (or change to it). Updates with the same key should be able to compact using the natural += interpretation on pairs (or tuples, whatever) of numbers that can be added.

To be interpreted as a "diff", it seems (at first) that we just need a trait like so

pub trait Diff : Add<Self, Output=Self> + Neg<Output=Self> + Zero;

The Add and Neg constraints are there so that we can manipulate the updates, and Zero is there so that we can tell when it is safe to drop the update on the floor. Zero is unstable (Zeroable would be great to have, for the Option optimization, but NonZero is unstable too). Perhaps we could use Default in the interim, or we could add an is_zero method to the trait, as the implementations will be mostly trivial.
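One way the is_zero variant might look on stable Rust is sketched below. This is a guess at the shape of such a trait, not the design the project adopted: the `CountSum` pair type and the `consolidate` helper are hypothetical and exist only to show a (count, sum) diff being added up and dropped when it reaches zero.

```rust
use std::ops::{Add, Neg};

/// A sketch of the proposed trait, with `is_zero` standing in for the
/// unstable `Zero`.
pub trait Diff: Sized + Add<Self, Output = Self> + Neg<Output = Self> {
    fn is_zero(&self) -> bool;
}

impl Diff for isize {
    fn is_zero(&self) -> bool { *self == 0 }
}

impl Diff for i32 {
    fn is_zero(&self) -> bool { *self == 0 }
}

/// Hypothetical (count, sum) difference, as in the sum/average example.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct CountSum(pub isize, pub isize);

impl Add for CountSum {
    type Output = CountSum;
    fn add(self, other: CountSum) -> CountSum {
        CountSum(self.0 + other.0, self.1 + other.1)
    }
}

impl Neg for CountSum {
    type Output = CountSum;
    fn neg(self) -> CountSum { CountSum(-self.0, -self.1) }
}

impl Diff for CountSum {
    fn is_zero(&self) -> bool { self.0 == 0 && self.1 == 0 }
}

/// Adds up the diffs of equal records and drops records whose total is zero.
/// (Hypothetical helper; times are omitted to keep the sketch short.)
fn consolidate<D: Ord, R: Diff>(mut updates: Vec<(D, R)>) -> Vec<(D, R)> {
    updates.sort_by(|a, b| a.0.cmp(&b.0));
    let mut out: Vec<(D, R)> = Vec::new();
    for (data, diff) in updates {
        match out.pop() {
            Some((last, acc)) if last == data => out.push((last, acc + diff)),
            Some(previous) => {
                out.push(previous);
                out.push((data, diff));
            }
            None => out.push((data, diff)),
        }
    }
    out.retain(|(_, diff)| !diff.is_zero());
    out
}

fn main() {
    let plain = consolidate(vec![("a", 1isize), ("b", 2), ("a", -1)]);
    println!("{:?}", plain);
    let sums = consolidate(vec![("k", CountSum(1, 10)), ("k", CountSum(1, 20))]);
    println!("{:?}", sums);
}
```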

The join implementation currently multiplies differences, and I'm not exactly certain what the intended semantics are here. If you join two collections without first calling count you get the product of the collections with the counts multiplied; should you expect something different if you join two collections without first calling sum?

Perhaps a related question is "would these weird counts be useful anywhere other than leading in to an aggregation like sum?" I can't think of anything at the moment, but that may not be a great reason to hide them.

Edit: moving away from isize does raise the question of under- and over-flow. If we use i8s for diffs and an operator needs more, what do we do? One natural choice is to produce as many diffs as are necessary to accumulate up to the change, but this does mean that various operators may need to become aware that there may be multiple entries for a given (key, val, time, _) pattern.

Edit: Working hypothesis: the trait we want should probably be called Ring, following https://en.wikipedia.org/wiki/Ring_(mathematics).

Use descriptions for compaction

Batches come with "descriptions", which contain lower and upper bounds on the timestamps of updates contained within. Specifically, the times within are those greater or equal to some element of lower and not greater or equal to any element of upper.
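The membership rule in the previous paragraph can be written down directly. The sketch below uses a made-up product-ordered `Time` type with a local `less_equal` method; these names are assumptions for the example rather than the actual timely or differential types.

```rust
/// A hypothetical partially ordered timestamp: (round, iteration) pairs
/// compared coordinate-wise.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Time(u64, u64);

impl Time {
    /// The partial order: both coordinates must be less than or equal.
    fn less_equal(&self, other: &Time) -> bool {
        self.0 <= other.0 && self.1 <= other.1
    }
}

/// True when `time` is greater than or equal to some element of `lower` and
/// not greater than or equal to any element of `upper`.
fn in_description(time: &Time, lower: &[Time], upper: &[Time]) -> bool {
    lower.iter().any(|l| l.less_equal(time)) && !upper.iter().any(|u| u.less_equal(time))
}

fn main() {
    let lower = [Time(0, 0)];
    let upper = [Time(2, 0), Time(0, 3)];
    for time in [Time(1, 1), Time(2, 5), Time(0, 3)].iter() {
        println!("{:?}: {}", time, in_description(time, &lower, &upper));
    }
}
```

Under this rule an empty upper frontier excludes nothing, while an empty lower frontier admits no times at all, which is one way to see why blanket empty descriptions carry no usable information.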

It seems like we should be able to use these bounds to inform advance compaction, in which we consider advancing all times in a batch by a provided frontier. The frontier's interaction with lower and upper seems like it constrains what happens to the times in the batch, and it could tell us about whether we should expect the frontier to collapse all times down, or collapse no times at all, or what.

This information could let us advance more aggressively when appropriate, and to put off compaction when we can see that the involved batch is unlikely to be much improved.

Implement a JoinTotal trait and operators

Issue #74 calls for specialized implementations for totally ordered timestamps. The join operator variants may be good candidates, as they are pervasive even with totally ordered timestamps, and the core join logic is somewhat complicated by partial orders.

This may be a relatively minor copy / paste job, but this interpretation may be optimistic. I believe the main point at which the JoinCore trait must be careful due to partial orders is in the "complicated" case for the JoinThinker type: when our keys have "long" histories on both inputs we dare not risk a Cartesian product, and instead play back the histories in time order. As with GroupCore, this tracks the meet of times and compacts updates using advance_by with this meet; this should all be unnecessary with totally ordered times, and we can instead just update the collection of values in-place.

It would be excellent if we only specialized the implementation of the JoinThinker type, as this may be the only logic that needs to change. This might mean some horrible mess where JoinCore becomes parametric in the type of the thinker, in which case we should not do this, but should specialization ever land we could imagine expecting Rust to pick up the specialized implementation, perhaps?

Managing differential-dataflow dependencies

I am looking for a way to manage the dependency on differential-dataflow in my project's Cargo.toml so that the project does not break as the differential-dataflow master changes.

I do not want to use the crates.io version of differential, as it is somewhat outdated (and, given that the project is still under active development, will probably remain so for the foreseeable future).

An alternative would be to have a dependency on a specific differential revision on github. I could then advance this revision periodically, after testing my crate against the new revision. Problem is, differential dataflow's Cargo.toml itself has dependencies on the master branch of timely and abomonation, meaning that older revisions of differential may not work with the latest timely master.

One solution would be to change differential's Cargo.toml to also depend on specific revisions (or tags) of all crates from github. I realize this will introduce extra manual work for Frank to update Cargo.toml whenever dependencies change. Is there a better way?

What is the right way to read out a collection?

After a long dataflow computation, I end up with a collection that contains some answers to some questions I have. (Perhaps, using database terminology, the collection is the output of a query that I'm interested in.)

At some point, I want to pass the contents of this collection to some non-differential code (imagine, outputting the collection as the response to a REST query).

I don't see any guidance in the documentation on how to do this, but the two following ideas come to mind:

  1. Use inspect directly on the collection, and build something entirely outside of the dataflow which is responsible for taking those diffs and maintaining a Vec or similar. However, this seems fairly heavyweight.

  2. Use differential dataflow to create a new collection which just groups the collection in question into one giant Vec. Then inspects on that would just give the direct answer. However, this feels like it might be inefficient?

I'd love a pointer from you on what the best (lightweight, efficient) way is to do this operation! I hope I didn't miss anything in the documentation! Thanks!
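For what it is worth, the first idea need not be very heavyweight. Below is a minimal sketch, in plain Rust, of the state one might keep outside the dataflow: a map from record to accumulated multiplicity, updated with each (data, time, diff) triple that an inspect closure observes. `Snapshot`, `apply`, and `contents` are hypothetical names, and this is not an API provided by differential dataflow.

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::hash::Hash;

/// Accumulated contents of a collection, rebuilt from its stream of diffs.
/// (Hypothetical type for illustration only.)
struct Snapshot<D: Hash + Eq> {
    counts: HashMap<D, isize>,
}

impl<D: Hash + Eq + Ord + Clone> Snapshot<D> {
    fn new() -> Self {
        Snapshot { counts: HashMap::new() }
    }

    /// Applies one observed change, dropping records whose count reaches zero.
    fn apply(&mut self, data: D, diff: isize) {
        match self.counts.entry(data) {
            Entry::Occupied(mut entry) => {
                *entry.get_mut() += diff;
                if *entry.get() == 0 {
                    entry.remove();
                }
            }
            Entry::Vacant(entry) => {
                if diff != 0 {
                    entry.insert(diff);
                }
            }
        }
    }

    /// The distinct records currently present, sorted (multiplicities omitted).
    fn contents(&self) -> Vec<D> {
        let mut records: Vec<D> = self.counts.keys().cloned().collect();
        records.sort();
        records
    }
}

fn main() {
    // The (data, time, diff) records printed for rounds 0 and 1 of the
    // degree-counting example in the README.
    let updates: Vec<((u32, u32), u64, isize)> = vec![
        ((3, 1), 0, 1), ((4, 2), 0, 1), ((5, 4), 0, 1), ((6, 2), 0, 1), ((7, 1), 0, 1),
        ((2, 1), 1, 1), ((3, 1), 1, -1), ((7, 1), 1, -1), ((8, 1), 1, 1),
    ];
    let mut snapshot = Snapshot::new();
    for (data, _time, diff) in updates {
        snapshot.apply(data, diff);
    }
    println!("{:?}", snapshot.contents());
}
```

The sketch ignores the time field, so the snapshot is only meaningful once every update up to the time of interest has been applied; a probe, as in the README example, is the usual way to learn when that is the case.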

Implement indices for Traces

The Spine trace implementation is a list of batches, each of which is a key-ordered collection of records. Different batch implementations make it differently easy to leap to the data for a key; the hash implementation lets you use hash values, the ord implementation lets you use binary search. In all cases, you need to independently look through each of the batches.

This design is handy if you would like to clone the list of batches and have an owned cursor through the collected batches.

At the same time, in many situations it is acceptable to borrow the trace reference, do a bounded amount of work, and then release the borrow. In these situations, we could implement and use a single index for each trace, recording for each key the specific offsets at which it can be found (or not found) in each of the batches. This dramatically reduces the cost of key lookups, cutting down on the amount of hashing and the random probing in memory for sparse updates. Ideally dense updates (many keys) would still result in an in-order traversal of memory, which might not be the case if we just use a HashMap as the index.

This is also where in the previous implementation of differential dataflow we slotted in a specialized index for small integer keys, based on vectors. This made each key lookup especially simple, and the index especially compact.
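A rough sketch of what such a per-trace index could look like, in plain Rust, follows. Batches are modeled as key-sorted vectors, and the index maps each key to the (batch, offset) positions where its records start. All names here (`Batch`, `build_index`, `lookup`) are hypothetical, and the `BTreeMap` is just one choice that keeps keys in order for dense traversals; it is not a claim about what the trace implementation does.

```rust
use std::collections::BTreeMap;

/// A batch modeled as records sorted by key. (Hypothetical stand-in for the
/// real batch types.)
type Batch = Vec<(u32, &'static str)>;

/// For each key, the (batch index, offset) positions at which its records begin.
type Index = BTreeMap<u32, Vec<(usize, usize)>>;

fn build_index(batches: &[Batch]) -> Index {
    let mut index = Index::new();
    for (b, batch) in batches.iter().enumerate() {
        for (offset, (key, _)) in batch.iter().enumerate() {
            let positions = index.entry(*key).or_default();
            // Record only the first offset of each key within a batch.
            if positions.last().map(|&(pb, _)| pb) != Some(b) {
                positions.push((b, offset));
            }
        }
    }
    index
}

/// Gathers all values for `key` by jumping straight to the recorded offsets,
/// rather than searching every batch.
fn lookup(batches: &[Batch], index: &Index, key: u32) -> Vec<&'static str> {
    let mut values = Vec::new();
    if let Some(positions) = index.get(&key) {
        for &(b, offset) in positions {
            values.extend(
                batches[b][offset..]
                    .iter()
                    .take_while(|(k, _)| *k == key)
                    .map(|&(_, v)| v),
            );
        }
    }
    values
}

fn main() {
    let batches: Vec<Batch> = vec![
        vec![(1, "a"), (1, "b"), (3, "c")],
        vec![(2, "d"), (3, "e")],
    ];
    let index = build_index(&batches);
    println!("{:?}", lookup(&batches, &index, 3));
}
```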

Batches should implement `advance_mut`

The advance_mut method exists to allow batches to update their times in place when they are the exclusive owners of their data. This is currently always the case just before we advance the times in batches, and so implementations should always be able to update in place.

High-resolution updates `group` perf harms `degrees.rs` example.

The degrees.rs example computes and maintains the distribution of degrees in a continually changing random graph. It looks roughly like so:

let degrs = edges.map(|(src, _)| src)
                 .count();

let distr = degrs.map(|(_, cnt)| cnt)
                 .count();

The current group implementation has quadratic behavior with respect to the number of distinct times per key, within each batch. With sufficiently large graphs, random changes are unlikely to touch many nodes within the same batch, but they are quite likely to touch the same counts, in the second count operator.

Here are some timings with the current code, for batch size one:

Echidnatron% cargo run --release --example degrees -- 10000 50000 1
worker 0, round 1 finished after Duration { secs: 0, nanos: 71173 }
worker 0, round 2 finished after Duration { secs: 0, nanos: 73827 }
worker 0, round 3 finished after Duration { secs: 0, nanos: 57993 }
...

We would hope that with batching the throughput would improve, and it does for a bit. Here are batch sizes ten,

Echidnatron% cargo run --release --example degrees -- 10000 50000 10
worker 0, round 10 finished after Duration { secs: 0, nanos: 336847 }
worker 0, round 20 finished after Duration { secs: 0, nanos: 243068 }
worker 0, round 30 finished after Duration { secs: 0, nanos: 238114 }
...

one hundred,

Echidnatron% cargo run --release --example degrees -- 10000 50000 100
worker 0, round 100 finished after Duration { secs: 0, nanos: 1528624 }
worker 0, round 200 finished after Duration { secs: 0, nanos: 2137224 }
worker 0, round 300 finished after Duration { secs: 0, nanos: 2024237 }
...

and one thousand,

Echidnatron% cargo run --release --example degrees -- 10000 50000 1000
worker 0, round 1000 finished after Duration { secs: 0, nanos: 90445490 }
worker 0, round 2000 finished after Duration { secs: 0, nanos: 127967149 }
worker 0, round 3000 finished after Duration { secs: 0, nanos: 128467679 }

The average time to process each update in the first batch, for example, goes from 71us to 33us to 15us to 90us as we increase the batch size. That was looking pretty good, until we hit batch size one thousand. The quadratic horror is showing up here, and is probably tainting the batch size one hundred execution too.
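The per-update figures are just each round's reported duration divided by the batch size. A tiny helper (hypothetical, not part of the example program) reproduces the arithmetic:

```rust
/// Average nanoseconds per update, given a round's duration and its batch size.
fn nanos_per_update(round_nanos: u64, batch_size: u64) -> u64 {
    round_nanos / batch_size
}
```

Applied to the first reported round of each run, this gives roughly 71us, 33us, 15us, and 90us respectively.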

This issue is non-trivial to fix, and involves engineering deep in group's bowels, but it should be done. This issue will stick around until there is a responsible fix.

Join has bugz!

The way join works, it accepts batches on each of its inputs and tracks which frontiers it has accepted through, so that it knows which subset of each input trace it should join new batches on the other input against. This makes lots of sense.

Unfortunately, the clever compaction mechanisms happily advance times behind the scenes. Even though we indicate "please do not advance beyond this frontier", the compaction mechanisms are clever enough to realize that they can advance to the frontier while still preserving the le relation with all elements in the future of the frontier. Elements at the frontier are not counted as being accepted, and so we miss some elements. The cleverness is due to using meets and joins, and doesn't have a lt interpretation, as far as I understand it.
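The mismatch is easy to see with totally ordered times, where advancing to a frontier amounts to taking a maximum. The sketch below is a simplification under that assumption (real frontiers are antichains of partially ordered times), and both function names are made up for illustration.

```rust
/// Advances a totally ordered time to the frontier. The result compares
/// the same way under `<=` against every time at or beyond `frontier`.
fn advance(time: u64, frontier: u64) -> u64 {
    time.max(frontier)
}

/// A join-style filter that accepts only times strictly before the frontier.
fn accepted(time: u64, frontier: u64) -> bool {
    time < frontier
}
```

A time of 3 advanced to frontier 5 becomes 5: indistinguishable under `<=` from the original for all future times, yet no longer strictly less than the frontier, so a strict filter drops it.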

I think the way to deal with this is to have the trace compaction mechanism, which currently takes a frontier that it uses for time manipulation, also block the merging of batches beyond that frontier. This should allow us to look at the batch descriptions when we open a cursor, and subset out those batches before the frontier. This subsetting has other applications in group for example, where we may like to subset out the "old" updates, i.e. those just before whichever batch we are about to accept.

In the case of join this would allow us to advance and compact times in a way that may bring them up to the frontier, but the batch's description would still reveal that it corresponds to a range of times less than the frontier. As long as there is a clean line to draw between batches, i.e. we want to subset using a frontier that is an upper and lower bound, we should be good. We should also be "good" if all batches are either (i) known before the frontier, or (ii) not compacted. In this case we can keep all of the first type and filter the second by time.

Implement a GroupTotal trait and operators

Issue #74 calls for specialized implementations for totally ordered timestamps. The group and group_u operators are currently very complex, in part due to the complexity of partially ordered times. This is a non-trivial implementation (much more complex than CountTotal or DistinctTotal), but there are at least a few important simplifications we can exploit:

  1. The times at which the group variants must be re-evaluated are exactly those found in the input differences. This is unlike for partially ordered times, which have a more complicated logic to determine further synthetic times that may produce output differences.

  2. The running accumulations for each key need not (i) track the meet of subsequent times (it would be just the min, equal to the next time in sorted order), nor (ii) maintain an intermediate collection of updates that have been advanced up to this meet (it can instead maintain a simpler running accumulation of the collection in question).
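A sketch of what these two simplifications buy, for a single key with `u64` times. The `group_total` function, its signature, and the restriction to at most one output value per key are assumptions made for illustration, not a proposed API.

```rust
use std::collections::BTreeMap;

/// For one key with totally ordered times: replays (value, time, diff)
/// input differences in time order, keeps a running accumulation, and
/// applies `logic` only at times present in the input.
/// Returns (value, time, diff) output differences.
fn group_total<F>(mut input: Vec<(u64, u64, i64)>, logic: F) -> Vec<(u64, u64, i64)>
where
    F: Fn(&BTreeMap<u64, i64>) -> Option<u64>,
{
    input.sort_by_key(|&(_value, time, _diff)| time);
    let mut accumulated: BTreeMap<u64, i64> = BTreeMap::new();
    let mut previous: Option<u64> = None;
    let mut output = Vec::new();
    let mut index = 0;
    while index < input.len() {
        let time = input[index].1;
        // Fold every difference at this time into the running accumulation.
        while index < input.len() && input[index].1 == time {
            let (value, _, diff) = input[index];
            *accumulated.entry(value).or_insert(0) += diff;
            if accumulated[&value] == 0 {
                accumulated.remove(&value);
            }
            index += 1;
        }
        // Re-evaluate once per distinct input time and emit any change.
        let current = logic(&accumulated);
        if current != previous {
            if let Some(old) = previous {
                output.push((old, time, -1));
            }
            if let Some(new) = current {
                output.push((new, time, 1));
            }
            previous = current;
        }
    }
    output
}
```

For example, passing `|acc| acc.keys().next_back().copied()` as `logic` maintains the maximum value for the key. No synthetic times and no meet tracking are needed, because the next time in sorted order is the only candidate.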

There are still a large number of subtleties, and the current group.rs file may not be the best starting point due to its complexity (both intrinsic complexity, and questionable structure). I recommend that anyone who wants to work on this issue get in touch first, so that we can map out how it should work.

Specialize implementation for totally ordered times

Several computations, most computations I would say, actually use totally ordered timestamps. There are much simpler implementations of join and group in these cases, and it would be great to have better performance when we can.

I think this involves identifying something like a TotallyOrderedCollection type which we know has a totally ordered timestamp. Operators like iterate turn such collections into normal Collections, which then come back out as whichever type they started as.

Alternately, if we could identify or exploit total orderedness at runtime, we do see many updates where the times we need to process are actually totally ordered. In iterative computations, for example, we often end up doing one iteration at a time; even with many rounds of input, the times in each such iteration form a totally ordered set (fixed iteration, ordered by round).
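A runtime check of this kind could look like the following sketch, which models times as hypothetical `(round, iteration)` pairs under the product partial order. Sorting lexicographically and comparing neighbours is enough, because the lexicographic order extends the product order.

```rust
/// Product partial order on hypothetical (round, iteration) timestamps.
fn less_equal(a: (u64, u64), b: (u64, u64)) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// Returns true if every pair of times is comparable, i.e. the times form a chain.
/// Sorts the slice as a side effect.
fn is_totally_ordered(times: &mut [(u64, u64)]) -> bool {
    times.sort();
    times.windows(2).all(|pair| less_equal(pair[0], pair[1]))
}
```

Times with a fixed iteration and varying rounds pass the check, whereas a pair like `(0, 1)` and `(1, 0)` does not.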

Docs timely version does not work with the examples

Since we had to complain when doc examples were not working.

[dependencies]
timely = "0.5"
differential-dataflow = "0.5"

If you use these dependencies you get no function or associated item named new found for type differential_dataflow::input::InputSession<_, _, _> in the current scope

If I upgrade the dependencies according to what I see in the repository, I get other errors.

The problem of implementation examples

Hi, I am new to Rust. I am trying to build "arrange.rs", which is the first of the differential dataflow examples. When I run cargo build, which I think just downloads and updates the dependencies in Cargo.toml, I get this error: "the function takes 1 parameter but 0 parameters were supplied"

134----ErrVirtualAlloc(i32),--defined here

324-- 0=>Err(ErrVirtualAlloc()),---expected 1 parameter

I don't know how to solve this problem. Could you give me some advice?

Is this the most efficient way to do a left join?

I want to left-join fairly regularly in my data flow, and have written this helper:

fn left_join<G: Scope, K: Data, V: Data, R: Diff, V2: Data>(a: &Collection<G, (K, V), R>, b: &Collection<G, (K, V2), R>) -> Collection<G, (V,Option<V2>), <R as Mul<R>>::Output>
where R: Mul<R, Output = R>, K: Data + Hashable, G::Timestamp: Lattice + Ord {
    let with = a.join_map(b, |_, v, v2| (v.clone(), Some(v2.clone())));
    let without = a.antijoin(&b.map(|(k, _)| k)).map(|(_, v)| (v, None));
    with.concat(&without)
}

FWIW, I often combine this with a

   .group(move |_key, input, output| {
        let out = input.into_iter().filter_map(|&(opt, _)| opt.as_ref()).cloned().collect::<Vec<_>>();
        output.push((out, 1));
    })

which gives me a Collection<G, (V, Vec<V2>), R>.

Is this reasonably efficient? (Also curious if we could get something similar in the standard library?)

The examples crash in debug mode with multiple workers

Both the bfs and reachability examples crash when run with multiple workers and a high enough number of rounds.

Expected Behavior

Both examples should run without crashing.

Actual Behavior

The examples crash after a variable number of rounds with the error:

thread 'worker thread 1' panicked at 'Progress error; internal "Subgraph": ((Root, 86), 1)', /home/user/.cargo/registry/src/github.com-1ecc6299db9ec823/timely-0.3.0/src/progress/nested/subgraph.rs:979:28

Steps to Reproduce the Problem

Run the example with:
cargo run --example bfs -- 2 2 1 500 false -w 4
or
cargo run --example reachability -- 2 2 1 500 -w 4

Small batches merge awkwardly

Because of timely dataflow's restrictions on single capabilities per message, we often end up with several very small batches and one large batch. This can lead to several merges between small and large batches, which is a fairly inefficient way to do things.

Relatively little is known about this, except that we spend a fair bit of time in merging, mostly allocating and dropping random bits of memory. If this is because of lots of skewed merges, we could look into scheduling merges in a different order that works to merge small batches together.
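To get a feel for how much merge order could matter, here is a sketch under a deliberately simple cost model: merging two batches costs the sum of their sizes. The model and both function names are assumptions, not measurements of the actual merge code.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Cost of folding batches into one running result, in the order given.
fn cost_in_order(sizes: &[usize]) -> usize {
    let mut total = 0;
    let mut merged = 0;
    for (i, &size) in sizes.iter().enumerate() {
        merged += size;
        if i > 0 {
            total += merged; // each merge touches everything merged so far
        }
    }
    total
}

/// Cost when the two smallest batches are always merged first.
fn cost_smallest_first(sizes: &[usize]) -> usize {
    let mut heap: BinaryHeap<Reverse<usize>> = sizes.iter().copied().map(Reverse).collect();
    let mut total = 0;
    while heap.len() > 1 {
        let Reverse(a) = heap.pop().unwrap();
        let Reverse(b) = heap.pop().unwrap();
        total += a + b;
        heap.push(Reverse(a + b));
    }
    total
}
```

With one batch of 1000 records and four singletons, folding in order costs 4010 under this model, while merging the small batches together first costs 1012.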

Batch cursors should have a lifetime

The cursors exposed by batches provide references to the underlying keys and values, but they are only valid for the borrow of the cursor associated with the call. This means that users of these cursors cannot extract a collection of references and work on them, even though in all current cursors this would be fine (manipulating the cursor does not mutate the underlying data).

I believe the right way to do this is to extend the Cursor trait to have a lifetime 'batch, corresponding to the lifetime of the underlying batch:

pub trait Cursor<'batch, K, V, T, R> {
    // ...
}

Ideally this lifetime would get captured when we construct the cursor using BatchReader::cursor(), like so:

pub trait BatchReader<K, V, T, R> {

    /// The type used to enumerate the batch's contents.
    type Cursor: Cursor<'batch, K, V, T, R>;
    /// Acquires a cursor to the batch's contents.
    fn cursor(&'batch self) -> Self::Cursor;

but this doesn't work, because the 'batch lifetime is part of the Self::Cursor type, and for that to work I think we need to have the 'batch lifetime as part of the BatchReader trait. And, one expects, this lifetime taints just about everything upward. Eventually we are able to write X: for<'batch> Trait<'batch, ...>, but I haven't figured out where this happens.

Perhaps just in TraceReader; it could be that it is as simple as

pub trait TraceReader<Key, Val, Time, R> {

    /// The type of an immutable collection of updates.
    type Batch: for<'batch> BatchReader<'batch, Key, Val, Time, R>+Clone+'static;

but I think these constraints would still need to be reiterated at each point where an implementor of TraceReader is bound, because they are requirements that Rust imposes on users of the trait but does not, for now, infer on their behalf.

Anyhow, doing all of this, or something similar, would result in the ability for the cursor to return references with lifetime 'batch which would allow a consumer to stash and work with multiple references, rather than cloning everything.
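The payoff can be shown without any of the trait plumbing. In the sketch below, `VecBatch`, `VecCursor`, and `stash_keys` are hypothetical concrete types and helpers rather than the crate's traits; the point is only that references tied to `'batch` survive further cursor movement.

```rust
/// Hypothetical batch: key-ordered (key, value) pairs.
struct VecBatch<K, V> {
    updates: Vec<(K, V)>,
}

/// Hypothetical cursor that carries the lifetime of the batch it reads.
struct VecCursor<'batch, K, V> {
    batch: &'batch VecBatch<K, V>,
    position: usize,
}

impl<'batch, K, V> VecCursor<'batch, K, V> {
    fn new(batch: &'batch VecBatch<K, V>) -> Self {
        VecCursor { batch, position: 0 }
    }

    /// The returned references are tied to the batch, not to `&self`.
    fn get(&self) -> Option<(&'batch K, &'batch V)> {
        self.batch.updates.get(self.position).map(|(k, v)| (k, v))
    }

    fn step(&mut self) {
        self.position += 1;
    }
}

/// Stashes a reference to every key while continuing to step the cursor.
fn stash_keys<'batch, K, V>(batch: &'batch VecBatch<K, V>) -> Vec<&'batch K> {
    let mut cursor = VecCursor::new(batch);
    let mut keys = Vec::new();
    while let Some((key, _val)) = cursor.get() {
        keys.push(key);
        cursor.step();
    }
    keys
}
```

If `get` instead returned references bound to the borrow of `self`, the call to `cursor.step()` inside the loop would be rejected while `keys` still held earlier references.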

Specialize Spine for default times

The current Spine implementation tracks a list of batches, each of which can hold arbitrary updates. At the same time, we often see the bulk of updates are at times in the past which are equivalent to <T as Lattice>::min(), the smallest possible time. These "updates" are essentially describing the current dataset.

The set of updates with times equivalent to the smallest possible time have a few important properties:

  1. They all have effectively the same time. We can remove the (large) time field from the updates, economizing on storage.

  2. They accumulate up to positive counts, when the ring is the integers. In this case, we can use the multiplicity of the record to indicate the diff field.

These two optimizations, the first making lots of sense and the second being useful for some data, bring the in-memory size for graph collections down to pretty close to the adjacency list representation.

To implement such a specialized spine, we need to extract the appropriate updates from the general batches, as we cannot accommodate more general updates. This means we need to write some specialized merging, and take some care when we do it.
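The extraction step might look something like this sketch, which assumes `u64` times, `i64` diffs, and already-consolidated updates. The `split_base` function and its tuple layout are hypothetical.

```rust
/// Splits (record, time, diff) updates into a compact "base" and a general rest.
/// Base entries are updates at the minimum time with positive diffs: the time
/// field is dropped and the diff is encoded by repeating the record.
fn split_base(
    updates: Vec<(u32, u64, i64)>,
    min_time: u64,
) -> (Vec<u32>, Vec<(u32, u64, i64)>) {
    let mut base = Vec::new();
    let mut rest = Vec::new();
    for (record, time, diff) in updates {
        if time == min_time && diff > 0 {
            base.extend(std::iter::repeat(record).take(diff as usize));
        } else {
            // Anything else stays in the general representation.
            rest.push((record, time, diff));
        }
    }
    (base, rest)
}
```

The base portion is then just a list of records, which for a graph is close to a plain adjacency list.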

A `top_k` operator can be implemented more efficiently

We see a lot of top_k operators in computations. Things like cc and bfs use them for graph computation, and several of the TPCH queries have similar "top blah" queries (e.g. Query 15).

It is not hard to implement top_k with group, but there are much more efficient implementations you could imagine. Or, there are optimizations to group that make sense for top_k and perhaps not as much sense for more general group computation. For example:

  1. For each key, group reads out all of the value histories from the input and output traces. We could do this lazily, as each evaluation of the "user logic" only needs to pull records until it observes k-ish values.

  2. By watching which values are observed, we can restrict the future times that need to be warned about. If we only observe the histories of the first three values before finding enough values, we needn't warn about times not found in those first three values. Times in all the other values (e.g. often later iterations, in computations like cc,bfs), can potentially be ignored.

  3. When attending to new changes, we have the potential to swap in the output trace for values before the smallest value in the new changes. This needs to be double checked for correctness, but it seems that these substitutions can be performed safely in several cases.

  4. In the same setting as above, we can advance the substituted values from the output trace up to the smallest value in the new changes, as the specific values of these values are not as important as the fact that they are less or equal to the new changes and not changing. This has the potential to reduce the number of moments of variation, as any changes among the smallest values collapse down to just the moment where the size of the set changes.

  5. Having swapped in the output trace, advanced values, and advanced times, we can evaluate an interval by starting from the times in the input trace and evaluating the top_k logic at each of these, warning about the times that might be interesting in subsequent values, but only those values we need to explore to fill out the output. This restricts our attention to the moments where the count of elements in output changes, and only to the changes for values that would emerge. All of the updates to values greater than these, those that never appear in the output, are ignored.

All of this makes for a pretty sweet implementation, that should make top_k pretty effective. Right now TPCH's Query 15 is a real pain point for e.g. DBToaster, and this sort of implementation seems like it would clear up all sorts of issues, plus make bfs lots faster.
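The laziness in the first point can be illustrated in isolation. The sketch below assumes value histories arrive sorted by value and reduces each history to a list of diffs, ignoring times entirely; `lazy_top_k` is a hypothetical helper, not the proposed operator.

```rust
/// Walks value histories in sorted order, accumulating each value's diffs,
/// and stops as soon as `k` values with positive counts have been found.
/// Returns the selected values and the number of histories examined.
fn lazy_top_k(histories: &[(u64, Vec<i64>)], k: usize) -> (Vec<u64>, usize) {
    let mut top = Vec::new();
    let mut examined = 0;
    for (value, diffs) in histories {
        if top.len() == k {
            break; // enough values observed; later histories are never read
        }
        examined += 1;
        if diffs.iter().sum::<i64>() > 0 {
            top.push(*value);
        }
    }
    (top, examined)
}
```

With five values and k = 2, where the second value has been retracted, only the first three histories are read.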

Implement `JoinGe` / `JoinLe`

Several TPCH queries (e.g. q11, q17, q20, q22) have inequality joins, where (key, value) pairs are joined against (key, threshold), and should result in (key, value) for each value greater than the corresponding threshold.

Using differential dataflow's JoinCore, this might look like

let values: Collection<_, (K, V)> = ...;
let threshes: Collection<_, (K, V)> = ...;

values.join_core(&threshes, |k,v,t| if v > t { Some((k,v)) } else { None });

This implementation can have pretty miserable performance: each time a threshold changes we reconsider all values. Even if the threshold changed only slightly.

However, we don't need to have miserable performance. The Cursor types backing the values collection have efficient (logarithmic) seek_val functionality, and given a threshold change (note: same time and negated diff):

(thresh_old, time, diff)
(thresh_new, time, -diff)

We can load only the subrange of values between thresh_min and thresh_max, whichever they happen to be.
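As a sketch of that observation for a single key, assume the values are available as a sorted slice and use binary search in place of `seek_val`. The function name and the `(value, diff)` output shape are made up for illustration.

```rust
/// Output changes for the predicate `v > threshold` when the threshold moves
/// from `old` to `new`. Only values in the range (min, max] are touched.
fn threshold_change(values: &[u64], old: u64, new: u64) -> Vec<(u64, i64)> {
    // Raising the threshold retracts values; lowering it introduces them.
    let (lo, hi, diff) = if new > old { (old, new, -1) } else { (new, old, 1) };
    // Two logarithmic searches bound the affected subrange of sorted values.
    let start = values.partition_point(|&v| v <= lo);
    let end = values.partition_point(|&v| v <= hi);
    values[start..end].iter().map(|&v| (v, diff)).collect()
}
```

Moving the threshold from 4 to 8 over the values 1, 3, 5, 7, 9 retracts only 5 and 7; the other values are never examined.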

I propose we consider a join variant implementing

values.join_core(&threshes, |k,v,t| if v > t { Some((k,v)) } else { None });

with the above observations as a performance optimization. There are probably further optimizations, but just doing this should be relatively easy and would dramatically improve some cases.

Query2 !

Hey ! Still obsessing on query2 :)

            let mut input = root.scoped::<u64, _, _>(move |builder| {
                let (input, stream) = builder.new_input::<((K, _), i32)>();
                let collections = Collection::new(stream);

                let group:Collection<_, (K,SaneF32)> = collections.group(|_,vs,o| {
                    let v:f32 = vs.map(|(sane,weight):(&SaneF32,i32)| sane.0*weight as f32).sum();
                    o.push( (SaneF32(v), 1) );
                });
                let count:Collection<_, (bool, u32)> = group.map(|(_,_):(K,SaneF32)| (true, 1)).consolidate();
                let count:Collection<_, (bool, i32)> = count.group(|_,vs,o| {
                    let c:i32 = vs.map(|(c,weight):(&u32,i32)| *c as i32 * weight).sum();
                    o.push((c, 1));
                });
                count.inspect(move |rec| result.store((rec.0).1 as usize, Relaxed));

            input
            });

            for (_i,chunk) in bibi.enumerate() {
                for item in chunk {
                    input.send(((item.0, SaneF32(item.1)), 1));
                }
            //    input.advance_to(i as u64 + 1);
            //    root.step();
            }

This is what I have tried so far (with variations on how the input is pushed, as suggested by the commented-out bit). Happy to say it produces the correct result.

The problem I have is that it consumes a gigantic amount of memory compared to the timely impl, so it swaps and is quite slow.

[Two images omitted: diff-d (tdd) and timely-d (timely).]

This is for query 2A (X=8). Timely and the hand made impl made it under 500MB, differential activates 25GB...

Is this expected? Can I mitigate it somehow?

Operations on sorted collections

If an input collection has a total ordering defined over its elements, what would be the best way of doing things like the following: given two such collections, produce for each element in the second collection a pair of itself and the closest element in the first collection (so elements of the first collection may appear in multiple pairs in the output collection). Is that even remotely near what this library is designed for? What about being able to perform "scan"-like operations over ranges in the input data?

iterate doesn't converge on docs example

The iterate operator doesn't seem to terminate for many supposedly converging functions. Instead, it endlessly removes and re-adds the elements yielded by the computation.

Example (taken from the documentation of iterate):

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::AsCollection;
use differential_dataflow::operators::*;
use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        let numbers = (0..5).map(|x| (x, 1)).to_stream(scope).as_collection();

        numbers.iterate(|values| {
            values
                .map(|x| if x % 2 == 0 { x/2 } else { x })
                .inspect_batch(|ts ,x| println!("{:?}: {:?}", ts, x))
        });
    });
}

Output:

((Root, 0), 0): [(0, 1), (1, 1), (1, 1), (3, 1), (2, 1)]
((Root, 0), 1): [(0, -1), (1, -1), (1, -1), (3, -1), (2, -1), (0, 1), (1, 1), (1, 1), (3, 1), (1, 1)]
((Root, 0), 2): [(0, -1), (1, -1), (1, -1), (3, -1), (1, -1), (0, 1), (1, 1), (1, 1), (3, 1), (1, 1)]
((Root, 0), 3): [(0, -1), (1, -1), (1, -1), (3, -1), (1, -1), (0, 1), (1, 1), (1, 1), (3, 1), (1, 1)]
((Root, 0), 4): [(0, -1), (1, -1), (1, -1), (3, -1), (1, -1), (0, 1), (1, 1), (1, 1), (3, 1), (1, 1)]
…

It goes on like this forever, while I would expect it to stop producing updates at some point.

If one uses a compacting operator like consolidate, group or distinct in the loop body, then the negating deltas eventually disappear in the materialized form and the underlying stream ceases to produce updates.
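To see why compaction helps, consider what consolidation does to a single batch of `(record, diff)` updates. The function below is a standalone sketch of that step, not the library's `consolidate` operator.

```rust
/// Sorts updates by record, sums the diffs of equal records, and drops
/// any record whose diffs cancel to zero.
fn consolidate(mut updates: Vec<(u64, i64)>) -> Vec<(u64, i64)> {
    updates.sort_by_key(|&(record, _)| record);
    let mut result: Vec<(u64, i64)> = Vec::new();
    for (record, diff) in updates {
        if let Some((last, total)) = result.last_mut() {
            if *last == record {
                *total += diff;
                continue;
            }
        }
        result.push((record, diff));
    }
    result.retain(|&(_, total)| total != 0);
    result
}
```

Applied to the batch printed for iteration 1 above, this leaves just `(1, 1)` and `(2, -1)`; applied to the batch for iteration 2 it leaves nothing, which is the point at which the loop could stop.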

I'm not sure if this is an actual bug or just a case of "you're holding it wrong" :) If it is the users' responsibility to compact, then we should probably update the documentation to reflect this.

Error messages can be confusing

I have this smallish example:

extern crate differential_dataflow;
extern crate timely;
type Timestamp = u64;
fn main() {
    use differential_dataflow::input::Input;
    use differential_dataflow::operators::group::Group;
    timely::execute(timely::Configuration::Thread, move |worker| {
        let (mut handle, probe) = worker.dataflow::<Timestamp, _, _>(|scope| {
            let (handle, data) = scope.new_collection();
            let probe = data
                .group(|_group, log_ids, out| {
                    let log_id: u64 = log_ids.iter().map(|&(&ts, &diff)| ts).max().unwrap();
                    out.push((log_id, 1)) 
                })
                .inspect(|&((group, log_id), _time, _diff)| {
                    println!("{} {}", group, log_id)
                })
                .probe();
            (handle, probe)
        });

        let (group, log_id) = (4, 1004);
        handle.advance_to(0);
        handle.insert((group, log_id));
        handle.close();
        worker.step_while(|| !probe.done());
    }).unwrap();
}

But when trying to compile it gives me 6 errors of two varieties:

error[E0277]: the trait bound `&_: differential_dataflow::Diff` is not satisfied
   --> src/timely.rs:131:40
    |
131 |             let (handle, data) = scope.new_collection();
    |                                        ^^^^^^^^^^^^^^ the trait `differential_dataflow::Diff` is not implemented for `&_`
    |
    = help: the following implementations were found:
              <i32 as differential_dataflow::Diff>
              <isize as differential_dataflow::Diff>
              <i64 as differential_dataflow::Diff>
              <differential_dataflow::difference::DiffPair<R1, R2> as differential_dataflow::Diff>

and

error[E0599]: no method named `insert` found for type `differential_dataflow::input::InputSession<u64, (_, u64), &_>` in the current scope
   --> src/timely.rs:146:16
    |
146 |         handle.insert((group, log_id));
    |                ^^^^^^

My standard debugging mechanism in cases like this is to add type annotations and see when there's a contradiction. But when you add the annotations to the closure params, like .group(|_group, log_ids: &[(&(_, &isize))], out: &mut Vec<_>| {, you get a different cryptic error message (maybe rust-lang/rust#41078?):

error[E0631]: type mismatch in closure arguments
   --> src/timely.rs:133:18
    |
133 |                 .group(|_group, log_ids: &[(&(_, &isize))], out: &mut Vec<_>| {
    |                  ^^^^^ ------------------------------------------------------ found signature of `for<'r, 's, 't0, 't1> fn(_, &'r [&'s (&u64, &'t0 isize)], &'t1 mut std::vec::Vec<(u64, {integer})>) -> _`
    |                  |
    |                  expected signature of `for<'r, 's, 't0, 't1> fn(&'r _, &'s [(&'t0 _, _)], &'t1 mut std::vec::Vec<(_, _)>) -> _`

In this case, the actual problem is that the &diff should be diff - both of the failed compiles contain hints if you look hard enough, but it's not easy to figure out. The correct place to add type annotations to help out inference is actually to pin down the diff type, i.e. scope.new_collection::<_, isize>().

I don't know if there's anything actionable here, but I thought I'd raise the issue. One thing I'd observe is that many of the examples and documentation show type annotations on worker.dataflow to pin down the timestamp type - I suppose one possible solution is to just make it so that the 'idiomatic' examples demonstrate a similar thing for the diff type.

Relicense under dual MIT/Apache-2.0

This issue was automatically generated. Feel free to close without ceremony if
you do not agree with re-licensing or if it is not possible for other reasons.
Respond to @cmr with any questions or concerns, or pop over to
#rust-offtopic on IRC to discuss.

You're receiving this because someone (perhaps the project maintainer)
published a crates.io package with the license as "MIT" xor "Apache-2.0" and
the repository field pointing here.

TL;DR the Rust ecosystem is largely Apache-2.0. Being available under that
license is good for interoperation. The MIT license as an add-on can be nice
for GPLv2 projects to use your code.

Why?

The MIT license requires reproducing countless copies of the same copyright
header with different names in the copyright field, for every MIT library in
use. The Apache license does not have this drawback. However, this is not the
primary motivation for me creating these issues. The Apache license also has
protections from patent trolls and an explicit contribution licensing clause.
However, the Apache license is incompatible with GPLv2. This is why Rust is
dual-licensed as MIT/Apache (the "primary" license being Apache, MIT only for
GPLv2 compat), and doing so would be wise for this project. This also makes
this crate suitable for inclusion and unrestricted sharing in the Rust
standard distribution and other projects using dual MIT/Apache, such as my
personal ulterior motive, the Robigalia project.

Some ask, "Does this really apply to binary redistributions? Does MIT really
require reproducing the whole thing?" I'm not a lawyer, and I can't give legal
advice, but some Google Android apps include open source attributions using
this interpretation. Others also agree with it.
But, again, the copyright notice redistribution is not the primary motivation
for the dual-licensing. It's stronger protections to licensees and better
interoperation with the wider Rust ecosystem.

How?

To do this, get explicit approval from each contributor of copyrightable work
(as not all contributions qualify for copyright, due to not being a "creative
work", e.g. a typo fix) and then add the following to your README:

## License

Licensed under either of

 * Apache License, Version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or http://www.apache.org/licenses/LICENSE-2.0)
 * MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT)

at your option.

### Contribution

Unless you explicitly state otherwise, any contribution intentionally submitted
for inclusion in the work by you, as defined in the Apache-2.0 license, shall be dual licensed as above, without any
additional terms or conditions.

and in your license headers, if you have them, use the following boilerplate
(based on that used in Rust):

// Copyright 2016 differential-dataflow Developers
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

It's commonly asked whether license headers are required. I'm not comfortable
making an official recommendation either way, but the Apache license
recommends it in their appendix on how to use the license.

Be sure to add the relevant LICENSE-{MIT,APACHE} files. You can copy these
from the Rust repo for a plain-text
version.

And don't forget to update the license metadata in your Cargo.toml to:

license = "MIT OR Apache-2.0"

I'll be going through projects which agree to be relicensed and have approval
by the necessary contributors and doing this changes, so feel free to leave
the heavy lifting to me!

Contributor checkoff

To agree to relicensing, comment with :

I license past and future contributions under the dual MIT/Apache-2.0 license, allowing licensees to chose either at their option.

Or, if you're a contributor, you can check the box in this repo next to your
name. My scripts will pick this exact phrase up and check your checkbox, but
I'll come through and manually review this issue later as well.

Consider LSM for `operators::ValueHistory`

The ValueHistory type is used to wrap the (V, T, R) triples describing the values associated with a key, and their history. Its intent is to provide efficient replay of the history, from which it can produce the corresponding value collection at any point along the way.

At the moment, this works by ordering edits by time and incorporating edits as we move forward in time. As time is not totally ordered, "incorporated" edits hang out, and require rescanning and reaccumulation to determine the actual state of the collection at any time.

It seems not unreasonable to think about incorporating these edits in a LSM (log structured merge) fashion, where we maintain multiple sorted lists of (V, T, R) triples that are "in effect", which we merge live as the user navigates around the values.

This has some advantages and some disadvantages compared to the current approach, which is to re-sort every time a batch of triples comes "into effect".

  1. The main disadvantage is complexity: for small histories, just sorting the updates is pretty easy, especially if the consumer wants access to the whole collection of values anyhow.

  2. Some amount of time is spent reforming the current input collection, currently by collecting input differences less or equal to the current time, then sorting and coalescing updates. Much of this work could become redundant with a LSM structure that keeps the data sorted by value. We would need to scan the histories for each value, to accumulate the diffs, but the values would already be in-order and the accumulation doesn't require reorganizing any data.

  3. Consider even more interesting computations, where the consumer may only want a subset of the values, either a prefix (for "top k" computations) or random access to data-driven locations (for intersection, e.g. in GenericJoin). In these cases, what would have been linear work for each update becomes amortized logarithmic, and if the query only required some few elements, this could be a substantial reduction.

  4. Going further, it seems plausible that we could load data from the underlying collection traces on demand, only loading the history for a value when we actually need it (having ValueHistory act as a cache in front of the collection trace). This would reduce the fixed cost of ingesting the data for a key, and extend the benefits to cases where there may just be a few updates to apply to a large collection.

Almost all of the data are already organized as (V, T, R) triples sorted by V, even in the underlying collection trace, so it seems reasonable that we might be able to wrap this representation, and do relatively little work when pulling in triples from a collection trace. The only "reorganization" that I see is that we want to be able to present the times at which group may need to re-evaluate its logic, which requires the times of updates sorted by their total order (but which does not require reorganizing the data).
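A toy version of the LSM idea, assuming totally ordered `u64` times (the real setting is partially ordered, which is where the difficulty lies). The `Runs` type, its merge policy, and `count_at` are all hypothetical choices made to keep the sketch short.

```rust
type Triple = (u64, u64, i64); // (value, time, diff)

/// Hypothetical LSM-style store: a list of sorted runs of triples.
struct Runs {
    runs: Vec<Vec<Triple>>,
}

/// Standard two-way merge of sorted runs.
fn merge(a: Vec<Triple>, b: Vec<Triple>) -> Vec<Triple> {
    let mut result = Vec::with_capacity(a.len() + b.len());
    let (mut a, mut b) = (a.into_iter().peekable(), b.into_iter().peekable());
    loop {
        let take_a = match (a.peek(), b.peek()) {
            (Some(x), Some(y)) => x <= y,
            _ => break,
        };
        result.push(if take_a { a.next().unwrap() } else { b.next().unwrap() });
    }
    result.extend(a);
    result.extend(b);
    result
}

impl Runs {
    /// Adds a batch as a new run, merging runs of comparable size.
    fn insert(&mut self, mut batch: Vec<Triple>) {
        batch.sort();
        self.runs.push(batch);
        while self.runs.len() > 1 {
            let n = self.runs.len();
            if self.runs[n - 1].len() * 2 < self.runs[n - 2].len() {
                break;
            }
            let last = self.runs.pop().unwrap();
            let prev = self.runs.pop().unwrap();
            self.runs.push(merge(prev, last));
        }
    }

    /// Accumulated count of `value` as of `time`, seeking within each run.
    fn count_at(&self, value: u64, time: u64) -> i64 {
        self.runs
            .iter()
            .map(|run| {
                let start = run.partition_point(|&(v, _, _)| v < value);
                run[start..]
                    .iter()
                    .take_while(|&&(v, _, _)| v == value)
                    .filter(|&&(_, t, _)| t <= time)
                    .map(|&(_, _, d)| d)
                    .sum::<i64>()
            })
            .sum()
    }
}
```

A query for one value seeks into each run instead of re-sorting everything, which is the kind of access a "top k" or intersection-style consumer would want.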

One concern is that we may not have good benchmarks for evaluating this. Computations like bfs perform group operations on relatively small groups, so we may need to investigate more skewed degree distributions to provoke the scary quadratic behavior. Alternately, queries like TPCH Q15 perform a maximum, currently implemented hierarchically, but if done with a flat implementation would immediately suffer from quadratic behavior. Q15 has the problem that we won't see any benefit until we do the more exotic "lazy" access to data.

Spec out Trace Adapters?

Traces represent a pile of updates (key, val, time, diff), indexed by key then val then time. There are several transformations on collections that we could also apply to these indexed representations. The most appealing example I can think of is enter, which brings a collection into a loop, changing its timestamp. We could imagine defining a graph in the outermost streaming scope, and then import the single indexed representation into multiple iterative contexts, sharing the state between all of them.

This requires a bit of thinking to understand how easily we can implement the Trace trait, and all of its dependent traits, for something as simple as "put a zero at the end of each timestamp". There are possibly some issues about types; for example, it is harder to return references to timestamp if none of those timestamps actually exist in the trace (the trace cursors will probably have to stage the timestamps in local variables, perhaps).
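To make the "stage the timestamps in local variables" idea concrete, here is a minimal sketch of a wrapper that presents each outer time `t` as `(t, 0)`. The `EnterCursor` type and its methods are hypothetical and far simpler than the real Trace and cursor traits; the point is only that the reference handed out refers to a staged field, not to anything stored in the underlying data.

```rust
/// Hypothetical wrapper over outer-scope (time, diff) updates.
struct EnterCursor<'a> {
    updates: &'a [(u64, isize)],
    position: usize,
    /// The nested timestamp does not exist in the wrapped data,
    /// so it is staged here and references point at this field.
    staged: (u64, u32),
}

impl<'a> EnterCursor<'a> {
    fn new(updates: &'a [(u64, isize)]) -> Self {
        EnterCursor { updates, position: 0, staged: (0, 0) }
    }

    /// Returns a reference to the staged nested time, plus the diff.
    fn next_update(&mut self) -> Option<(&(u64, u32), isize)> {
        let &(time, diff) = self.updates.get(self.position)?;
        self.position += 1;
        self.staged = (time, 0); // "put a zero at the end of each timestamp"
        Some((&self.staged, diff))
    }
}

/// Collects the view an inner scope would see.
fn entered(updates: &[(u64, isize)]) -> Vec<((u64, u32), isize)> {
    let mut cursor = EnterCursor::new(updates);
    let mut out = Vec::new();
    while let Some((time, diff)) = cursor.next_update() {
        out.push((*time, diff));
    }
    out
}

fn main() {
    println!("{:?}", entered(&[(3, 1), (7, -1)]));
}
```

The returned reference is only valid until the next call, which is the kind of restriction the real cursor traits would have to accommodate.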

Running on snap datasets

I am running the following command, where wiki-Vote.txt is a SNAP dataset:

cargo run --release --example cc -- ~/Projects/Datasets/wiki-Vote.txt

However, I get the following error:

Finished release [optimized + debuginfo] target(s) in 0.0 secs
Running target/release/examples/cc /home/fre14/Individual-Project/Graph-Database-Ext/datasets/wiki-Vote.txt
thread 'worker thread 0' panicked at 'error opening file', libcore/option.rs:917:5
note: Run with RUST_BACKTRACE=1 for a backtrace.
thread 'main' panicked at 'called Result::unwrap() on an Err value: Any', libcore/result.rs:945

What am I doing wrong? Is there a command I am supposed to run beforehand?

Thank you

The `group` operator is insane

The group operator does several clever things, each of which is meant to make sense, but collectively it does some things that are hard to rationalize.

Specifically, I have recently seen (but can no longer reproduce, since #27) panics in group where it determines an interesting time t for which it does not hold a capability, and panics when it tries to warn itself about the time (as it cannot acquire a capability). In each round group downgrades its capabilities to the frontier of times it has been warned about, and collects new capabilities from its inputs. For the panic to happen, it would seem to mean that the new warning was in potentially dead space, where the operator has previously committed to no output.

This isn't fundamentally wrong. The warnings are conservative, and we could take the fact that the warning is not in the future of a capability as evidence that we could not possibly produce data at its time. That may be true, but it makes it much harder to sanely understand the properties of the group operator.

I would have much higher confidence about the implementation of group if there were more invariants, and corresponding debug_assert! statements, sprinkled throughout the code. Correspondingly, it would help if the structure of the implementation lent itself to having more invariants, which I think is possible (and has been its recent direction).
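As an example of the kind of invariant meant here, the following sketch checks that an interesting time lies in the future of some held capability before recording it. It assumes product-ordered pairs as timestamps; `warn_about` and `in_future_of` are hypothetical names, not functions from the group implementation.

```rust
/// Assumption: timestamps are pairs under the product partial order.
type Time = (u64, u64);

fn less_equal(a: &Time, b: &Time) -> bool {
    a.0 <= b.0 && a.1 <= b.1
}

/// True if `time` is in the future of at least one held capability.
fn in_future_of(capabilities: &[Time], time: &Time) -> bool {
    capabilities.iter().any(|cap| less_equal(cap, time))
}

/// Records an interesting time, asserting the invariant in debug builds.
fn warn_about(capabilities: &[Time], pending: &mut Vec<Time>, time: Time) {
    debug_assert!(
        in_future_of(capabilities, &time),
        "interesting time {:?} is not in the future of any held capability",
        time
    );
    pending.push(time);
}

fn main() {
    let capabilities = vec![(1, 0), (0, 3)];
    let mut pending = Vec::new();
    warn_about(&capabilities, &mut pending, (1, 2));
    println!("{:?}", pending);
}
```

With a check like this in place, the panic described above would surface at the point the warning is created rather than when a capability is requested.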

Add traits and operators for totally ordered timestamps

The CountTotal trait in operators/count.rs is a substantial simplification of the logic for a counting operator when the timestamps are totally ordered. This leads to improvements in both performance and clarity: the implementation is plausibly readable.
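To show why total order helps, here is a minimal sketch of counting for a single key. It is not the code in operators/count.rs; it assumes `u64` times and `isize` diffs, and `count_total` is a hypothetical free function. With a total order, the updates can be sorted by time and walked once, retracting the old count and asserting the new one at each time where the count changes.

```rust
/// Input: (time, diff) updates for one key, in any order.
/// Output: (count, time, diff) updates describing how the count changes.
fn count_total(mut updates: Vec<(u64, isize)>) -> Vec<(isize, u64, isize)> {
    updates.sort(); // a total order on times makes a single pass sufficient
    let mut output = Vec::new();
    let mut count: isize = 0;
    let mut i = 0;
    while i < updates.len() {
        let time = updates[i].0;
        // Sum all diffs that share this time.
        let mut delta = 0;
        while i < updates.len() && updates[i].0 == time {
            delta += updates[i].1;
            i += 1;
        }
        if delta != 0 {
            if count != 0 {
                output.push((count, time, -1)); // retract the old count
            }
            count += delta;
            if count != 0 {
                output.push((count, time, 1)); // assert the new count
            }
        }
    }
    output
}

fn main() {
    println!("{:?}", count_total(vec![(1, 2), (3, 1), (2, -2)]));
}
```

With partially ordered times there is no single sequence to walk, which is where the general implementation gets its complexity.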

We should add these implementations for other operators, and perhaps think about whether we can provide these operators using Rust's specialization feature (incoming?), or other type-based methods rather than asking the user to know about and invoke them (a fine stopgap, but it requires others writing generic code to provide two variants).

This is a tracking issue for several candidate implementations, and will stick around until we come up with a sane way of helping the user find the methods through types (perhaps specialization, perhaps an OrderedCollection type, not clear).

  • Implement a DistinctTotal trait and operators. Issue #75.
  • Implement a GroupTotal trait and operators. Issue #76.
  • Implement a JoinTotal trait and operators. Issue #77.
  • Provide uniform methods dispatching on total or partial orderedness.

Remove `distinguish_since` capability

Currently any TraceAgent, the thing you hold on to to be able to re-use an arrangement, has two capabilities (with horrible names):

  1. advance_by: indicates the lower bound (frontier) on times that may be used as comparisons against the entries in the trace. As this advances, the trace is able to compact its own representation by determining that some times are equivalent, and coalescing their differences.

  2. distinguish_since: independently, each trace maintains boundaries between each of the batches it has created, and waits for trace agents to decline an interest in keeping them from merging. This is so that operators like join can get access to what may be historical traces, because they advance_by a trace based on the progress in the other input (weird, but true).

The times in the distinguish_since capability currently have no particular semantic use, and could as easily be replaced with the sequence number of the batch. Although one could imagine a general and rich "bookmarking" functionality, the current use is "for this batch, could I have the trace up to but not including it, please".

Rather than track two separate capabilities, which causes a fair amount of headache (especially when they head in different directions), I propose that we scrap the distinguish_since capability, and simply capture the state of the trace at the moment each batch is created (in the form of a list of batches). The associated streams of data would change from "batch" to "batch + history", and allow each implementation to hold on to the history as long as appropriate, without blocking other users of the trace.
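A minimal sketch of the "batch + history" idea, assuming batches are shared through `Rc` and described only by their bounds. The `Trace` and `Batch` types here are hypothetical stand-ins, not the real ones.

```rust
use std::rc::Rc;

/// Hypothetical batch, described only by its time bounds.
#[derive(Debug)]
struct Batch {
    lower: u64,
    upper: u64,
}

/// Hypothetical trace: an ordered list of shared batches.
struct Trace {
    batches: Vec<Rc<Batch>>,
}

impl Trace {
    /// Inserts a batch and returns it together with the trace contents
    /// "up to but not including it", captured as a list of shared batches.
    fn insert(&mut self, batch: Batch) -> (Rc<Batch>, Vec<Rc<Batch>>) {
        let history = self.batches.clone(); // clones pointers, not data
        let batch = Rc::new(batch);
        self.batches.push(Rc::clone(&batch));
        (batch, history)
    }

    /// Merges everything into one batch; captured histories are unaffected.
    fn merge_all(&mut self) {
        if let (Some(first), Some(last)) = (self.batches.first(), self.batches.last()) {
            let merged = Batch { lower: first.lower, upper: last.upper };
            self.batches = vec![Rc::new(merged)];
        }
    }
}

fn main() {
    let mut trace = Trace { batches: Vec::new() };
    trace.insert(Batch { lower: 0, upper: 2 });
    let (batch, history) = trace.insert(Batch { lower: 2, upper: 5 });
    trace.merge_all();
    println!("batch {:?} with history {:?}", batch, history);
}
```

The holder of a history keeps the old batches alive for as long as it needs them, while the trace itself is free to merge immediately.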

One possible downside of this is that the distinguish_since capability was used as a way of blocking merging until a user had downgraded their advance_by capabilities, increasing the chance that when time came for a merge there would be the potential to collapse the updates as well. Now as soon as a batch is entered into the trace it can be merged, even if it would be best overall for that merging to be delayed just a moment. Perhaps we can build in a one-call delay to give each downstream operator a chance to perform the associated work.

This might also be a good time to rename the advance_by capability, which describes what the trace does to each of the times (they are "advanced by" the frontier), but isn't especially descriptive for the user of the trace. Awkwardly, it seems that "distinguish since" is a lot closer to what the capability represents (all times in advance of the frontier must be distinguishable from each other). I'd prefer not to simply swap that name in, in the interest of shaking out bugs where we forget to rename things, but it might be a good time to float some synonyms.

Proposal: Shared Arrangements

Differential dataflow manages its state through the arrange operator, which takes a disordered stream of timestamped updates, and produces a sequence of batches of organized, consolidated updates. These batches are also integrated into a T: Trace the arrange operator manages, and which is shared with downstream operators. The arrange operator handles common work like merging and advancing ("garbage collection") of these batches, so that each of the consumer operators do not have to. The bundle of stream and trace is called an Arrangement.

There is currently the structural restriction that to use an arrangement, you must be in the same timely dataflow scope, because the stream is branded with the scope and can only be used there. However, the batches and the trace are only branded with the timestamp, and could in principle be used in any other scope with the same timestamp.

This proposal starts to detail how we might enable the sharing of traces and batch sequences with dataflow graphs and scopes other than that in which the data are arranged.


The design and implementation of current trace and batch types make extensive use of Rc<_> reference counting shared ownership, so that the references can be shared within the worker (the boundary for an Rc<_> type is a thread of control). This is what allows us to share references to the trace among operators, to share references to batches between the trace and the channels out of the arrange operator, and various other sharing moments (e.g. cursors clone ref-counted pointers to simplify their lifetime story). Reference counted shared ownership is read-only, and associated resources are collected when the last of the referring owners gets dropped.

All of this means that there is nothing obviously hard about sharing this data between multiple dataflow scopes. These shared references to the data can be handed out widely, and we are not at risk of accidentally de-allocating or consolidating or otherwise transforming the data while others outside our dataflow scope are hoping we do not. This is different from providing meaningful semantic guarantees (which we will get to), but actually handing pointers around does not seem to be a limitation.

There are some considerations to make about what is guaranteed to other users, as we will update the logical contents of the trace (e.g. by compaction as time advances), but these guarantees are currently provided by the trace itself, with the underlying shared resources immutable and much easier to reason about.


The main handle to a trace is the appropriately named arrange::TraceHandle type. This type has shared ownership of [a wrapper around] a trace, and as long as it is held provides access to the trace. This is the way that operators interact with the trace.

If left alone, the trace handle provides access to the full history of the trace. It also provides the operators the opportunity to downgrade their access, by stating that certain historical times have become indistinguishable to the operator. When this happens, the trace wrapper (aware of the existence of multiple handles, with multiple frontiers of indistinguishable times) may eventually notify the trace that it can compact its state representation. The compaction updates the logical contents of the trace, but does not mutate the underlying resources (it consolidates updates at indistinguishable timestamps into new batches, which replace the existing ones).
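The compaction step can be sketched as follows, under the simplifying assumption of totally ordered `u64` times (the real traces handle partially ordered times, where advancing is more subtle). The function `compact` is hypothetical. Times before the frontier become indistinguishable, so they are advanced to it and their updates consolidated into a new vector, leaving the input untouched.

```rust
/// Advance times to `frontier` and consolidate, producing a new batch of updates.
/// Assumption: totally ordered `u64` times and `isize` diffs.
fn compact<D: Ord + Clone>(updates: &[(D, u64, isize)], frontier: u64) -> Vec<(D, u64, isize)> {
    // Times earlier than the frontier are indistinguishable from it.
    let mut advanced: Vec<(D, u64, isize)> = updates
        .iter()
        .map(|(data, time, diff)| (data.clone(), (*time).max(frontier), *diff))
        .collect();
    advanced.sort();

    let mut result: Vec<(D, u64, isize)> = Vec::new();
    for (data, time, diff) in advanced {
        let same = result.last().map(|u| (&u.0, u.1)) == Some((&data, time));
        if same {
            result.last_mut().unwrap().2 += diff;
        } else {
            result.push((data, time, diff));
        }
    }
    // Updates that cancelled after advancing can be dropped.
    result.retain(|u| u.2 != 0);
    result
}

fn main() {
    let updates = vec![("a", 1, 1), ("a", 2, -1), ("b", 3, 1), ("a", 7, 1)];
    println!("{:?}", compact(&updates, 5));
}
```

The insertion and removal of "a" at times 1 and 2 cancel once both are advanced to 5, which is exactly the history that a later reader can no longer observe.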

Importantly, as the trace makes progress forward, it permanently forgets about the distinctions in the past. Other operators that may want to attach to this trace and stream of batches need to acknowledge that their view of the data, though containing all data ever introduced, can only be from the perspective of whatever times the trace has promised will remain distinguishable. Practically speaking, this means that when you attach to a collection you can see its contents "now", and watch their changes from this point on; you may not be able to get access to the changes the collection experienced five minutes ago.

The trace and trace handle also provide another capability, which is relatively new but seemingly important: the ability to bookmark batches and receive the trace restricted to only those updates through the bookmarked batch. This seems crucial for correct behavior of join, and implementation-wise is relatively simple: merging is blocked wherever these bookmarks exist. Any new references to the trace need to accept and understand the restriction on bookmarks just as with compaction: their opportunities are restricted by the state of the trace when they attach to it.


It is not hard to export a trace handle from the scope it was produced in; the example/arrange.rs example does exactly this. The trace handle can simply be cloned and returned from the scope, as it contains no references to the scope, only to the timestamp of the scope.

Can we use a trace handle to populate a new operator in another dataflow scope? Or perhaps more appropriately: can we create a new Arrangement in another scope, a pair of trace handle and stream of immutable batches?

The trace handle part seems relatively easy: we have a trace handle, and it is safe to clone. The stream is more complicated, as we cannot exfiltrate the existing stream of updates from the dataflow scope in which they exist. Can we create a new operator which taps into the sequence of batches and reposts them in this other scope?

There is precedent for extracting a stream from one scope and replaying it in another dataflow scope: timely dataflow's capture and replay operators. These operators give us a clue about the moving parts we might require, but are not themselves sufficient (they capture and replay the entire history of the stream; unboundedly much data). The replay operator defines a new timely dataflow operator, and consults a shared sequence of observations made by capture to drive its behavior: both records produced and progress indicated.

I believe most of the complexity of shared arrangements between dataflow scopes lies in the design of this operator, so let's talk about it.


To write a timely dataflow operator using the generic tools, we need to do a few things. We might need to hand-roll everything because our intended operator has no inputs, but ignoring that let's talk through what we need to sort out.

Each timely dataflow operator, and especially inputs, need to specify what initial capabilities they require. Without initial capabilities, an operator with no inputs will never acquire capabilities and can never send anything. So we want some initial capabilities so that the operator can act as a source of data. Conventionally, we just take T::min() as the initial capability, and then downgrade it once we get started. If it were helpful, we could think harder about the state of affairs in the trace and start with an advanced timestamp, but it isn't clear if this is helpful; if it ever becomes helpful the option exists.

In operation, a timely dataflow operator needs to indicate what it does each time it gets tickled, which in our restricted setting includes (i) the sending of batches, and (ii) the downgrading of output capabilities. In essence, we would like our operator to replay the actions of the source arrange operator, and can imagine a shared queue from the source operator into which batches are deposited. Each batch is "self-describing", in that it has an upper and lower bound on the times contained within. Assuming a FIFO queue, the upper bound in an observed batch is the input frontier at the source operator at the time the batch was minted. These upper bounds should allow the proxy operator to manage its capabilities, conservatively.

Importantly, the proxy operator should not track the capabilities of the source operator; the source operator only holds capabilities for updates it is currently buffering, and it may receive new updates with upgraded capabilities in the future. Timely dataflow is paying attention to that, and ensuring that downstream consumers do not get carried away. The proxied operator does not have this benefit, and must present the full view of progress information, which is what is reflected by the input frontier to the source arrange operator.

Although the upper bounds in the descriptions of the produced batches typically track this input frontier, they can lag behind in empty regions of time: although the arrange operator can see that it can mint a new empty batch with advanced upper bounds, from its input frontier, it cannot send this batch to anyone as it lacks the capability to do so. In the source dataflow scope this is not a problem, as downstream operators use the progress information provided by timely dataflow. In the proxy scope, we do not have this benefit. Instead, the source arrange operator can explicitly communicate that its frontier has advanced, if this happens when it cannot send a batch. Probably the best way to do this is to allow the shared queue to have two types of content: a batch and a progress statement. Other options include putting an empty batch with the right bounds into the shared queue, but we diverge from "exactly the same stream" at this point, and operator logic may be subtly wrong or different.
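A minimal sketch of a shared queue with two types of content, and of a proxy that manages its capability from them. It assumes totally ordered `u64` times and a single-threaded `Rc<RefCell<_>>` queue; `Event`, `Proxy`, and `step` are hypothetical names, and "sending" is modeled by pushing into a vector.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical batch, self-describing through its bounds.
struct Batch {
    lower: u64,
    upper: u64,
}

/// The two kinds of content in the shared queue.
enum Event {
    Batch(Rc<Batch>),
    /// The source's input frontier advanced without a batch being sent.
    Progress(u64),
}

type SharedQueue = Rc<RefCell<VecDeque<Event>>>;

struct Proxy {
    queue: SharedQueue,
    /// Starts at the minimum time and is only ever downgraded (moved forward).
    capability: u64,
    /// Stands in for the operator's output.
    sent: Vec<Rc<Batch>>,
}

impl Proxy {
    fn new(queue: SharedQueue) -> Self {
        Proxy { queue, capability: 0, sent: Vec::new() }
    }

    /// What the operator does each time it is scheduled.
    fn step(&mut self) {
        let events: Vec<Event> = self.queue.borrow_mut().drain(..).collect();
        for event in events {
            match event {
                Event::Batch(batch) => {
                    // We must still hold a capability for the batch's times.
                    debug_assert!(self.capability <= batch.lower);
                    self.capability = self.capability.max(batch.upper);
                    self.sent.push(batch);
                }
                Event::Progress(frontier) => {
                    self.capability = self.capability.max(frontier);
                }
            }
        }
    }
}

fn main() {
    let queue: SharedQueue = Rc::new(RefCell::new(VecDeque::new()));
    let mut proxy = Proxy::new(Rc::clone(&queue));
    queue.borrow_mut().push_back(Event::Batch(Rc::new(Batch { lower: 0, upper: 3 })));
    queue.borrow_mut().push_back(Event::Progress(10));
    proxy.step();
    println!("capability after step: {}", proxy.capability);
}
```

The `Progress` variant lets the proxy advance through empty regions of time without fabricating an empty batch, so the replayed stream of batches stays identical to the original.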


To sum up, I propose adding an initially empty shared list of shared queues in the arrange operator, extending the trace handle to have access to this shared list, and creating a new method that will mint a new operator, mint and register a new shared queue, and produce a new Arrangement reflecting the handle and the new stream.

The registration process should do a few things; in addition to adding the queue to the list, it should probably push copies of all existing batches into the queue, so that the operators see the accumulated history. Most operators require this for correct operation. I believe this should "just work" in that these batches are also self-describing, and the proxy operator's behavior on seeing them should be no different than its behavior when it receives new batches.

The behavior of the arrange operator will likely need to be slightly enriched, so that it not only enqueues batches in each shared queue, but also observes the advance of its input frontier and interacts with the queues even when it cannot create and send batches along its outputs. This seems mostly straightforward, but who knows.
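The registration step might look roughly like the sketch below. It covers only batches (not progress statements), and `ArrangeState`, `register`, and `push_batch` are hypothetical names rather than anything in the current arrange operator.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

/// Hypothetical batch, described only by its bounds.
#[derive(Debug, PartialEq)]
struct Batch {
    lower: u64,
    upper: u64,
}

type Queue = Rc<RefCell<VecDeque<Rc<Batch>>>>;

/// Hypothetical state held by the arrange operator.
#[derive(Default)]
struct ArrangeState {
    /// Batches currently in the trace.
    batches: Vec<Rc<Batch>>,
    /// The initially empty shared list of shared queues.
    queues: Vec<Queue>,
}

impl ArrangeState {
    /// Called when the operator mints a batch: enqueue it for every listener.
    fn push_batch(&mut self, batch: Batch) {
        let batch = Rc::new(batch);
        for queue in &self.queues {
            queue.borrow_mut().push_back(Rc::clone(&batch));
        }
        self.batches.push(batch);
    }

    /// Mints and registers a new queue, pre-loaded with the existing batches
    /// so that the new listener sees the accumulated history.
    fn register(&mut self) -> Queue {
        let queue: Queue = Rc::new(RefCell::new(self.batches.iter().cloned().collect()));
        self.queues.push(Rc::clone(&queue));
        queue
    }
}

fn main() {
    let mut state = ArrangeState::default();
    state.push_batch(Batch { lower: 0, upper: 2 });
    let queue = state.register();
    state.push_batch(Batch { lower: 2, upper: 4 });
    println!("{:?}", queue.borrow());
}
```

A listener that registers late sees the same sequence of batches as one that registered early, provided the trace has not merged or compacted in the meantime.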


I can imagine a few possible issues, or things to think about:

  1. The bookmark functionality is under-specified at the moment. The main user, join, may get stressed out when it sees batches but cannot use the bookmarks corresponding to their bounds (as part of the replayed historical batches). This may not be a problem in the current code, because join drains each input completely before reflecting on bookmarks, and so doesn't worry about intermediate bookmarks, but the lack of problem would be by good luck rather than sane design.

  2. I'm not entirely certain what will happen when we feed a group operation with advanced batch data. At the moment there is no guarantee that the times in the data have any semantic meaning other than that they compare equivalently to future times. Thus, we may do crazy things like advance initial increments past subsequent decrements; replaying the collection as indicated would suggest transiently negative counts.

    I expect this to be pretty broken, really. It may require a careful thinking through of operators and what they are allowed to assume about the times in their batches. We might require that all times be "caught up" before they are used, something we do not do at the moment because of course a newly received batch is in advance of previously received batches, right?

    We could also consider the advancement strategy, so that old updates cannot be advanced past newer updates; we would need to be pretty thoughtful to guarantee that we only see transient collections that actually existed. For example, if whenever we advance we advance all batches in the trace, to a frontier no greater than future batches that may arrive, we should always have batches with some sane properties (and consistent advancement has other benefits: updates are more likely to cancel if advanced to the same frontier).

  3. As we hand out more handles to the trace, we have more operators that can impose requirements on the shared trace. The more folks who make bookmarks, or who require the trace be held back from consolidation, the less efficient the representation and the more work done by other operators. This might be enough of a pain-point to start to develop more robust traces: for example, we can start merging batches even with bookmarks within them, as long as we do not yet discard the old batches. Oblivious users can use the merged batches and faster cursors. Likewise, we could start speculatively advancing batches, retaining old batches for users that require them but providing the partially redundant compacted batches to users that do not. As the bounds advance, we just discard the old data and pivot to the new, rather than panicking to produce the new data.

    For now, I think we can ignore this and just see what happens.
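To make the concern in item 2 concrete, here is a small worked example with made-up numbers, assuming totally ordered `u64` times. A record is inserted at time 1 and removed at time 3. If the batch holding the insertion is advanced to a frontier of 5 while the batch holding the removal is not, replaying the updates in time order shows a count that never actually existed. The helper `running_counts` is hypothetical.

```rust
/// Replays (time, diff) updates in time order and reports the count
/// of the record after each distinct time.
fn running_counts(mut updates: Vec<(u64, isize)>) -> Vec<(u64, isize)> {
    updates.sort();
    let mut counts: Vec<(u64, isize)> = Vec::new();
    let mut total: isize = 0;
    for (time, diff) in updates {
        total += diff;
        if counts.last().map(|c| c.0) == Some(time) {
            counts.last_mut().unwrap().1 = total;
        } else {
            counts.push((time, total));
        }
    }
    counts
}

fn main() {
    // The history as it actually happened: +1 at time 1, -1 at time 3.
    println!("{:?}", running_counts(vec![(1, 1), (3, -1)]));
    // The insertion advanced to frontier 5; the removal left at time 3.
    println!("{:?}", running_counts(vec![(5, 1), (3, -1)]));
}
```

The first replay passes through counts 1 and 0. The second passes through -1 at time 3 before returning to 0 at time 5. Both agree from time 5 onward, which is all the advancement promises, but an operator that evaluates its logic at time 3 would see a negative count.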

Question: SIMD support in the worker

Looking at the project, I can't find any reference to SIMD support in the worker.

I haven't had the chance to look more thoroughly at the project, but I assume it serializes the worker task and sends it across the network (I'm interested in the distributed usage), where the job is launched as a thread.

It would be great to also be able to execute SIMD within each thread, as usually each physical core has a SIMD engine.

Is SIMD supported in workers? If so, how about libraries like faster (https://github.com/AdamNiederer/faster)?

Worker threads not going idle

I've noticed that worker threads in my program go into some form of an infinite loop when there is no input data. Specifically, I use worker 0 to receive input from an I/O thread. This one behaves well and blocks waiting for data, but all other workers are consuming 100% of CPU even when there is no data to process.

Here is the high-level structure of the program:

            timely::execute(Configuration::Process(nworkers), move |worker: &mut Root<Allocator>| {
                ...
                let mut sessions = worker.dataflow::<u64,_,_>(|outer: &mut Child<Root<Allocator>, u64>| {
                    ...
                });
                /* Only worker 0 receives data */
                if worker_index != 0 {return;};

                loop {
                    /*wait for user input*/
                    ...
                };
            }).map(|g| {g.join(); ()})

The stack trace of the worker that consumes 100% CPU looks like this:

#0  0x000055f9f020f9f0 in core::iter::iterator::Iterator::try_for_each ()
#1  0x000055f9f020fb56 in core::iter::iterator::Iterator::any ()
#2  0x000055f9f020831a in <T as core::slice::SliceContains>::slice_contains ()
#3  0x000055f9f027b616 in <timely::progress::frontier::MutableAntichain<T>>::rebuild_and ()
#4  0x000055f9f033d909 in <timely::progress::frontier::MutableAntichain<T>>::update_iter_and ()
#5  0x000055f9f031cf50 in <differential_dataflow::trace::wrappers::rc::TraceBox<K, V, T, R, Tr>>::adjust_through_frontier ()
#6  0x000055f9f01f6186 in <differential_dataflow::operators::arrange::TraceAgent<K, V, T, R, Tr> as differential_dataflow::trace::TraceReader<K, V, T, R>>::distinguish_since ()
#7  0x000055f9f036fa7c in <differential_dataflow::operators::arrange::Arranged<G, K, V, R, T1> as differential_dataflow::operators::group::GroupArranged<G, K, V, R>>::group_arranged::{{closure}} ()
#8  0x000055f9f0361fbd in <timely::dataflow::stream::Stream<G, D1> as timely::dataflow::operators::generic::operator::Operator<G, D1>>::unary_notify::{{closure}}::{{closure}} ()
#9  0x000055f9f0362528 in <timely::dataflow::stream::Stream<G, D1> as timely::dataflow::operators::generic::operator::Operator<G, D1>>::unary_frontier::{{closure}}::{{closure}} ()
#10 0x000055f9f0387a9f in <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<G>>::build::{{closure}} ()
#11 0x000055f9f0366733 in <timely::dataflow::operators::generic::builder_raw::OperatorCore<T, PEP, PIP> as timely::progress::operate::Operate<T>>::pull_internal_progress ()
#12 0x000055f9f01e12e6 in <timely::progress::nested::subgraph::PerOperatorState<T>>::exchange_progress ()
#13 0x000055f9f01c6bd1 in <timely::progress::nested::subgraph::Subgraph<TOuter, TInner> as timely::progress::operate::Operate<TOuter>>::pull_internal_progress ()
#14 0x000055f9f0453fd9 in timely::dataflow::scopes::root::Wrapper::step::{{closure}} ()
#15 0x000055f9f045363b in <core::option::Option<T>>::map ()
#16 0x000055f9f0453f5b in timely::dataflow::scopes::root::Wrapper::step ()
#17 0x000055f9f03b9428 in <timely::dataflow::scopes::root::Root<A>>::step ()
#18 0x000055f9f03e8468 in timely::execute::execute_logging::{{closure}} ()
#19 0x000055f9f03d7870 in timely_communication::initialize::initialize_from::{{closure}} ()
#20 0x000055f9f03da9c2 in std::sys_common::backtrace::__rust_begin_short_backtrace ()
#21 0x000055f9f03dba72 in std::thread::Builder::spawn::{{closure}}::{{closure}} ()
#22 0x000055f9f03cbff2 in <std::panic::AssertUnwindSafe<F> as core::ops::function::FnOnce<()>>::call_once ()
#23 0x000055f9f02504e9 in std::panicking::try::do_call ()
#24 0x000055f9f04ad85a in __rust_maybe_catch_panic () at libpanic_unwind/lib.rs:102
#25 0x000055f9f025042b in std::panicking::try ()
#26 0x000055f9f03db1f2 in std::panic::catch_unwind ()
#27 0x000055f9f03db96c in std::thread::Builder::spawn::{{closure}} ()
#28 0x000055f9f03e3128 in <F as alloc::boxed::FnBox<A>>::call_box ()
#29 0x000055f9f04a059b in _$LT$alloc..boxed..Box$LT$$LP$dyn$u20$alloc..boxed..FnBox$LT$A$C$$u20$Output$u3d$R$GT$$u20$$u2b$$u20$$u27$a$RP$$GT$$u20$as$u20$core..ops..function..FnOnce$LT$A$GT$$GT$::call_once::h21b1f1e31097750a () at /checkout/src/liballoc/boxed.rs:656
#30 std::sys_common::thread::start_thread () at libstd/sys_common/thread.rs:24
#31 0x000055f9f048f116 in std::sys::unix::thread::Thread::new::thread_start () at libstd/sys/unix/thread.rs:90
#32 0x00007fbb66c746db in start_thread (arg=0x7fbb655fe700) at pthread_create.c:463
#33 0x00007fbb6678588f in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:95

I'd appreciate any hints on how to investigate this.

Implement a DistinctTotal trait and operators

Issue #74 calls for specialized implementations for totally ordered timestamps. The distinct and distinct_u operators seem to be good candidates for this: they are superficially similar to count, and their current implementation is via the very expensive group and group_u operators.

This implementation should be very nearly that of CountTotal, except that there is further opportunity to avoid sending output when a change in the count of an input key does not change the occurrence of the output key.
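A minimal sketch of that extra opportunity, for a single key with totally ordered `u64` times and `isize` diffs. The function `distinct_total` is a hypothetical illustration, not the proposed trait. Output is produced only when the accumulated count moves between "absent" and "present".

```rust
/// Input: (time, diff) updates for one key, in any order.
/// Output: (time, diff) updates for the key's presence in the distinct output.
fn distinct_total(mut updates: Vec<(u64, isize)>) -> Vec<(u64, isize)> {
    updates.sort();
    let mut output = Vec::new();
    let mut count: isize = 0;
    let mut i = 0;
    while i < updates.len() {
        let time = updates[i].0;
        let was_present = count > 0;
        // Apply every diff at this time before deciding what to emit.
        while i < updates.len() && updates[i].0 == time {
            count += updates[i].1;
            i += 1;
        }
        let is_present = count > 0;
        match (was_present, is_present) {
            (false, true) => output.push((time, 1)),
            (true, false) => output.push((time, -1)),
            _ => {} // the count changed, but the output did not
        }
    }
    output
}

fn main() {
    let updates = vec![(1, 1), (2, 1), (3, -1), (4, -1), (5, 3)];
    println!("{:?}", distinct_total(updates));
}
```

In this run the changes at times 2 and 3 produce no output at all, whereas a count operator would emit a retraction and an assertion at each of them.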

Batches contain more `Rc` than one might want

The batch implementations currently use Rc somewhat liberally, mostly in support of easily written cursors that can own layers of the batch (the allocations containing keys, values, times, etc). This seems relatively harmless in normal function, as the batches themselves are wrapped with Rc and liberally shared where possible; they are intended to be immutable, and treated as such.

However, certain bits of logic would be much simpler if each batch was uniquely owned. The main one I'm thinking of at the moment is durability, where we would like to be able to serialize and deserialize such an object with something like Abomonation, i.e. just mapping some memory, but the existence of Rc internal to the type makes this not possible (Rc is not safe to abomonate, due to the Cell wrappers around the counts, which are meant to keep allocations alive but wouldn't).

Another, less critical example: advance_mut is the method that transforms a batch into one where times have been advanced to the current frontier, and it is intended to allow this to happen in place if there is unique ownership of the underlying resources. This is relatively easy to sort out if there is one Rc around the batch, and more complicated if there are several Rcs inside as well.

The main benefit of the internal Rc types is that cursors can be written independently for each layer, taking (shared) ownership of the layer rather than worrying about borrowing it. This allows us to write cursors modularly, and compose them easily. If we removed the option of shared ownership, we would seemingly need to either (i) rewrite cursors as acting on borrowed slices, or (ii) write a cursor monolithically, taking (shared) ownership of the whole batch.

Some thoughts on each:

  1. Borrowing is annoying, but there is reason to think that we might eventually want this anyhow, if we end up with indices in each trace (from key to locations in a subset of batches). These indices need to be borrowed, rather than shared, (the benefit is that they are updated in place), and so we end up with a borrow anyhow.

  2. Monolithic cursor writing doesn't need to be complicated, as the cursor side of the trace layers is pretty simple (compared to the merge logic, which is a bit more complicated and should stay modular, in my opinion). It is probably worth prototyping this out to see if it is horrid or not.

In either case, we may need to rethink "what is a batch" to understand whether it is something pre-Rcd, or whether it is something we should expect to wrap in a Rc. I am inclined toward making Batch : Clone with the understanding that the clone implementation should be relatively efficient (i.e. will be called regularly). If this means an internal Rc, great; outside of the batch we won't wrap it in one.

failed to execute the bfs example

Hi,

I tried to execute the bfs example as described in README but it failed. cargo complains that the required radix sort repository is deleted. Is it possible to recover the repository?

Below is an error message:

$ cargo run --release --example bfs
    Updating git repository `https://github.com/frankmcsherry/graph-map.git`
    Updating registry `https://github.com/rust-lang/crates.io-index`
    Updating git repository `https://github.com/frankmcsherry/radix-sort.git`
Unable to update https://github.com/frankmcsherry/radix-sort.git

Caused by:
  failed to clone into: /Users/mitake/.cargo/git/db/radix-sort-544b6b7f8774bde6

Caused by:
  failed to authenticate when downloading repository

To learn more, run the command again with --verbose.

type annotations required: cannot resolve `_: timely::progress::Timestamp`

Hi,
I am just trying to see how long it takes to update the graph format being used:

timely::execute_from_args(std::env::args().skip(5), move |worker| {
    let index = worker.index();
    let peers = worker.peers();

    let (edgeArray, graph) = worker.dataflow(|scope| {
        let (mut input, graph) = scope.new_collection();
        let mut v = Vec::<(u32, u32)>::new();
        let file = BufReader::new(File::open(filename.clone()).unwrap());

        for (count, readline) in file.lines().enumerate() {
            let line = readline.ok().expect("read error");
            if !line.starts_with('#') {
                let mut elts = line[..].split_whitespace();
                let src: u32 = elts.next().unwrap().parse().ok().expect("malformed src");
                let dst: u32 = elts.next().unwrap().parse().ok().expect("malformed dst");
                v.push((src, dst));
            }
        }
        (v, graph)
    });

However, I am getting this error:
error[E0283]: type annotations required: cannot resolve '_: timely::progress::Timestamp'
--> examples/updates.rs:34:41
|
34 | let (edgeArray, graph) = worker.dataflow(|scope| {
| ^^^^^^^^
(Arrows are pointing under dataflow)

Any ideas why? Thank you!

Depend on timely 0.2

Playing around with differential-dataflow atm, I realized that you are depending on timely's git repository rather than the 0.2 version on crates.io. Is there a specific reason for that, i.e., would it be harder to quickly iterate on new features if you depended on 0.2?

From a user's perspective, it's always nicer to depend on a stable-ish version, only updating once in a while.

Description bounds could have type `Antichain`.

In several places we use Vec<T> and &[T] to indicate frontiers, with the implied contract that they should be antichains. We could actually use antichains through timely dataflow's Antichain<T> type.

Perhaps with some additional wrapping (e.g. Frontier<T>) we (or timely) could partially order these antichains too, using the rule that one comes before another if each element of the other comes after some element of the former.

fn le(a1: &Frontier<T>, a2: &Frontier<T>) -> bool {
    a2.elements().iter().all(|t2| a1.elements().iter().any(|t1| t1.le(t2)))
}

This would clean up a bit of code (not lots) and make parts less error-prone / higher confidence.
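For reference, a self-contained version of the comparison above that compiles on its own. It uses a local `PartialOrder` trait and a local `Frontier` wrapper as stand-ins (both are assumptions for illustration, not timely's types), with product-ordered pairs as the example timestamp.

```rust
/// Stand-in for a partial order on timestamps.
trait PartialOrder {
    fn less_equal(&self, other: &Self) -> bool;
}

/// Example timestamp: pairs under the product order.
impl PartialOrder for (u64, u64) {
    fn less_equal(&self, other: &Self) -> bool {
        self.0 <= other.0 && self.1 <= other.1
    }
}

/// Hypothetical wrapper; elements are expected to form an antichain.
struct Frontier<T> {
    elements: Vec<T>,
}

/// `a1` comes before `a2` if each element of `a2` is in the future
/// of some element of `a1`.
fn le<T: PartialOrder>(a1: &Frontier<T>, a2: &Frontier<T>) -> bool {
    a2.elements
        .iter()
        .all(|t2| a1.elements.iter().any(|t1| t1.less_equal(t2)))
}

fn main() {
    let earlier = Frontier { elements: vec![(1, 3), (2, 1)] };
    let later = Frontier { elements: vec![(2, 3), (3, 1)] };
    println!("{} {}", le(&earlier, &later), le(&later, &earlier));
}
```

Note that under this rule an empty frontier comes after every other frontier, since the `all` over its elements is vacuously true.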

Relax requirements of `Diff`

The Diff trait constrains the types of "differences" that differential dataflow updates can use. It currently corresponds to an Abelian group: something that can be added, has an inverse, and satisfies associativity and commutativity.

This property is great in that operators consuming such differences as inputs can produce such differences as outputs. In general, operators like group need the flexibility to subtract, and we do not have great information about the order of accumulation (we could try harder here, but there are some semantic issues iirc).

At the same time, some operators do not need to negate records: join, for example, as well as operators like map, filter, concat, and such. It would seem such operators could use a less restrictive requirement; in particular, they may not require inversion.

Removing the requirement of inversion would allow differential dataflow to express monotonic computations more clearly, in their type: if a record may only be added and never removed, this can be reflected in the type () which simply doesn't have an inverse (nor a zero, perhaps?). If a maximization or minimization can only move in one direction, we need not support full retractions, and combined with the type information the consumer can know that it should never expect such.

Both inputs and outputs can reflect this type information, and we could know that a collection at any point is monotonic or not, and if appropriate specialize the implementation.

It seems that many of the internal accumulation strategies could still apply even without inverses, as the rules of accumulation merge the results without negation (reflecting an opportunity we are ignoring). We may want to understand whether we require a Zero associated value, which is what allows us to discard fully ineffective updates, but for many monotonic quantities you simply can't achieve this anyhow (i.e. no retraction).
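One way the split could look, as a rough sketch only: the trait names `Semigroup`, `Monoid`, and `Abelian` and the helper functions below are illustrative assumptions, not the crate's current `Diff` API. The idea is that each operator asks for the weakest layer it needs.

```rust
/// Differences that can be combined; no zero and no inverse required.
pub trait Semigroup: Clone {
    fn plus(&self, other: &Self) -> Self;
}

/// Differences with a zero, which is what lets us discard ineffective updates.
pub trait Monoid: Semigroup {
    fn zero() -> Self;
    fn is_zero(&self) -> bool;
}

/// Differences that can also be negated (roughly today's `Diff` requirement).
pub trait Abelian: Monoid {
    fn negate(&self) -> Self;
}

impl Semigroup for isize {
    fn plus(&self, other: &Self) -> Self { self + other }
}
impl Monoid for isize {
    fn zero() -> Self { 0 }
    fn is_zero(&self) -> bool { *self == 0 }
}
impl Abelian for isize {
    fn negate(&self) -> Self { -self }
}

/// "Present, and never retracted": no zero and no inverse.
impl Semigroup for () {
    fn plus(&self, _other: &Self) -> Self { () }
}

/// A running maximum that can only move upward.
#[derive(Clone, Debug, PartialEq)]
pub struct Max(pub u64);

impl Semigroup for Max {
    fn plus(&self, other: &Self) -> Self { Max(self.0.max(other.0)) }
}

/// Accumulation needs only `Semigroup`; returns `None` for no updates
/// because without a zero there is no "empty" difference to return.
pub fn accumulate<R: Semigroup>(diffs: &[R]) -> Option<R> {
    let mut iter = diffs.iter();
    let first = iter.next()?.clone();
    Some(iter.fold(first, |acc, d| acc.plus(d)))
}

/// Retraction is only available when the difference type has inverses.
pub fn retract<R: Abelian>(diff: &R) -> R {
    diff.negate()
}
```

With this layering, `accumulate` accepts `()` and `Max`, while `retract(&Max(3))` fails to compile, which is the "reflected in the type" property the issue is after.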

RadixBatcher compaction re-sorts sorted data

In order to make sure that we manage collections that reduce in size when accumulated (e.g. due to cancellation or consolidation), the Batcher implementations need to organize the data they receive. Right now, the RadixBatcher does this by radix sorting the data, but the easiest way to do this over time is just to feed in sorted and unsorted data and sort them all again.

Ideally, we might sort the unsorted data and maybe merge with the previously sorted data. Alternately, we might use the Batch type to store the data in a compact form, if the batch implementation can be relied on to compact the data (none of the current implementations compact data, other than in advance).
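A sketch of the "sort only the new data, then merge" option, under simplifying assumptions: updates are plain `(data, diff)` pairs with `isize` diffs, a comparison sort stands in for the radix sort, and `merge_into_sorted` is a hypothetical name rather than anything in the RadixBatcher.

```rust
/// Merges an already sorted, consolidated run with newly arrived unsorted
/// updates. Only the new updates are sorted; the old run is just scanned.
pub fn merge_into_sorted<D: Ord>(
    sorted: Vec<(D, isize)>,
    mut unsorted: Vec<(D, isize)>,
) -> Vec<(D, isize)> {
    // Stand-in for the radix sort: order the new data by key.
    unsorted.sort_by(|a, b| a.0.cmp(&b.0));

    let mut result: Vec<(D, isize)> = Vec::with_capacity(sorted.len() + unsorted.len());
    let mut left = sorted.into_iter().peekable();
    let mut right = unsorted.into_iter().peekable();

    loop {
        // Decide which input holds the smaller head element.
        let take_left = match (left.peek(), right.peek()) {
            (Some(l), Some(r)) => l.0 <= r.0,
            (Some(_), None) => true,
            (None, Some(_)) => false,
            (None, None) => break,
        };
        let next = if take_left { left.next() } else { right.next() };
        if let Some((data, diff)) = next {
            // Equal keys arrive adjacently, so fold them into the last entry.
            let merged = match result.last_mut() {
                Some(last) if last.0 == data => {
                    last.1 += diff;
                    true
                }
                _ => false,
            };
            if !merged {
                result.push((data, diff));
            }
        }
    }

    // Drop entries whose diffs cancelled out.
    result.retain(|(_, diff)| *diff != 0);
    result
}
```

The previously sorted run is only read once in order here, so the cost of each compaction step is dominated by sorting the new arrivals rather than re-sorting everything.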

Crash in nested scope

I am using the Variable implementation from the dataflog.rs example to construct recursive relations. However, my program crashes any time I use a nested scope. Here is the minimal failing example. The definition of struct Variable is copied from dataflog.rs verbatim. The actual dataflow consists of collection1 and collection2, with collection2 being an exact copy of collection1.

Note that there is no recursion here, so this does not actually require a nested scope. I only use it to demonstrate the failure. It still crashes if I use an actual recursive definition.

extern crate timely;
extern crate differential_dataflow;

use timely::dataflow::*;
use timely::dataflow::operators::probe;
use timely::dataflow::operators::feedback::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::{Collection,Data,Hashable};
use timely::progress::nested::product::Product;
use timely::dataflow::scopes::Child;
use timely::dataflow::operators::*;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;

pub struct Variable<'a, G: Scope, D: Default+Data+Hashable>
where G::Timestamp: Lattice+Ord {
    feedback: Option<Handle<G::Timestamp, u64,(D, Product<G::Timestamp, u64>, isize)>>,
    current: Collection<Child<'a, G, u64>, D>,
    cycle: Collection<Child<'a, G, u64>, D>,
}

impl<'a, G: Scope, D: Default+Data+Hashable> Variable<'a, G, D> where G::Timestamp: Lattice+Ord {
    /// Creates a new `Variable` from a supplied `source` stream.
    pub fn from(source: &Collection<Child<'a, G, u64>, D>) -> Variable<'a, G, D> {
        let (feedback, cycle) = source.inner.scope().loop_variable(u64::max_value(), 1);
        let cycle = Collection::new(cycle);
        let mut result = Variable { feedback: Some(feedback), current: cycle.clone(), cycle: cycle };
        result.add(source);
        result
    }
    /// Adds a new source of data to the `Variable`.
    pub fn add(&mut self, source: &Collection<Child<'a, G, u64>, D>) {
        self.current = self.current.concat(source);
    }
}

impl<'a, G: Scope, D: Default+Data+Hashable> ::std::ops::Deref for Variable<'a, G, D> where G::Timestamp: Lattice+Ord {
    type Target = Collection<Child<'a, G, u64>, D>;
    fn deref(&self) -> &Self::Target {
        &self.cycle
    }
}

impl<'a, G: Scope, D: Default+Data+Hashable> Drop for Variable<'a, G, D> where G::Timestamp: Lattice+Ord {
    fn drop(&mut self) {
        if let Some(feedback) = self.feedback.take() {
            self.current.distinct()
                        .inner
                        .connect_loop(feedback);
        }
    }
}

fn main() {
    timely::execute_from_args(std::env::args(), move |worker| {
        
        let mut probe = probe::Handle::new();
        let mut input1 = worker.dataflow(|scope| {

            let (input1, collection1) = scope.new_collection();
            let (_, collection2) = scope.new_collection();

            let collection2 = scope.scoped(|inner| {
                let var1 = Variable::from(&collection1.enter(inner));
                let mut var2 = Variable::from(&collection2.enter(inner));

                var2.add(&var1);
                var2.leave()
            });

            collection2.inspect(|x| println!("\t{:?}", x))
                       .probe_with(&mut probe);

            input1
        });

        input1.insert(1);

        input1.advance_to(1); input1.flush();
        worker.step_while(|| probe.less_than(input1.time()));
    }).unwrap();
}

Here is the stack trace:

thread 'worker thread 0' panicked at 'failed to find index', libcore/option.rs:916:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
stack backtrace:
   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
             at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
   1: std::sys_common::backtrace::print
             at libstd/sys_common/backtrace.rs:71
             at libstd/sys_common/backtrace.rs:59
   2: std::panicking::default_hook::{{closure}}
             at libstd/panicking.rs:207
   3: std::panicking::default_hook
             at libstd/panicking.rs:223
   4: std::panicking::rust_panic_with_hook
             at libstd/panicking.rs:402
   5: std::panicking::begin_panic_fmt
             at libstd/panicking.rs:349
   6: rust_begin_unwind
             at libstd/panicking.rs:325
   7: core::panicking::panic_fmt
             at libcore/panicking.rs:72
   8: core::option::expect_failed
             at libcore/option.rs:916
   9: <core::option::Option<T>>::expect
             at /checkout/src/libcore/option.rs:302
  10: <differential_dataflow::operators::group::history_replay::HistoryReplayer<'a, V1, V2, T, R1, R2> as differential_dataflow::operators::group::PerKeyCompute<'a, V1, V2, T, R1, R2>>::compute
             at ./src/operators/group.rs:840
  11: <differential_dataflow::operators::arrange::Arranged<G, K, V, R, T1> as differential_dataflow::operators::group::GroupArranged<G, K, V, R>>::group_arranged::{{closure}}
             at ./src/operators/group.rs:425
  12: <timely::dataflow::stream::Stream<G, D1> as timely::dataflow::operators::generic::unary::Unary<G, D1>>::unary_notify::{{closure}}::{{closure}}
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/operators/generic/unary.rs:94
  13: <timely::dataflow::stream::Stream<G, D1> as timely::dataflow::operators::generic::operator::Operator<G, D1>>::unary_frontier::{{closure}}::{{closure}}
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/operators/generic/operator.rs:337
  14: <timely::dataflow::operators::generic::builder_rc::OperatorBuilder<G>>::build::{{closure}}
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/operators/generic/builder_rc.rs:128
  15: <timely::dataflow::operators::generic::builder_raw::OperatorCore<T, PEP, PIP> as timely::progress::operate::Operate<T>>::pull_internal_progress
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/operators/generic/builder_raw.rs:211
  16: <timely::progress::nested::subgraph::PerOperatorState<T>>::exchange_progress
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/progress/nested/subgraph.rs:802
  17: <timely::progress::nested::subgraph::Subgraph<TOuter, TInner> as timely::progress::operate::Operate<TOuter>>::pull_internal_progress
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/progress/nested/subgraph.rs:542
  18: <timely::progress::nested::subgraph::PerOperatorState<T>>::exchange_progress
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/progress/nested/subgraph.rs:802
  19: <timely::progress::nested::subgraph::Subgraph<TOuter, TInner> as timely::progress::operate::Operate<TOuter>>::pull_internal_progress
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/progress/nested/subgraph.rs:542
  20: timely::dataflow::scopes::root::Wrapper::step::{{closure}}
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/scopes/root.rs:159
  21: <core::option::Option<T>>::map
             at /checkout/src/libcore/option.rs:404
  22: timely::dataflow::scopes::root::Wrapper::step
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/scopes/root.rs:159
  23: <timely::dataflow::scopes::root::Root<A>>::step
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/scopes/root.rs:46
  24: <timely::dataflow::scopes::root::Root<A>>::step_while
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/dataflow/scopes/root.rs:59
  25: example::main::{{closure}}
             at examples/example.rs:81
  26: timely::execute::execute_logging::{{closure}}
             at /home/lryzhyk/.cargo/git/checkouts/timely-dataflow-7b255df3956d217b/e741c1d/src/execute.rs:152
  27: timely_communication::initialize::initialize::{{closure}}
             at /home/lryzhyk/.cargo/registry/src/github.com-1ecc6299db9ec823/timely_communication-0.5.0/src/initialize.rs:167
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Any', libcore/result.rs:945:5
stack backtrace:
   0: std::sys::unix::backtrace::tracing::imp::unwind_backtrace
             at libstd/sys/unix/backtrace/tracing/gcc_s.rs:49
   1: std::sys_common::backtrace::print
             at libstd/sys_common/backtrace.rs:71
             at libstd/sys_common/backtrace.rs:59
   2: std::panicking::default_hook::{{closure}}
             at libstd/panicking.rs:207
   3: std::panicking::default_hook
             at libstd/panicking.rs:223
   4: std::panicking::rust_panic_with_hook
             at libstd/panicking.rs:402
   5: std::panicking::begin_panic_fmt
             at libstd/panicking.rs:349
   6: rust_begin_unwind
             at libstd/panicking.rs:325
   7: core::panicking::panic_fmt
             at libcore/panicking.rs:72
   8: core::result::unwrap_failed
             at /checkout/src/libcore/macros.rs:26
   9: <core::result::Result<T, E>>::unwrap
             at /checkout/src/libcore/result.rs:782
  10: <timely_communication::initialize::WorkerGuards<T> as core::ops::drop::Drop>::drop
             at /home/lryzhyk/.cargo/registry/src/github.com-1ecc6299db9ec823/timely_communication-0.5.0/src/initialize.rs:192
  11: core::ptr::drop_in_place
             at /checkout/src/libcore/ptr.rs:59
  12: example::main
             at examples/example.rs:56
  13: std::rt::lang_start::{{closure}}
             at /checkout/src/libstd/rt.rs:74
  14: std::panicking::try::do_call
             at libstd/rt.rs:59
             at libstd/panicking.rs:306
  15: __rust_maybe_catch_panic
             at libpanic_unwind/lib.rs:102
  16: std::rt::lang_start_internal
             at libstd/panicking.rs:285
             at libstd/panic.rs:361
             at libstd/rt.rs:58
  17: std::rt::lang_start
             at /checkout/src/libstd/rt.rs:74
  18: main
  19: __libc_start_main
  20: _start

Why is `group` slow? Is it slow?

The group operator seems to get down to about a few microseconds per update, which seems too large especially if there is little to do. With large updates, we are essentially sorting the data, and then zipping two sorted lists. The amount of per-key logic could in principle be very small, but several microseconds per update is surprisingly large.

Specifically, the degrees.rs example does two count operators in sequence, each of which should be very simple logically. The first count has low contention (single updates per key) and the second count has higher contention, but neither of these seems like it should take much compute per update. Clearly that isn't the case at the moment, and it would be good to either learn why it is important, or clean things up so that the painful cases are eased.

Simple things to check out: there is no logic in group for an early exit for trivial updates. If there is a single time, we still prep a great deal of state to help implement things asymptotically awesomely; perhaps we should bail out in the same way that join does.

Insertion sort for consolidation

We very often "consolidate" collections of updates, triples (data, time, diff), so that there is at most one element with each data and time. This is currently done by sorting such a collection by (data, time) and then swinging through the result.
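For reference, the sort-then-sweep approach can be sketched as follows. This is a simplified standalone version assuming `isize` diffs and a hypothetical free function `consolidate`; it is not the crate's actual implementation.

```rust
/// Consolidates `(data, time, diff)` updates so that each `(data, time)`
/// appears at most once, and never with a zero diff.
pub fn consolidate<D: Ord, T: Ord>(updates: &mut Vec<(D, T, isize)>) {
    // Sort by (data, time) so that matching updates become adjacent.
    updates.sort_by(|a, b| (&a.0, &a.1).cmp(&(&b.0, &b.1)));

    // Sweep left to right, pushing each accumulated diff forward into the
    // next matching update and zeroing the one it came from.
    for i in 1..updates.len() {
        if updates[i - 1].0 == updates[i].0 && updates[i - 1].1 == updates[i].1 {
            let prev = updates[i - 1].2;
            updates[i].2 += prev;
            updates[i - 1].2 = 0;
        }
    }

    // Discard everything that ended up with a zero diff.
    updates.retain(|u| u.2 != 0);
}
```

Even in this small form the costs mentioned in this issue are visible: a full sort of the buffer on every call, followed by two linear passes.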

Most sorting algorithms use insertion sort on small sets, either in the recursive base case for quicksort or to form sorted runs for mergesort. We could perform a similar action when adding elements to a "compactable list", with the improvement that if we find a matching element it can be compacted in place. Ideally this would be no worse than sorting the data (although it probably does work that subsequent sorting can't exploit perfectly), and often much better (when adjacent elements collapse, or when the set of data and times is small).
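A sketch of what such a "compactable list" might look like, assuming `isize` diffs. `CompactableList` and its methods are hypothetical names for illustration; a real version would likely cap the list size and fall back to a full sort beyond it.

```rust
use std::cmp::Ordering;

/// A small, always-sorted list of updates that compacts on insertion.
pub struct CompactableList<D, T> {
    updates: Vec<(D, T, isize)>,
}

impl<D: Ord, T: Ord> CompactableList<D, T> {
    pub fn new() -> Self {
        CompactableList { updates: Vec::new() }
    }

    /// Inserts one update, insertion-sort style: scan back from the end to
    /// find its position, and if an entry with the same (data, time) is
    /// found, fold the diff into it in place instead of inserting.
    pub fn push(&mut self, data: D, time: T, diff: isize) {
        if diff == 0 {
            return;
        }
        let mut pos = self.updates.len();
        while pos > 0 {
            let ord = {
                let prev = &self.updates[pos - 1];
                (&prev.0, &prev.1).cmp(&(&data, &time))
            };
            match ord {
                Ordering::Greater => pos -= 1,
                Ordering::Equal => {
                    self.updates[pos - 1].2 += diff;
                    // A fully cancelled entry is removed immediately.
                    if self.updates[pos - 1].2 == 0 {
                        self.updates.remove(pos - 1);
                    }
                    return;
                }
                Ordering::Less => break,
            }
        }
        self.updates.insert(pos, (data, time, diff));
    }

    pub fn as_slice(&self) -> &[(D, T, isize)] {
        &self.updates
    }
}
```

The list never grows beyond the number of distinct `(data, time)` pairs with non-zero accumulation, which is where the win would come from when that set is small or when adjacent elements collapse.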

More generally, we should be mindful that consolidation is not always cheap (we must allocate the backing memory, and then perform super-linear sorting), and a solid implementation of compaction should have pervasive performance wins.

Bring back progressive merging

Right now, when the Spine type decides it is time to merge two batches, because they have roughly the same size, it just goes and does it. This is fine for small batches, but for larger batches it can take an amount of time disproportionate to the amount of input that may have triggered it (e.g. just a few updates). This should lead to scaling issues: as we add more workers, it becomes all the more likely that one of them wants to do a large-ish merge (though the sizes of the batches each manages should decrease).

Way back when, we tested out "progressive merging", where instead of doing the merge immediately we schedule the merge and do a certain amount of work on it each time we drop some tuples in, roughly proportional to the number of tuples we dropped in. This works out mathematically, in that before we need to merge again we must have introduced at least as many tuples as we are merging, so the merge should be done by then. It also lowered the peak latency by ~5x or so (though not to uniform, because I guess things still depend on how many merges are live at once).
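The scheduling idea can be sketched with a merge that advances only as far as its "fuel" allows. This is a toy over plain sorted `Vec`s with hypothetical names (`ProgressiveMerge`, `work`), not the Spine or batch types, and it leaves out consolidation of equal elements.

```rust
use std::iter::Peekable;
use std::vec::IntoIter;

/// A merge of two sorted batches that can be advanced a bounded amount
/// at a time instead of running to completion in one go.
pub struct ProgressiveMerge<D: Ord> {
    left: Peekable<IntoIter<D>>,
    right: Peekable<IntoIter<D>>,
    output: Vec<D>,
}

impl<D: Ord> ProgressiveMerge<D> {
    pub fn new(left: Vec<D>, right: Vec<D>) -> Self {
        let capacity = left.len() + right.len();
        ProgressiveMerge {
            left: left.into_iter().peekable(),
            right: right.into_iter().peekable(),
            output: Vec::with_capacity(capacity),
        }
    }

    /// Moves at most `fuel` elements into the output.
    /// Returns true once both inputs are exhausted.
    pub fn work(&mut self, fuel: usize) -> bool {
        for _ in 0..fuel {
            let take_left = match (self.left.peek(), self.right.peek()) {
                (Some(l), Some(r)) => l <= r,
                (Some(_), None) => true,
                (None, Some(_)) => false,
                (None, None) => return true,
            };
            let next = if take_left { self.left.next() } else { self.right.next() };
            self.output.extend(next);
        }
        self.left.peek().is_none() && self.right.peek().is_none()
    }

    /// The merged prefix produced so far.
    pub fn output(&self) -> &[D] {
        &self.output
    }
}
```

A caller that introduces `k` new tuples would then call something like `merge.work(c * k)` for a small constant `c` (an assumption here; the right constant depends on how many merges are in flight), so the cost of a large merge is spread across the inputs that follow it.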

This was pretty easy to put into place, although it bypassed the "clever" merging of the current layer design, which allows for memcpying of large blocks. It had some throughput overhead, in that continuing to present un-merged data means more work for readers than just using the merged data. But, we should be able to put together an experiment that reveals the scaling limitation, with the expectation that the scaling could improve with progressive merging (the throughput hit of progressive merging should be weighed against the current throughput hit of idle time for workers as one does a large merge).
