binpash / pash
PaSh: Light-touch Data-Parallel Shell Processing
License: MIT License
It is possible to create a deadlock in the shell by not consuming the output of a command. This behaviour can be reproduced in the shell without dish (see https://github.com/andromeda/dish/blob/master/evaluation/scripts/head_deadlock.sh).
In dish it shows up when we parallelize a command and in the end only ask for its head (or for fewer lines than the input contains). Some commands then never exit, since no reader ever opens their output pipe for reading. This prevents some of the unix50 pipelines from being executed.
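A minimal sketch of both the deadlock and a draining workaround (the fifo names `p1`/`p2` are illustrative; this is not dish's actual mechanism):

```shell
#!/bin/sh
# A writer to a named pipe blocks on open() until some reader opens the
# pipe. If a parallel branch's output is never consumed, its writer
# blocks forever; draining the unread branch to /dev/null lets it exit.
cd "$(mktemp -d)"
mkfifo p1 p2
seq 5 > p1 &            # unblocks once a reader opens p1
seq 5 > p2 &            # would block forever without any reader
head -n 3 p1            # we only consume (part of) the first branch
cat p2 > /dev/null &    # drain the unread branch so its writer can exit
wait                    # returns only because p2 was drained
```

Without the `cat p2 > /dev/null &` line, `wait` hangs forever on the second writer.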
I see some possible alternative ways to solve this, one being to drain the unread output to /dev/null (as we did for split in the past).
Based on the work by Farzan and Nicolet (http://www.cs.toronto.edu/~azadeh/papers/pldi17-ex.pdf), the class of commands that can be parallelized using divide-and-conquer (map-reduce) can be extended if auxiliary state is kept.
An interesting example where this is the case is:
tee s2 | tail +2 | paste s2 -
This can be parallelized using the map and reduce shown in: (https://github.com/andromeda/dish/blob/master/evaluation/tools/bigrams_aux_map.sh).
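The pattern can be sketched as follows. This is an illustrative reconstruction, not the actual bigrams_aux_map.sh: the `map`/`reduce` function names are hypothetical, and `tail -n +2 in | paste in -` stands in for the `tee`-based kernel. Each map emits the bigrams inside its chunk plus its first and last word as auxiliary state; the reduce inserts the bigram that spans the chunk boundary.

```shell
#!/bin/sh
# Illustrative map/reduce with auxiliary state for the bigram pipeline.

map() {                                 # $1: input chunk (one word per line)
    tail -n +2 "$1" | paste "$1" - | sed '$d'  # bigrams inside the chunk
                                               # (sed drops the dangling last pair)
    head -n 1 "$1" >  "$1.aux"                 # auxiliary state: first word
    tail -n 1 "$1" >> "$1.aux"                 # auxiliary state: last word
}

reduce() {                              # $1 $2: map outputs; $3 $4: the chunks
    cat "$1"
    # the bigram spanning the boundary: last word of $3, first word of $4
    printf '%s\t%s\n' "$(tail -n 1 "$3.aux")" "$(head -n 1 "$4.aux")"
    cat "$2"
}

cd "$(mktemp -d)"
printf 'a\nb\n' > c1
printf 'c\nd\n' > c2
map c1 > m1
map c2 > m2
reduce m1 m2 c1 c2      # prints the same bigrams as the sequential pipeline
```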
Dish has to be extended to handle this generalized map-reduce pattern. This requires:
Need to include distributability statistics for POSIX commands, similar to GNU coreutils.
Need to add a high-level overview figure showing the sequence of phases that take place internally in Dish.
Related to #57.
At the moment PaSh compiles commands to calls to the PaSh runtime (since they could be run in parallel).
This leads to export commands also being compiled to calls to the PaSh runtime by the preprocessing step, so exported variables are not seen in the parent shell.
We could avoid compiling calls to export, but this cannot always be done safely (e.g. when EXP=export and the script contains $EXP $MYVAR=1).
A hacky way to solve this would be to keep track of the environment in pash_runtime, and propagate its changes to the parent shell manually.
However, a solution that solves both this issue and the one in #57 would be a tighter integration with a shell (instead of calling into the PaSh runtime at places of potential performance benefit).
@nvasilakis @mgree We should discuss if the hacky solutions for this and #57 are adequate or if we prefer a more principled approach and work on the tight-integration alternative.
At the moment, when debugging a parallel script, if we kill it using SIGINT, all of its spawned background processes stay alive, which might lead to inconsistent results (due to commands waiting to read from and write to pipes).
We should catch SIGINT in the output script and kill all the child processes. This could happen in the backend, which could emit a signal handler in the output parallel script (see https://stackoverflow.com/questions/2618403/how-to-kill-all-subprocesses-of-shell on how to kill them).
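A handler the backend could emit at the top of the generated parallel script might look roughly like this (a sketch, not the actual backend output):

```shell
#!/bin/sh
# On SIGINT, kill every background child of this shell, then exit with
# the conventional 128+SIGINT status.
cleanup() {
    kill $(jobs -p) 2>/dev/null
}
trap 'cleanup; exit 130' INT

# demonstration: a stuck background process that cleanup can terminate
sleep 30 &
cleanup
wait                 # returns immediately once the child is killed
echo "children reaped"
```

`jobs -p` lists the pids of this shell's own background jobs, so the handler stays local to the generated script.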
Address the issues shown in this script.
They refer to issues with the current dataflow termination mechanism.
Copied here for completeness:
## This way of fixing the problem suffers from some issues.
##
## - First of all, gathering the children after the end of the graph
## seems to gather more than just the alive nodes. This could lead
## to killing some random pid in the system. This could potentially
## be solved by gathering all pids incrementally.
##
## - In addition, this way of getting the last pid does not work if
## there is more than one output. (This is never the case in our
## tests, but it could be.)
##
## - Finally, it is not local, since all of the monitoring happens
## globally. Ideally, it should be done by a wrapper in each
## node. The wrapper should monitor if the node dies, and if so it
## should send SIGPIPE to all its producers.
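The proposed per-node wrapper could be sketched as follows (`node_wrapper` and its producer-pid argument convention are hypothetical, not existing dish code):

```shell
#!/bin/sh
# Run the node's command; when it exits, signal its producers with
# SIGPIPE so they terminate instead of blocking on a full pipe.
node_wrapper() {        # $1: the node's command; remaining args: producer pids
    cmd=$1; shift
    sh -c "$cmd"
    kill -s PIPE "$@" 2>/dev/null
}

# demonstration with a long-running stand-in "producer"
sleep 30 & producer=$!
node_wrapper 'head -n 1 /dev/null' "$producer"
wait "$producer" 2>/dev/null
echo "producer terminated"
```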
This issue is for tracking progress on the super-optimization component. This component solves the challenge of generating a script that is likely to exhibit the highest possible performance for the available script, input data, and broader environment. The script that meets these requirements may well be the sequential script, in which case pash needs to avoid delaying its execution at all costs. Keeping the fastest path fast will be challenging, because this component will incorporate some exploration of a configuration search space.
More discussion in the docs/superopt document.
The current optimizer (parallelization module), located in distr_plan.py, performs two optimizations. It duplicates stateless and pure commands that have multiple inputs, and it splits a command's input pipe (if there is only one) into several, so that the first optimization is enabled (see more: https://github.com/andromeda/dish/blob/master/compiler/distr_plan.py#L126).
At the moment, the second optimization is manual and is configured in config.yaml by the fan_out and batch_size fields. fan_out determines the number of splits, and batch_size determines the maximum number of lines of each split. The fact that the batch size is fixed throughout the optimization leads to two problems:
The reason why batch_size is needed in the first place is that the split happens sequentially, i.e. the first 100 lines of the input file go to the first batch, the second 100 lines to the second, and so on. A non-sequential split would require a way to reorder batches afterwards, which might require some extra metadata and wrappers that deal with batch sequence numbers.
Can we somehow improve the split to be more automatic and adaptable depending on the circumstances? Since the optimizations are currently done statically, we don't really have information about the size of each input. In the future when the optimization process will be tightly coupled with interpretation, we might be able to have a more dynamic split optimization.
For reference on how split is currently implemented by the backend, see https://github.com/andromeda/dish/blob/master/evaluation/tools/split_pipe.sh.
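The sequential-split semantics can be illustrated with standard split(1) (dish uses its own split_pipe.sh; this only shows the behaviour): the first batch_size lines go to the first batch, the next to the second, and so on, so concatenating the batches in name order restores the input.

```shell
#!/bin/sh
# Sequential split: batch_aa holds lines 1-100, batch_ab lines 101-200, ...
cd "$(mktemp -d)"
batch_size=100
seq 1 250 > input
split -l "$batch_size" input batch_
cat batch_* > reassembled       # name order == input order
cmp -s input reassembled && echo "order preserved"
```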
Parallelizing commands that have multiple inputs (such as comm -23) requires ensuring that the non-parallelized (and thus duplicated) input is not a pipe but a file.
Example:
cat in1 in2 | comm -23 - in3
should not become:
comm -23 in1 in3 > pipe1 &
comm -23 in2 in3 > pipe2 &
cat pipe1 pipe2
if in3 is a pipe (since a pipe cannot be read twice).
This is a low-priority issue to deal with after OSDI. It would require some analysis (or a materialization of the pipe as a file) to make this correct.
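Materializing the shared input can be sketched as follows (`in3.file` and the `printf | cat` stand-in for draining a pipe are illustrative):

```shell
#!/bin/sh
# Drain the shared input into a regular file so the duplicated comm
# invocations can each read it independently.
cd "$(mktemp -d)"
printf 'a\nb\n' > in1
printf 'c\nd\n' > in2
printf 'b\nc\n' | cat > in3.file    # stand-in for materializing the pipe
comm -23 in1 in3.file > out1 &
comm -23 in2 in3.file > out2 &
wait
cat out1 out2                       # same lines as: cat in1 in2 | comm -23 - in3.file
```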
Note: This is a possibly large task.
Implement the DSL for the stateless and pure commands from the PLDI paper.
This would require the following:
- find_command_category(command, options) in ir.py for each command
- extending dish to allow parallel execution of more commands (e.g., paste)
(The only thing remaining is cleaning up the directory of intermediary results.)
Extend the runtime to start the sequential script and the pash_runtime_compiler at the same time, and only run the parallel script if the sequential one is not done yet. The main challenge is to ensure that a half-executed sequential script does not affect the state of the shell and the whole environment in a way that would make the parallel script's results incorrect.
At first sight, it seems that if the compiler succeeds in optimizing the script, then it should be a dataflow program meaning that it only affects the file system state with its output files. Is there any case where this is not safe? If the inputs and outputs are not simple files (e.g. some device or pipe), it is probably not safe to do that.
This requires some deeper investigation.
Even though this extension seems to not significantly improve performance, it really helps when running a lot of small trivial scripts. Finding a benchmark with a lot of small scripts can help showcase the improvements achieved by it.
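The proposed race could be sketched roughly as follows (all file names are illustrative, and `compile.sh` merely stands in for the pash_runtime_compiler):

```shell
#!/bin/sh
# Start the sequential script and the compiler together; if compilation
# succeeds while the sequential run is still going, kill it and run the
# parallel script instead.
cd "$(mktemp -d)"
printf 'sleep 5; echo sequential\n' > sequential.sh
printf 'echo "echo parallel" > parallel.sh\n' > compile.sh   # fake compiler

sh sequential.sh & seq_pid=$!
if sh compile.sh && kill -0 "$seq_pid" 2>/dev/null; then
    kill "$seq_pid"                 # sequential run still going: abandon it
    wait "$seq_pid" 2>/dev/null
    sh parallel.sh
else
    wait "$seq_pid"                 # compilation failed or sequential won
fi
```

This only makes sense when the abandoned sequential prefix has no externally visible side effects, which is exactly the safety question raised above.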
RaftLib is a streaming library with a few benchmarks we could quickly express in shell scripts. This would allow us to compare both performance and line counts.
The code was recently restructured to have an internal AstNode structure for the AST.
However, currently the AST is transformed from a JSON object to an AstNode at the start of compile_ast, then transformed back to JSON at the end of compile_ast, and again to an AstNode at the start of replace_irs and back at its end. This makes all internal compile and replace functions ugly, as they produce output with a different format from their input.
We should modify compilation so that the transformation to AstNode happens once before compile_ast, and then both of these functions work in our representation.
We need to add installation instructions and the versions that we use to run everything for the parser and compiler.
At the moment the optimizer introduces splits and duplicates stateless or pure commands independently. Therefore, in the optimized script, there could be a split followed directly by a cat.
This pattern is the identity, so we could remove it completely. We could introduce an optimization that removes this pattern wherever it appears.
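The identity is easy to see with standard split(1) standing in for dish's split node:

```shell
#!/bin/sh
# Splitting and immediately concatenating (in order) reproduces the
# original stream, so a split node directly followed by a cat node can
# be removed from the dataflow graph.
cd "$(mktemp -d)"
seq 1 10 > in
split -l 5 in part_
cat part_* | cmp -s - in && echo "identity"
```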
It is important to be able to reflect changes in environment variables to handle whole shell scripts.
An example script is: https://github.com/andromeda/dish/blob/master/evaluation/intermediary/environment_changes_2_seq.sh
A good starting point could be this SO post: https://stackoverflow.com/questions/22969121/can-i-run-a-bash-script-in-python-and-keep-any-env-variables-it-exports
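In the spirit of that SO post, the exports of a child fragment can be captured and replayed in the parent; a sketch (the `fragment.sh` and `exported.env` names are illustrative, and multi-line variable values are not handled):

```shell
#!/bin/sh
# Run a script fragment in a child shell, dump its exported variables
# with `export -p`, and source the dump to replay them in this shell.
cd "$(mktemp -d)"
printf 'export GREETING=hello\n' > fragment.sh
sh -c '. ./fragment.sh; export -p > exported.env'
. ./exported.env            # replay the child's exports in the parent
echo "$GREETING"
```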
There is a (non-blocking) issue with xargs that requires specific treatment. I am not sure if we need to figure this out and mention it in the OSDI paper or if we should deal with it later. The problem can be seen in shortest_scripts (https://github.com/andromeda/dish/blob/master/evaluation/microbenchmarks/shortest_scripts.sh).
More precisely, the command xargs -L 1 wc -l returns 0 if the input is empty, and otherwise returns line_number script_path for each script_path line of the input. This means that if we parallelize and some of the chunks end up being empty (because of the previous grep), the output will contain lines with 0.
There are two observations to be made from the above:
xargs -L 1 is not always stateless. In general our treatment of xargs is kind of hacky, so we should discuss how exactly we address its issues in the paper. @nvasilakis @abenetopoulos Maybe we can say that xargs, due to its dynamicity, needs specific treatment, and a developer must annotate each xargs in their code with whether it can be parallelized or not.
BTW, for the OSDI evaluation I have dealt with the issue by adding a grep that filters 0 lines (which makes sense in general, as with empty input this pipeline should return empty output).
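The workaround can be sketched as follows (chunk and script file names are illustrative):

```shell
#!/bin/sh
# An empty chunk makes `xargs -L 1 wc -l` emit a bare "0"; a trailing
# grep filters those lines out, restoring the sequential output.
cd "$(mktemp -d)"
printf 'x\n' > a.sh
printf 'x\ny\n' > b.sh
printf 'a.sh\nb.sh\n' > chunk1       # chunk with script paths
: > chunk2                           # empty chunk (previous grep matched nothing)
for c in chunk1 chunk2; do
    xargs -L 1 wc -l < "$c"
done | grep -v '^ *0$'
```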
Similarly to how we handle | and & by combining the dataflow graphs that they connect, we can also propagate up the dataflow graph extracted from a script in a bash -c "script" and combine it with the external dataflow graph.
This is also useful when calling whole scripts in a pipeline (e.g. ./bigrams.sh).
Since PaSh interposes calls to PaSh's runtime in parallel sections of the script, shell variables (non-exported local ones) are not accessible to the runtime. This leads to the following script not being correctly compiled:
N=100
seq 1 $N | sort -rn
as it is compiled to the following:
N=100
pash_runtime ...
This needs to be addressed somehow.
A possible (hacky) solution could be to diff the environment and local variables at the beginning of each pash_runtime call and make sure that the local ones are saved for PaSh to know about.
The assertion in https://github.com/andromeda/dish/blob/master/evaluation/tools/eager.c#L89 sometimes fails mysteriously (see https://github.com/andromeda/dish/blob/master/evaluation/results/minimal_sort_100_distr.time).
It seems to mostly fail for some big testcases. I suspect that lseek errs and returns a negative value. Maybe it has to do with the offset being too big to be returned from lseek?
Could we somehow parallelize trivial for loops? We need some analysis of the files that the body affects. We also need to statically know the range of the for loop, to inline its body in the internal representation. For now we could deal with only the simplest loops.
I would expect that the check would be added in compile_node_for, and if the loop is parallelizable, the body would be included in the IR as many times as the loop would run (similarly to compile_node_pipe).
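The simplest case can be illustrated directly in the shell (file names are illustrative): a loop with a statically known range whose iterations touch disjoint files can be unrolled and run in parallel.

```shell
#!/bin/sh
# A trivial, statically known loop and its unrolled parallel version
# produce the same per-iteration outputs.
cd "$(mktemp -d)"
printf 'x\ny\n' > in1; printf 'x\n' > in2; printf 'y\n' > in3
# sequential loop:
for i in 1 2 3; do grep -c x "in$i" > "seq_out$i"; done
# unrolled, parallelized body (iterations write disjoint files):
grep -c x in1 > par_out1 &
grep -c x in2 > par_out2 &
grep -c x in3 > par_out3 &
wait
cat seq_out1 seq_out2 seq_out3
cat par_out1 par_out2 par_out3
```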
If we try to execute the bigrams script (by running ./execute_evaluation_scripts.sh in the compiler/ directory) it crashes. The error happens at https://github.com/andromeda/dish/blob/master/compiler/definitions/ir/command.py because BigramGMap is not imported.
I am not sure what the best way to address this issue is. One solution would be to somehow connect the map and reduce functions of a command (in this case BigramGMap and BigramGReduce with the command bigrams_aux) in a generic way, so that in this line of code the number of outputs can be extracted from the command itself.
Adding a few to-fix items from a recent discussion:
- rename dish to pash
- rename distr to par in scripts (./compiler/execute_compile_evaluation_script.sh, e.g. 2_distr.time)
- use the width parameter everywhere
- improve eager by (i) buffering in memory, (ii) using disk up to a limit

Implement the from_ir_to_shell function in dish.py. This function should call the libdash parser to produce an output script from the AST.
https://github.com/andromeda/dish/blob/master/evaluation/scripts/max-temp/max-temp.sh
Invalid weather data URL.
With the new evaluation scripts, _distr.time shows the combined compilation and execution time. Add a flag (or config option) to dish that produces timing information on stderr.
Make sure that web-index can be executed and gather results.
For the sets of pipelines that we have, it would be great to run experiments with distributed execution. For now, Dish can rewrite pipelines using a custom wrapper around nc, and custom network-aware splitters and mergers similar to the single-node ones we have now.
Warning: Requires major rewriting, but can hopefully be done in parallel.
At the moment, the dataflow graph only supports nodes with one input and one output stream. Many incoming edges represent reading from them in order.
In order to support paste, and any command that arbitrarily reads from its inputs, we could add nodes that act as enforcers of a reading pattern on inputs. For example, a cat node could enforce reading from its inputs in order, while a zip node would read in lockstep from its inputs. This way, the model will be able to handle any node.
Changes required:
- Node subclasses like cat and zip.
- A cat with many incoming edges followed by a stateless node can be parallelized to many cats followed by many stateless nodes and a final cat.
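The cat-then-stateless rewrite rests on a simple equivalence, illustrated here with grep as the stateless node:

```shell
#!/bin/sh
# A cat over many inputs followed by a stateless command is equivalent
# to applying the command per input and concatenating with a final cat.
cd "$(mktemp -d)"
printf 'foo\nbar\n' > in1
printf 'baz\nfood\n' > in2
cat in1 in2 | grep foo > before.out          # original graph
{ grep foo in1; grep foo in2; } > after.out  # parallelized graph
cmp -s before.out after.out && echo "equivalent"
```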
Include the rest of the evaluation scripts in the test directory (with small inputs) and make sure that they all succeed.
This also requires dealing with the extra character that is included in the distributed output.
Now that we have thought a bit more regarding the validity of the DFG (and that we have pushed expansion and compilation to the runtime), we can actually implement the validity checks when compiling an AST to a DFG (no cycles, no more than one reader/one writer for each pipe, no more than one writer for each file etc).
As part of complete POSIX compliance (see #7), we have to handle redirections in input scripts. At the moment they are completely ignored.
This is essential as an optimization for better load-balanced splits.
Essentially, independent lines can be split into microbatches that are distributed among the different parallel lines. The microbatches need to be augmented with a sequence number so that they can be reassembled afterwards. Obviously, to benefit from this, we need to be able to commute the reassembler with other nodes and move it as far to the right as possible. Also, nodes between a microbatcher and a reassembler have to be wrapped with a component that removes the metadata (sequence number) and then feeds them the necessary data.
Tasks:
A schematic of the optimization can be seen below:
---[p]---
---[p]---
---[p]---
where p is a simple node.
turns to:
---| |---[w(p)]---| |---
---|m|---[w(p)]---|r|---
---| |---[w(p)]---| |---
where
|m| is a microbatch-er
w(p) is a wrapper of the simple node that removes and reinserts sequence numbers
and |r| is a microbatch reassembler.
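The scheme above can be sketched with single-line "microbatches" (the tagging format and the doubling stand-in for the simple node p are illustrative):

```shell
#!/bin/sh
# |m| tags each line with a sequence number, w(p) strips the tag,
# applies p (here: doubling a number), and reinserts it, and |r|
# restores the original order and drops the tags.
cd "$(mktemp -d)"
seq 5 3 20 > input
awk '{print NR, $0}' input > tagged             # |m|: attach sequence numbers
awk '{print $1, $2 * 2}' tagged > processed     # w(p): preserve the tag around p
sort -n processed | awk '{print $2}' > output   # |r|: reorder by tag, strip it
cat output
```

Because `|r|` sorts on the tag, the processed microbatches could arrive in any order without changing the final output.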
Move the libdash submodule and directory under the parser, and include it in the parser's build process.
Solve the bigrams implementation bug. At the moment, in both bigrams scripts, the last line of tee x | tail +2 | paste x - contains only one word. When separating the inputs, more single words appear, and some words might even get lost.
A solution would be to extend alt_bigrams and bigrams with a command that removes the last line from the output, and then extend the map functions to send this extra word if it is needed for the reducers to work.
Clean the FileId class from unused and duplicate methods.
Add a ./dish wrapper around each command in the AST. For now this wrapper should just be the identity function.
Eventually the Dish architecture will change, and the wrapper will initiate compilation and optimization. The reason is that this way the compiler/optimizer will be called as late as possible and will have as much information as possible.
Write a script to generate all *_n_seq.sh, *_n_env.sh, and *_n_env.sh files. These are now generated by hand and go into the ./intermediary results, but we would want to be able to create them automatically.
One of the key claims for an {O,N}SDI paper would be full distribution---including fs, runtime, and planner. Divide this into smaller tasks, and determine what it would take to complete them.
From the README:
Distributed file-system (and, more generally, networked distribution): as much of Unix is about files, incorporating an existing Unix-like distributed file-system is key to shell script distribution. With a DFS, /x/y would resolve to the filesystem of a specific node---but it should be location-aware so that Dish sends commands to where files (streams) reside.
Planner: augment and extend the functionality of the planner with the ability to optimize the distributed script and map it efficiently on the underlying infrastructure---using information about the workload, infrastructure, etc. The planner should also add and leverage metadata (lines, batches etc.) for key planning decisions.
Distributed runtime environment: currently Dish rewrites pipelines to extract parallelism---that is, the end result of our toolchain is still a shell pipeline. A more sophisticated runtime environment could provide significant benefits by manipulating metadata, both for accelerating performance (e.g., wrapping commands with packing/unpacking) and for fault tolerance (by knowing which calls fail). It is important for the runtime to be significantly optimized so as to not cause any issues.
The current JSON serializer produced by atdgen does not emit valid JSON (it creates tuples, etc.). The bug seems easier to fix on the OCaml side than in our to_standard_json function in json_ast.py.
Note: This is a large task that doesn't have clear subtasks at the moment.
At the moment dish executes the first dataflow graph in the output script without executing the parts before or after it. After #6 is done, we should be able to execute the complete script together with all of its internal distributed dataflow sections.
This would require ensuring that the effects of environment changes in the dataflow graph (e.g. exporting a variable) are visible in the other script parts, and that changes in the script parts are visible to the dataflow graph.
Either delete them before exiting the eager command, or before exiting from the dish output script.
At the moment the aggregator for uniq is uniq itself, and there is no implemented aggregator for uniq -c.
We could implement very efficient aggregators for both of them.
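A minimal aggregator for uniq -c could look roughly like this (an illustrative sketch, not part of dish; it prints counts without uniq's column padding):

```shell
#!/bin/sh
# Concatenate the partial `uniq -c` outputs and merge the counts of
# equal adjacent lines, which can only occur at chunk boundaries.
cd "$(mktemp -d)"
printf 'a\na\nb\n' > chunk1
printf 'b\nc\n' > chunk2
uniq -c chunk1 > part1
uniq -c chunk2 > part2
cat part1 part2 | awk '{
    count = $1
    sub(/^[ ]*[0-9]+[ ]/, "")              # strip the count column
    if ($0 == prev) total += count         # same line across a boundary: merge
    else { if (NR > 1) print total, prev; prev = $0; total = count }
} END { if (NR > 0) print total, prev }' > merged
cat merged
```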
Because of the evaluation urgency, keeping track of the evaluation goals, who's working on them, and their progress is important:
- alt_bigrams (@angelhof)
- bigrams deadlocks
- set-diff (@angelhof)
- head after a stateless command (@angelhof)
- wc, string-matching (@abenetopoulos)
- speeduptest to zoom into arbitrary commands (@nvasilakis)
- a {&} macro-benchmark that provides a compelling story (@nvasilakis)
- speeduptest targeted benchmarking to identify sources (@nvasilakis), as well as more results within 50x and 100x to identify the tipping point (@angelhof)
- parallel as a point of comparison

The current bigrams (and alt_bigrams) map and reduce steps are not totally correct, because they don't return frequencies. We have to correct them to make a fair comparison with standard bigrams.
They can be found in https://github.com/andromeda/dish/blob/master/evaluation/microbenchmarks/bigrams_funs.sh and https://github.com/andromeda/dish/blob/master/evaluation/microbenchmarks/alt_bigrams_funs.sh
In the parser directory, after installing the OCaml tool-chain and dependencies on a new machine (i.e., running make dependencies), running make leads to:
make: *** No rule to make target `ast_json.ml', needed by `parse_to_json.native'. Stop.
Running the micro_1000 benchmark now takes forever; the optimization takes a long time. I think that the culprit is the method get_next_nodes_and_edges, which is called often in the distributed planner. Since the edges are not saved in the IR, every time someone asks for the next nodes and edges all other nodes have to be searched, which leads to quadratic overhead. The reason why it was implemented like this is concern that the saved edges might become inconsistent after a union of file ids.
If we want to run the micro_1000 benchmark for OSDI, someone needs to take a look at this. However, I feel that it is relatively low priority, since there are other, more important parts of the evaluation that we should work on.
In order to prepare for an evaluation on different machines we need to setup an infrastructure that allows running experiments on a remote machine and then collects the results together with some metadata about the machine.