
DataFusion: Modern Distributed Compute Platform implemented in Rust

DataFusion is an attempt at building a modern distributed compute platform in Rust, leveraging Apache Arrow as the memory model.

NOTE: DataFusion was donated to the Apache Arrow project in February 2019. Source is here.

See my article How To Build a Modern Distributed Compute Platform to learn about the design and my motivation for building this. The TL;DR is that this project is a great way to learn about building a query engine, but it is quite early and not usable for any real-world work just yet.

Status

The current code supports single-threaded execution of limited SQL queries (projection, selection, and aggregates) against CSV files. Parquet files will be supported shortly.

To use DataFusion as a crate dependency, add the following to your Cargo.toml:

```toml
[dependencies]
datafusion = "0.6.0"
```

Here is a brief example for running a SQL query against a CSV file. See the examples directory for full examples.

```rust
extern crate arrow;
extern crate datafusion;

use std::cell::RefCell;
use std::rc::Rc;
use std::sync::Arc;

// NOTE: exact import paths may vary between crate versions
use arrow::array::{BinaryArray, Float64Array};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::exec::*;

fn main() {
    // create local execution context
    let mut ctx = ExecutionContext::new();

    // define schema for data source (csv file)
    let schema = Arc::new(Schema::new(vec![
        Field::new("city", DataType::Utf8, false),
        Field::new("lat", DataType::Float64, false),
        Field::new("lng", DataType::Float64, false),
    ]));

    // register csv file with the execution context
    let csv_datasource = CsvDataSource::new("test/data/uk_cities.csv", schema.clone(), 1024);
    ctx.register_datasource("cities", Rc::new(RefCell::new(csv_datasource)));

    // simple projection and selection
    let sql = "SELECT city, lat, lng FROM cities WHERE lat > 51.0 AND lat < 53.0";

    // execute the query
    let relation = ctx.sql(&sql).unwrap();

    // iterate over the result batches
    let mut results = relation.borrow_mut();

    while let Some(batch) = results.next().unwrap() {
        println!(
            "RecordBatch has {} rows and {} columns",
            batch.num_rows(),
            batch.num_columns()
        );

        // downcast each column to its concrete Arrow array type
        let city = batch
            .column(0)
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();

        let lat = batch
            .column(1)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        let lng = batch
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();

        for i in 0..batch.num_rows() {
            let city_name: String = String::from_utf8(city.get_value(i).to_vec()).unwrap();

            println!(
                "City: {}, Latitude: {}, Longitude: {}",
                city_name,
                lat.value(i),
                lng.value(i),
            );
        }
    }
}
```

Roadmap

See ROADMAP.md for the full roadmap.

Prerequisites

  • Rust nightly (required by parquet-rs crate)

Building DataFusion

See BUILDING.md.

Gitter

There is a Gitter channel where you can ask questions about the project or make feature suggestions.

Contributing

Contributors are welcome! Please see CONTRIBUTING.md for details.

datafusion's People

Contributors

andygrove, atouchet, avantgardnerio, gnieto, hntd187, kensuenobu, maccam912, malu, mgxm, pmaciolek, srenatus, sunchao


datafusion's Issues

Implement data type support using generics and add support for all base types

The current type system uses an enum as a wrapper around some standard types, but this should probably be re-implemented using generics instead to make it easier to add new types.

The system should support all the standard primitive types (bool, int, long, float, double) as well as string, date/time, and binary.
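As a rough illustration of the generics-based direction (the type and method names here are hypothetical, not DataFusion's), a typed column lets the compiler monomorphize operations per element type, removing the per-value enum dispatch:

```rust
// Hypothetical sketch: a generic column abstraction instead of an
// enum-wrapped value type. `TypedColumn<T>` stores values of one
// primitive type, so operations compile to specialized code with no
// per-value match on a type tag.
struct TypedColumn<T> {
    values: Vec<T>,
}

impl<T: Copy + std::ops::Add<Output = T>> TypedColumn<T> {
    // Element-wise addition: the compiler generates one version per T,
    // so there is no runtime check of the value's type.
    fn add(&self, other: &TypedColumn<T>) -> TypedColumn<T> {
        TypedColumn {
            values: self
                .values
                .iter()
                .zip(&other.values)
                .map(|(a, b)| *a + *b)
                .collect(),
        }
    }
}

fn main() {
    let a = TypedColumn { values: vec![1.0_f64, 2.0, 3.0] };
    let b = TypedColumn { values: vec![10.0_f64, 20.0, 30.0] };
    let c = a.add(&b);
    println!("{:?}", c.values); // [11.0, 22.0, 33.0]
}
```

The same `impl` works for `i64`, `f32`, etc. for free, which is the "easier to add new types" benefit the issue describes.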

Implement integration test

We need an integration test that runs as part of the ci build:

  • Start etcd
  • Start a worker node
  • Run the SQL console to execute a query (probably means we need a command-line parameter for specifying a query to run)

Move code to CircleCI for automated builds

I can see that TravisCI is currently being used, but it might behoove us to move this project to CircleCI.

CircleCI 2.0 gives us a lot of flexibility, and allows us to run things in containers with versions of code that we desire. It also means we don't have to maintain our own build cluster.

Just something to think about. I can take this on if you want; shouldn't take longer than a few hours.

good work

Good work and a good start. I will learn from it and build something.

Some invalid SQL queries are accepted, and some valid queries are rejected

Invalid but parsed successfully:

  • SELECT CREATE EXTERNAL TABLE test (id VARCHAR(1)) FROM foo WHERE 1=2
  • SELECT 1 FROM foo WHERE CREATE EXTERNAL TABLE test (id VARCHAR(1))

Valid (debatable if useful) but not accepted:

  • CREATE EXTERNAL TABLE test ()

I'm trying to fix the parser, but it seems to be non-trivial. Is there a reason why you wrote your own parser and didn't use some parser combinator library like nom or combine?

SQL Parser should never panic

Need to implement correct error handling i.e. returning Result in SQL parser to avoid panics.

Here's an example:

```
DataFusion Console
$ SELECT * FROM foo
Executing: SELECT * FROM foo
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: ParserError("Prefix parser expected a keyword but found Mult")', /checkout/src/libcore/result.rs:906:4
```
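A minimal sketch of the intended shape, with a hypothetical `parse_select` function (not DataFusion's actual parser API): every entry point returns `Result`, so malformed input surfaces as an error value the console can print instead of a panic.

```rust
// Hypothetical sketch of Result-based parser error handling, assuming a
// ParserError type like the one in the panic message above.
#[derive(Debug, PartialEq)]
struct ParserError(String);

// Returns the projection text after SELECT, or an error; never panics.
fn parse_select(sql: &str) -> Result<String, ParserError> {
    let trimmed = sql.trim();
    if trimmed.to_uppercase().starts_with("SELECT ") {
        Ok(trimmed[7..].to_string())
    } else {
        Err(ParserError(format!("expected SELECT, found: {}", trimmed)))
    }
}

fn main() {
    // The caller decides how to surface the error; no unwrap(), no panic.
    match parse_select("SELECT * FROM foo") {
        Ok(rest) => println!("parsed projection: {}", rest),
        Err(e) => eprintln!("parse error: {:?}", e),
    }
}
```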

Dockerize the worker process

Create a Dockerfile to run the worker process (to try out the worker locally, just run `cargo run worker`).

A/C

  • There should be a bash script to create the worker binary (e.g. `cargo build --release`) and create the Docker image
  • The Docker image should not contain Rust or Cargo, just the pre-compiled worker executable
  • Port 80 should be exposed (for web UI and REST API)
  • It should be possible to map a volume containing user-defined types and functions

Default configuration file

I wonder if we could add a simple configuration file: a very simple .toml file that defines all the necessary settings. Then, as the project moves forward and needs more configuration, it would be easier to use and we would not need to pass all the arguments every time.

For example:

```toml
# Here we can put some configuration that
# is common to both
[datafusion]
etcd = "http://127.0.0.1:2379"

[worker]
bind = "0.0.0.0:8080"
data_dir = "./path/data_dir"
webroot = "./src/bin/worker"

[console]
# some configuration specific
# to the console
```

When we initialize the console or the worker, we could pass a flag that searches for the configuration:

  • try to load the configuration from a default path:
    worker -c
  • or pass the path to the configuration file explicitly:
    worker -c /path/to/config.toml

And likewise for the console.
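A minimal sketch of loading such a file, using only the standard library (a real implementation would more likely use the `toml` and `serde` crates); the section and key names follow the example above, and `parse_config` is a hypothetical helper:

```rust
// Naive INI/TOML-style parser: builds a flat "section.key" -> value map.
use std::collections::HashMap;

fn parse_config(text: &str) -> HashMap<String, String> {
    let mut section = String::new();
    let mut map = HashMap::new();
    for line in text.lines() {
        let line = line.trim();
        if line.is_empty() || line.starts_with('#') {
            continue; // skip blank lines and comments
        }
        if line.starts_with('[') && line.ends_with(']') {
            // new section header, e.g. [worker]
            section = line[1..line.len() - 1].to_string();
        } else if let Some((key, value)) = line.split_once('=') {
            // store as "section.key" -> value, with surrounding quotes stripped
            map.insert(
                format!("{}.{}", section, key.trim()),
                value.trim().trim_matches('"').to_string(),
            );
        }
    }
    map
}

fn main() {
    let cfg = parse_config(
        "[datafusion]\netcd = \"http://127.0.0.1:2379\"\n[worker]\nbind = \"0.0.0.0:8080\"",
    );
    println!("etcd endpoint: {}", cfg["datafusion.etcd"]);
}
```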

Great feedback from reddit

I looked at the code 2-3 weeks ago when it was first announced with the 2018 big-data-on-Rust blog post, and took another cursory look today. I am going through a similar exercise in Rust, scratching my own itch after working 4+ years in the same domain on the JVM/Scala/etc.

I would like this project to succeed so here are some observations that may help in the long run (caveat - I may have missed some of the points in your code).

The streaming model is more general, and it can easily be relaxed to batch processing; the inverse is always hard. On the other hand, batch processing is more efficient than processing event-by-event. In my experience something like micro-batching works best in terms of flexibility and performance.

Column-oriented processing is more efficient on the current crop of hardware, as it plays better with dispatch overhead, cache locality, and data prefetching. It is also a natural extension of micro-batching from the previous point. My own benchmarks show a 20x difference with batches of 1024 tuples, around 2-3 cycles per arithmetic operation on f64, including SQL NULL correctness.

You definitely don't want to dispatch on types in the leaves, e.g. in the function/expression bodies like https://github.com/andygrove/datafusion-rs/blob/master/src/functions/math.rs#L14 - data types are a compile-time concept and should not exist at run-time. You can use generics, type erasure, and columnar dispatch to get there. Think of this in the same vein as Rust vs. Ruby: Rust is faster because it does not need to check the run-time type on each operation, since the compiler guarantees it.

Instead of a switch-on-type interpreter, where you pay a dispatch cost on each event, you may want to think about expressions in a more functional way, e.g.:

fn gteq(lhs:expr, rhs:expr) -> Box<Fn(input: tuple) -> value>

or if you get into columnar approach:

fn gteq(lhs: expr, rhs: expr) -> Box<Fn(in: &Frame) -> Column>

and compose the whole computation just once before executing it: following a reference once per 1000 tuples is a lot cheaper than going through a match on each tuple.

You can take a look at a minimalistic example I put together some time ago: https://gist.github.com/luben/95c1c05f36ec56a57f5624c1b40e9f11
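The columnar variant above can be sketched as follows; `Frame`, `Column`, and `gteq_col_lit` are illustrative stand-ins, not DataFusion types:

```rust
// Sketch of the suggested columnar approach: compile an expression into
// a closure once, then evaluate it per batch of tuples.
struct Frame {
    columns: Vec<Vec<f64>>,
}
type Column = Vec<bool>;
type CompiledExpr = Box<dyn Fn(&Frame) -> Column>;

// Compile "column[i] >= literal" into a closure. Any matching on the
// expression's shape happens here, once, not once per tuple.
fn gteq_col_lit(col_index: usize, literal: f64) -> CompiledExpr {
    Box::new(move |frame: &Frame| {
        frame.columns[col_index]
            .iter()
            .map(|v| *v >= literal)
            .collect()
    })
}

fn main() {
    let frame = Frame { columns: vec![vec![50.5, 51.3, 52.9, 49.0]] };
    let expr = gteq_col_lit(0, 51.0); // compiled once
    let mask = expr(&frame);          // evaluated per batch
    println!("{:?}", mask); // [false, true, true, false]
}
```

Following the boxed-closure reference once per batch amortizes the indirection over every tuple in that batch, which is the cost model the comment describes.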

worker should keep trying to ping etcd and not give up on first error

Currently the worker terminates if it cannot connect to etcd immediately. It should instead keep trying in case etcd is still starting up.

```
Worker 8987a3f3-71e1-5cca-aadf-bc165f528fac listening on 127.0.0.1:8088 and serving content from /opt/datafusion/www
Heartbeat loop failed: Error { repr: Kind(NotFound) }
```
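One possible shape for the fix, sketched with a hypothetical `connect_with_retry` helper (the real connect call and back-off parameters would differ):

```rust
// Retry the initial connection with a short back-off instead of
// terminating on the first failure.
use std::thread;
use std::time::Duration;

fn connect_with_retry<F>(mut connect: F, max_attempts: u32) -> Result<(), String>
where
    F: FnMut() -> Result<(), String>,
{
    for attempt in 1..=max_attempts {
        match connect() {
            Ok(()) => return Ok(()),
            Err(e) => {
                eprintln!("attempt {} failed: {}; retrying", attempt, e);
                thread::sleep(Duration::from_millis(100)); // back off before retrying
            }
        }
    }
    Err(format!("gave up after {} attempts", max_attempts))
}

fn main() {
    // Simulated connect that fails twice before succeeding, as etcd
    // might while it is still starting up.
    let mut calls = 0;
    let result = connect_with_retry(
        || {
            calls += 1;
            if calls < 3 { Err("connection refused".to_string()) } else { Ok(()) }
        },
        5,
    );
    assert!(result.is_ok());
}
```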

Generate closures from relational plan

Instead of interpreting the plan and expressions at runtime, the executor should generate closures from the plan and these closures can be called per row. This should be faster.
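A toy illustration of the idea (the `Expr` tree and `compile` function here are hypothetical, much smaller than a real plan): the tree is walked once, producing a closure with no `match` left on the per-row path.

```rust
// Compile a tiny expression tree into a per-row closure up front,
// instead of matching on the tree for every row.
enum Expr {
    Column(usize),
    Literal(f64),
    Add(Box<Expr>, Box<Expr>),
}

type Row = Vec<f64>;

fn compile(expr: &Expr) -> Box<dyn Fn(&Row) -> f64> {
    match expr {
        Expr::Column(i) => {
            let i = *i;
            Box::new(move |row| row[i])
        }
        Expr::Literal(v) => {
            let v = *v;
            Box::new(move |_| v)
        }
        Expr::Add(l, r) => {
            // Recursively compile children once; the returned closure
            // just calls them, with no interpretation per row.
            let l = compile(l);
            let r = compile(r);
            Box::new(move |row| l(row) + r(row))
        }
    }
}

fn main() {
    // lat + 1.0
    let expr = Expr::Add(Box::new(Expr::Column(0)), Box::new(Expr::Literal(1.0)));
    let eval = compile(&expr);
    println!("{}", eval(&vec![51.5])); // 52.5
}
```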

Implement CREATE EXTERNAL TABLE

We need a way to register CSV files and define a schema via SQL, so we can query CSV from a SQL console without having to write Rust code.

The standard way of doing this is a CREATE EXTERNAL TABLE command that defines the schema and provides the other information required to open the CSV, such as the file path.

Here are docs for how Hive does this:

https://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.6.3/bk_data-access/content/moving_data_from_hdfs_to_hive_external_table_method.html
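As a sketch, the syntax might look something like this (hypothetical, modeled on Hive's external tables; the exact grammar for DataFusion is not yet defined):

```sql
-- Hypothetical CREATE EXTERNAL TABLE syntax for a CSV source
CREATE EXTERNAL TABLE cities (
    city VARCHAR(100),
    lat DOUBLE,
    lng DOUBLE
)
STORED AS CSV
LOCATION 'test/data/uk_cities.csv';
```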

Design user-defined type traits

DataFusion needs to be able to process tuples containing values of user-defined types not known at compile time. DataFusion does not need deep knowledge of these types, but it must be able to perform serde operations on them and pass them to user-defined functions.

HDFS Support

This is the lowest common denominator for being able to run some distributed workloads on existing Hadoop clusters.

There is no working pure-Rust HDFS library, unfortunately, so we'll probably have to start with the one below, which wraps a C library that in turn wraps the Java client.

http://hyunsik.github.io/hdfs-rs/hdfs/index.html

Scalar functions

It should be possible to use some pre-defined scalar functions in a query. For example:

```sql
SELECT sqrt(x) FROM foo

SELECT * FROM foo WHERE sqrt(x) < sqrt(y)
```
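At the execution level, a scalar function amounts to mapping a function over a column; a minimal sketch (assuming a plain `Vec`-based column rather than Arrow arrays, and a hypothetical `apply_scalar` helper):

```rust
// Apply a scalar function element-wise over one column of a batch.
fn apply_scalar<F>(column: &[f64], f: F) -> Vec<f64>
where
    F: Fn(f64) -> f64,
{
    column.iter().map(|v| f(*v)).collect()
}

fn main() {
    // SELECT sqrt(x) FROM foo, over one batch of x values
    let x = [4.0, 9.0, 16.0];
    let result = apply_scalar(&x, f64::sqrt);
    println!("{:?}", result); // [2.0, 3.0, 4.0]
}
```

Registering functions like `sqrt` by name in the query engine would then map the SQL identifier to such a column-wise implementation.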
