
datafusion-python's Introduction

DataFusion in Python

This code has been donated to Apache Arrow DataFusion, and development will continue there. Thank you for your understanding!

This is a Python library that binds to DataFusion, Apache Arrow's in-memory query engine.

Like PySpark, it allows you to build a plan through SQL or a DataFrame API against in-memory data, Parquet files, or CSV files, run it in a multi-threaded environment, and get the result back in Python.

It also allows you to use UDFs and UDAFs for complex operations.

The major advantage of this library over other execution engines is that it achieves zero-copy data exchange between Python and its execution engine: using UDFs and UDAFs and collecting results into Python incur no copying cost, apart from having to hold the GIL while those operations run.

Its query engine, DataFusion, is written in Rust, which provides strong guarantees about thread safety and memory safety.

Technically, zero-copy is achieved via the Arrow C Data Interface.
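
As a rough analogy only (this is not the C data interface itself), Python's built-in buffer protocol shows what zero-copy sharing means: a consumer receives a view over the producer's memory rather than a duplicate. The variable names below are illustrative and are not part of datafusion or pyarrow.

```python
from array import array

# Producer: a contiguous buffer of 64-bit integers.
values = array("q", [1, 2, 3])

# Consumer: a memoryview exposes the same memory without copying it.
view = memoryview(values)

# A write through the producer is visible through the view,
# which shows that both names refer to the same underlying buffer.
values[0] = 10
shared = view.tolist()

# By contrast, an explicit copy is decoupled from later writes.
copied = array("q", values)
values[1] = 20
```

After this runs, the view reflects both writes while the copy only reflects the first one, which is the difference between sharing a buffer and duplicating it.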

How to use it

Simple usage:

import datafusion
import pyarrow

# an alias
f = datafusion.functions

# create a context
ctx = datafusion.ExecutionContext()

# create a RecordBatch and a new DataFrame from it
batch = pyarrow.RecordBatch.from_arrays(
    [pyarrow.array([1, 2, 3]), pyarrow.array([4, 5, 6])],
    names=["a", "b"],
)
df = ctx.create_dataframe([[batch]])

# create a new statement
df = df.select(
    f.col("a") + f.col("b"),
    f.col("a") - f.col("b"),
)

# execute and collect the first (and only) batch
result = df.collect()[0]

assert result.column(0) == pyarrow.array([5, 7, 9])
assert result.column(1) == pyarrow.array([-3, -3, -3])

UDFs

def is_null(array: pyarrow.Array) -> pyarrow.Array:
    return array.is_null()

udf = f.udf(is_null, [pyarrow.int64()], pyarrow.bool_())

df = df.select(udf(f.col("a")))

UDAF

import pyarrow
import pyarrow.compute


class Accumulator:
    """
    Interface of a user-defined accumulation.
    """
    def __init__(self):
        self._sum = pyarrow.scalar(0.0)

    def to_scalars(self) -> [pyarrow.Scalar]:
        return [self._sum]

    def update(self, values: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(values).as_py())

    def merge(self, states: pyarrow.Array) -> None:
        # not nice since pyarrow scalars can't be summed yet. This breaks on `None`
        self._sum = pyarrow.scalar(self._sum.as_py() + pyarrow.compute.sum(states).as_py())

    def evaluate(self) -> pyarrow.Scalar:
        return self._sum


df = ...

udaf = f.udaf(Accumulator, pyarrow.float64(), pyarrow.float64(), [pyarrow.float64()])

df = df.aggregate(
    [],
    [udaf(f.col("a"))]
)
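
The accumulator above has three roles: update folds a batch of input values into the state, merge folds in partial states produced elsewhere, and evaluate returns the final value. The sketch below models that lifecycle in plain Python, with lists standing in for pyarrow arrays. It assumes that the engine creates one accumulator per partition and merges their states at the end. SumAccumulator and run_partitioned are hypothetical names and are not part of datafusion; the None checks show one way to avoid the failure mentioned in the comments above.

```python
from typing import List, Optional


class SumAccumulator:
    """Pure-Python stand-in for the pyarrow-based Accumulator (illustrative only)."""

    def __init__(self) -> None:
        self._sum = 0.0

    def to_scalars(self) -> List[float]:
        # The state that would be handed to another accumulator's merge().
        return [self._sum]

    def update(self, values: List[Optional[float]]) -> None:
        # Skip missing values instead of failing on None.
        self._sum += sum(v for v in values if v is not None)

    def merge(self, states: List[Optional[float]]) -> None:
        # Partial sums combine the same way as raw values.
        self._sum += sum(s for s in states if s is not None)

    def evaluate(self) -> float:
        return self._sum


def run_partitioned(partitions: List[List[Optional[float]]]) -> float:
    """Assumed driver: one accumulator per partition, then a final merge."""
    final = SumAccumulator()
    for partition in partitions:
        partial = SumAccumulator()
        partial.update(partition)
        final.merge(partial.to_scalars())
    return final.evaluate()


total = run_partitioned([[1.0, 2.0, None], [3.0], []])
```

For a sum, update and merge happen to be the same operation; for an aggregate such as a mean, the state would need to carry both a sum and a count, and merge would combine those rather than raw values.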

How to install

pip install datafusion

How to develop

This assumes that you have Rust and Cargo installed. We use the workflow recommended by PyO3 and maturin.

Bootstrap:

# fetch this repo
git clone git@github.com:jorgecarleitao/datafusion-python.git

cd datafusion-python

# prepare development environment (used to build wheel / install in development)
python -m venv venv
venv/bin/pip install maturin==0.8.2 toml==0.10.1

Whenever the Rust code changes (through your own changes or via git pull):

venv/bin/maturin develop
venv/bin/python -m unittest discover tests


datafusion-python's Issues

Rename Cargo.toml project name?

Would you kindly consider renaming the name of the project in Cargo.toml from "datafusion" to something else, perhaps "datafusion-python"? I'd like to include this project in my Rust project, but cannot because "datafusion" is used in my project to refer to Arrow Datafusion.

thanks!

PyArrow array type translation seems to not be correct

The type numbers seem off when translating PyArrow arrays using PyArrow 3.0.

I did a bit of debugging and found the following for example:

Object to be extracted: TimestampType(timestamp[ns])  id: Ok(18)
Object to be extracted: DataType(string)  id: Ok(13)
Object to be extracted: DataType(int32)  id: Ok(7)

(This output comes from adding debug statements to the extract() function.)

Expose ExecutionContext methods to Python

I really like the Python interface you guys built for DataFusion. However, would it be possible to expose more of the ExecutionContext methods to the Python interface, to allow more fine-grained control over execution? Methods such as:

  • create_logical_plan
  • optimize
  • create_physical_plan

Speed comparison of package compared to Pandas / Pyarrow_ops

I wrote a comparison between this package and pandas / pyarrow_ops on a task where we join, group and aggregate two tables with ~300K rows. The Datafusion package is about 3-5 times slower than its alternatives.

What is causing this performance hit? Is it the serialization between C / Python or is it the performance of DataFusion itself?

import datafusion, time
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow_ops import head, join, groupby

f1 = "path to local parquet file"
f2 = "path to local parquet file"

# Pandas
time1 = time.time()
t1 = pq.read_table(f1, columns=['sku_key', 'option_key']).to_pandas()
t2 = pq.read_table(f2, columns=['sku_key', 'economical']).to_pandas()
r = t1.merge(t2, on=['sku_key']).groupby(['option_key']).agg({'economical': 'sum'})
print("Query in Pandas took:", time.time() - time1)
print(r.head())

# Pyarrow ops
time2 = time.time()
t1 = pq.read_table(f1, columns=['sku_key', 'option_key'])
t2 = pq.read_table(f2, columns=['sku_key', 'economical'])
r = groupby(join(t1, t2, on=['sku_key']), by=['option_key']).agg({'economical': 'sum'})
print("\nQuery in Pyarrow ops took:", time.time() - time2)
head(r)

# Datafusion
time3 = time.time()

f = datafusion.functions
ctx = datafusion.ExecutionContext()
ctx.register_parquet("skus", f1)
ctx.register_parquet("stock_current", f2)
result = ctx.sql("SELECT option_key, SUM(economical) as stock FROM stock_current as sc JOIN skus as sk USING (sku_key) GROUP BY option_key").collect()
r = pa.Table.from_batches(result)
print("\nQuery in DataFusion took:", time.time() - time3)
head(r)

Output:

Query in Pandas took: 0.6710879802703857
            economical
option_key
15847197             4
15978197           455
15984669            56
15985197           907
16066197           460

Query in Pyarrow ops took: 1.0179059505462646
Row  option_key  economical
0    15847197    4
1    15978197    455
2    15984669    56
3    15985197    907
4    16066197    460



Query in DataFusion took: 3.2192792892456055
Row  option_key  stock
0    26284326    0
1    25214207    -1
2    30372204    16
3    33163308    156
4    26880505    10
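
Note that each timing above covers file reading and query execution in a single measurement, and each query runs only once. A small helper like the following, which uses only the standard library, could time each stage separately and take the best of several runs. best_of is a hypothetical helper, and the lambda in the usage line is a placeholder for a real workload.

```python
import time
from typing import Callable, Tuple, TypeVar

T = TypeVar("T")


def best_of(fn: Callable[[], T], repeats: int = 3) -> Tuple[float, T]:
    """Run fn `repeats` times; return the fastest wall-clock time and the last result."""
    if repeats < 1:
        raise ValueError("repeats must be at least 1")
    best = float("inf")
    result = None
    for _ in range(repeats):
        start = time.perf_counter()
        result = fn()
        best = min(best, time.perf_counter() - start)
    return best, result


# Placeholder workload; in the benchmark above this would wrap a single stage,
# for example only the join and aggregation, with the tables already loaded.
elapsed, value = best_of(lambda: sum(range(1000)), repeats=5)
```

Timing the read step and the query step with separate calls would show whether the gap comes from loading the Parquet files or from the join and aggregation.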

Cannot build

Hey Jorge,

Great work. I can't build it though. When I clone the repo and run maturin develop, I get:

  Compiling indoc-impl v0.3.6
   Compiling ghost v0.1.2
   Compiling tokio-macros v0.2.6
   Compiling inventory-impl v0.1.10
   Compiling ctor v0.1.20
   Compiling pyo3cls v0.12.4
   Compiling indoc v0.3.6
   Compiling tokio v0.2.25
   Compiling futures-util v0.3.14
   Compiling thiserror v1.0.24
   Compiling flatbuffers v0.8.4
error[E0658]: const generics are unstable
  --> /Users/evan/.cargo/registry/src/github.com-1ecc6299db9ec823/flatbuffers-0.8.4/src/array.rs:28:35
   |
28 | pub struct Array<'a, T: 'a, const N: usize>(&'a [u8], PhantomData<T>);
   |                                   ^
   |
   = note: see issue #74878 <https://github.com/rust-lang/rust/issues/74878> for more information
   = help: add `#![feature(min_const_generics)]` to the crate attributes to enable


error[E0658]: const generics are unstable
  --> /Users/evan/.cargo/registry/src/github.com-1ecc6299db9ec823/flatbuffers-0.8.4/src/array.rs:30:23
   |
30 | impl<'a, T: 'a, const N: usize> Debug for Array<'a, T, N>
   |                       ^
   |
   = note: see issue #74878 <https://github.com/rust-lang/rust/issues/74878> for more information
   = help: add `#![feature(min_const_generics)]` to the crate attributes to enable


error[E0658]: const generics are unstable
  --> /Users/evan/.cargo/registry/src/github.com-1ecc6299db9ec823/flatbuffers-0.8.4/src/array.rs:40:23
   |
40 | impl<'a, T: 'a, const N: usize> Array<'a, T, N> {
   |                       ^
   |
   = note: see issue #74878 <https://github.com/rust-lang/rust/issues/74878> for more information
   = help: add `#![feature(min_const_generics)]` to the crate attributes to enable


error[E0658]: const generics are unstable
  --> /Users/evan/.cargo/registry/src/github.com-1ecc6299db9ec823/flatbuffers-0.8.4/src/array.rs:57:36
   |
57 | impl<'a, T: Follow<'a> + 'a, const N: usize> Array<'a, T, N> {
   |                                    ^
   |
   = note: see issue #74878 <https://github.com/rust-lang/rust/issues/74878> for more information
   = help: add `#![feature(min_const_generics)]` to the crate attributes to enable

It seems maybe this has to do with using an older nightly. Maybe try a newer one?
(NOTE: I tried a newer one but then ran into linker problems. I'm using OSX 10.15.7)

link to github repo in pypi

It took me a while to find the GitHub repo ;) It would be nice to link it from PyPI to help other users find the source code and contribute. I think you can just set the project-url key in the maturin metadata config to achieve that.
