
dask-sql's Introduction


SQL + Python

dask-sql is a distributed SQL query engine in Python. It allows you to query and transform your data using a mixture of common SQL operations and Python code, and also to scale up the computation easily if you need it.

  • Combine the power of Python and SQL: load your data with Python, transform it with SQL, enhance it with Python and query it with SQL - or the other way round. With dask-sql you can mix the well-known Python dataframe API of pandas and Dask with common SQL operations, to process your data in exactly the way that is easiest for you.
  • Infinite Scaling: using the power of the great Dask ecosystem, your computations can scale as you need - from your laptop to your super cluster - without changing a single line of SQL code. From k8s to cloud deployments, from batch systems to YARN - if Dask supports it, so will dask-sql.
  • Your data - your queries: use Python user-defined functions (UDFs) in SQL without any performance penalty and extend your SQL queries with the large ecosystem of Python libraries, e.g. for machine learning, complicated input formats or complex statistics.
  • Easy to install and maintain: dask-sql is just a pip/conda install away (or a docker run if you prefer).
  • Use SQL from wherever you like: dask-sql integrates with your jupyter notebook, your normal Python module or can be used as a standalone SQL server from any BI tool. It even integrates natively with Apache Hue.
  • GPU Support: dask-sql supports running SQL queries on CUDA-enabled GPUs by utilizing RAPIDS libraries like cuDF, enabling accelerated compute for SQL.

Read more in the documentation.

dask-sql GIF

Example

For this example, we use some data loaded from disk and query it with a SQL command from our Python code. Any pandas or Dask dataframe can be used as input, and dask-sql understands a large number of formats (csv, parquet, json, ...) and locations (s3, hdfs, gcs, ...).

import dask.dataframe as dd
from dask_sql import Context

# Create a context to hold the registered tables
c = Context()

# Load the data and register it in the context
# This will give the table a name, that we can use in queries
df = dd.read_csv("...")
c.create_table("my_data", df)

# Now execute a SQL query. With return_futures=False, the result is computed
# directly; leave it out to get a lazy dask dataframe instead.
result = c.sql("""
    SELECT
        my_data.name,
        SUM(my_data.x)
    FROM
        my_data
    GROUP BY
        my_data.name
""", return_futures=False)

# Show the result
print(result)
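The feature list above also mentions Python user-defined functions (UDFs) in SQL. A minimal sketch of how such a function could be registered (following the Context.register_function API described in the documentation; the function itself and the column it is applied to are just examples):

import numpy as np

def my_function(x):
    return x ** 2

# Register the function together with its parameter types and return type
c.register_function(my_function, "my_function", [("x", np.float64)], np.float64)

# Use it like any other SQL function
result = c.sql("SELECT my_function(my_data.x) FROM my_data")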

Quickstart

Have a look at the documentation or launch the example notebook on Binder.

dask-sql is currently under development and does not yet understand all SQL commands (but a large fraction of them). We are actively looking for feedback, improvements and contributors!

Installation

dask-sql can be installed via conda (preferred) or pip - or in a development environment.

With conda

Create a new conda environment or use your already present environment:

conda create -n dask-sql
conda activate dask-sql

Install the package from the conda-forge channel:

conda install dask-sql -c conda-forge

With pip

You can install the package with

pip install dask-sql

For development

If you want to have the newest (unreleased) dask-sql version or if you plan to do development on dask-sql, you can also install the package from source.

git clone https://github.com/dask-contrib/dask-sql.git

Create a new conda environment and install the development dependencies:

conda env create -f continuous_integration/environment-3.9.yaml

It is not recommended to use pip instead of conda for the environment setup.

After that, you can install the package in development mode

pip install -e ".[dev]"

The Rust DataFusion bindings are built as part of the pip install. Note that if changes are made to the Rust source in src/, another build must be run to recompile the bindings.

This repository uses pre-commit hooks. To install them, call

pre-commit install

Testing

You can run the tests (after installation) with

pytest tests

GPU-specific tests require additional dependencies specified in continuous_integration/gpuci/environment.yaml. These can be added to the development environment by running

conda env update -n dask-sql -f continuous_integration/gpuci/environment.yaml

And GPU-specific tests can be run with

pytest tests -m gpu --rungpu

SQL Server

dask-sql comes with a small test implementation of a SQL server. Instead of implementing a full ODBC driver, we re-use the presto wire protocol. It is - so far - only the start of development and is missing important concepts, such as authentication.

You can test the SQL presto server by running (after installation)

dask-sql-server

or by using the created docker image

docker run --rm -it -p 8080:8080 nbraun/dask-sql

in one terminal. This will spin up a server on port 8080 (by default) that looks like a normal presto database to any presto client.

You can test this for example with the default presto client:

presto --server localhost:8080

Now you can fire simple SQL queries (as no data is loaded by default):

=> SELECT 1 + 1;
 EXPR$0
--------
    2
(1 row)

You can find more information in the documentation.
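Because the server speaks the presto wire protocol, presto client libraries can also be used from Python. A minimal sketch using the third-party presto-python-client package (an assumption - it is not shipped with dask-sql, and the catalog/schema values below are placeholders):

import prestodb

conn = prestodb.dbapi.connect(
    host="localhost",
    port=8080,
    user="anybody",      # the test server does not implement authentication yet
    catalog="catalog",   # placeholder values
    schema="schema",
)
cur = conn.cursor()
cur.execute("SELECT 1 + 1")
print(cur.fetchall())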

CLI

You can also run the CLI dask-sql for testing out SQL commands quickly:

dask-sql --load-test-data --startup

(dask-sql) > SELECT * FROM timeseries LIMIT 10;

How does it work?

At the core, dask-sql does two things:

  • translate the SQL query using DataFusion into a relational algebra, which is represented as a logical query plan - similar to many other SQL engines (Hive, Flink, ...)
  • convert this description of the query into dask API calls (and execute them) - returning a dask dataframe.

For the first step, Arrow DataFusion needs to know about the columns and types of the dask dataframes; therefore, some Rust code to store this information for dask dataframes is defined in dask_planner. After the translation to a relational algebra is done (using DaskSQLContext.logical_relational_algebra), the Python methods defined in dask_sql.physical turn this into a physical dask execution plan by converting each piece of the relational algebra one-by-one.
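To inspect this intermediate representation for a given query, the relational plan can be printed. A minimal sketch (assuming the Context.explain helper described in the documentation):

import dask.datasets
from dask_sql import Context

c = Context()
c.create_table("timeseries", dask.datasets.timeseries())

# Print the logical query plan (the relational algebra) instead of executing it
print(c.explain("SELECT name, SUM(x) FROM timeseries GROUP BY name"))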

dask-sql's People

Contributors

andygrove, ayushdg, brandon-b-miller, charlesbluca, chrisjar, dacet, dependabot[bot], filabrazilska, github-actions[bot], goodwanghan, jakirkham, jdye64, ksonj, nickvazz, nils-braun, peterlappo, quasiben, rajagurunath, randerzander, rjzamora, sarahyurick, scharlottej13, vibhujawa, wence-, yuhuishishishi


dask-sql's Issues

Add a packaged version of dask-sql

Currently, dask-sql can only be installed from source. We should find out whether uploading the packaged jar (contained in a wheel) together with the Python code makes sense, and if and how we can create a conda package (probably via conda-forge).

benchmarks?

I know they are mostly useless, but they still might add some value. Have any been done?

Check optimization rules

The rules currently implemented in the RelationalAlgebraGenerator Java class are a mixture of the standard rules and the ones from the blazingSQL project.
So far, they have worked quite well. However, it might be good to check whether we have the best set of rules for optimization in place.

Make databricks/hive/... interaction possible via SQL

Currently, many data sources can be accessed via the usual context.create_table methods, but not as many via the CREATE TABLE SQL call, as there is e.g. no possibility to create a sqlalchemy cursor via SQL commands.
That should be changed: e.g. when the user passes a known sqlalchemy connection string to the CREATE TABLE call, we could directly use it to create the table.
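A purely hypothetical sketch of what that could look like, reusing the existing CREATE TABLE ... WITH (...) syntax (none of this is implemented yet; the format value below is made up for illustration):

c.sql("""
    CREATE TABLE auth_user WITH (
        location = 'postgresql://username:password@localhost:5432/dbname',
        format = 'sqlalchemy'  -- hypothetical: treat the location as a connection string
    )
""")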

Show better error messages

When there is a SQL validation error, the error is shown on the screen. However, we always get a nested Java and Python exception, which might lead to confusion.
The blazingSQL project created their own SQL exception in Java for this - we might also get along with just catching and parsing the Java exception in Python and printing out more useful information to the user in the Context.sql function.

Question: How to add a table from a SQL database like PostgreSQL so that live data can be queried

From the examples in the docs, right now dask-sql only supports:

  • dataframes from pandas & dask (data loaded only once, at registration time)
  • the Hive metastore (reads cloud storage & HDFS file metadata and can pick up live data)
  • cloud storage (can pick up live data)

Is there any possibility right now to create a table from a SQL database so that queries can access live data? (A possible workaround is sketched below.)
Maybe I misread the docs.

Ty
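Until such a feature exists, a possible workaround (a sketch based on the reproduction steps of a later issue below; the connection string and column names are placeholders) is to load the table with dask and register the resulting dataframe:

import dask.dataframe as dd
from dask_sql import Context

c = Context()

# Load the table with dask's SQL reader and register it in the context
user_df = dd.read_sql_table(table="auth_user", uri="postgresql://username:password@localhost:5432/dbname", index_col="id")
c.create_table("auth_user", user_df)

print(c.sql("SELECT COUNT(*) FROM auth_user", return_futures=False))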

Complex join fails with memory error

From @timhdesilva

So I have a large dataset (50GB) that needs to be merged with a small dataset that is a Pandas dataframe. Prior to the merge, I need to perform a groupby operation on the large dataset. Using Dask, I have been able to perform the groupby operation on the large dataset (which is a Dask dataframe). When I then merge the two datasets using X.merge(Y), I have no issues. The problem is that I need to perform a merge that is not exact (i.e. one column between two others), which is why I'm turning to dask-sql. When I try to do the merge with dask-sql though, I get a memory error (the number of observations should only be ~ 10x that of the exact merge, so memory shouldn't be a problem).

Any ideas here? I'm thinking somehow the issue might be that I am performing a groupby operation on the Dask dataframe prior to the dask-sql merge. Is this allowed - i.e. can one do a groupby and not execute it prior to using the dask-sql create_table() command and then performing a dask-sql merge with c.sql?
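For reference, a hedged sketch of how such a non-exact join could be expressed in dask-sql (the dataframe, table and column names here are hypothetical):

from dask_sql import Context

c = Context()
c.create_table("big", grouped_ddf)    # the large dask dataframe after the groupby
c.create_table("small", small_pdf)    # the small pandas dataframe

# Non-exact join: one column of "big" has to lie between two columns of "small"
result = c.sql("""
    SELECT *
    FROM big
    JOIN small
      ON big.value BETWEEN small.lower_bound AND small.upper_bound
""")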

Use the correct java path also directly after installation

When installing dask-sql via conda, Java is also installed as a dependency. This sets the $JAVA_HOME environment variable to the correct path, so that Java applications will pick up the correct Java installation.
However, it can (of course) only do this for newly spawned processes.
If you install dask-sql in e.g. a jupyter notebook (via "! conda install dask-sql"), it won't work.

In this case, JAVA_HOME needs to be set explicitly to $CONDA_PREFIX.
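A minimal sketch of that explicit setup inside a notebook (assuming a conda environment; JAVA_HOME has to be set before dask_sql is imported):

import os

# The JAVA_HOME set during installation only applies to newly spawned processes,
# so point it at the conda environment of the current process explicitly
os.environ["JAVA_HOME"] = os.environ["CONDA_PREFIX"]

from dask_sql import Context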

TypeError: sequence item 0: expected str instance, NoneType found on running python setup.py java on source

$ git clone https://github.com/nils-braun/dask-sql.git

$ cd dask-sql

$ pytest tests
ERROR: usage: pytest [options] [file_or_dir] [file_or_dir] [...]
pytest: error: unrecognized arguments: --cov --cov-config=.coveragerc tests
  inifile: /mnt/d/Programs/dask/dask-sql/pytest.ini
  rootdir: /mnt/d/Programs/dask/dask-sql


$ python setup.py java
running java
Traceback (most recent call last):
  File "setup.py", line 93, in <module>
    command_options={"build_sphinx": {"source_dir": ("setup.py", "docs"),}},
  File "/home/saulo/anaconda3/lib/python3.7/site-packages/setuptools/__init__.py", line 165, in setup
    return distutils.core.setup(**attrs)
  File "/home/saulo/anaconda3/lib/python3.7/distutils/core.py", line 148, in setup
    dist.run_commands()
  File "/home/saulo/anaconda3/lib/python3.7/distutils/dist.py", line 966, in run_commands
    self.run_command(cmd)
  File "/home/saulo/anaconda3/lib/python3.7/distutils/dist.py", line 985, in run_command
    cmd_obj.run()
  File "setup.py", line 30, in run
    self.announce(f"Running command: {' '.join(command)}", level=distutils.log.INFO)
TypeError: sequence item 0: expected str instance, NoneType found

$ python dask-sql-test.py
Traceback (most recent call last):
  File "dask-sql-test.py", line 1, in <module>
    from dask_sql import Context
  File "/mnt/d/Programs/dask/dask-sql/dask_sql/__init__.py", line 1, in <module>
    from .context import Context
  File "/mnt/d/Programs/dask/dask-sql/dask_sql/context.py", line 9, in <module>
    from dask_sql.java import (
  File "/mnt/d/Programs/dask/dask-sql/dask_sql/java.py", line 88, in <module>
    DaskTable = com.dask.sql.schema.DaskTable
AttributeError: Java package 'com' has no attribute 'dask'
$ python -V
Python 3.7.6
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:        20.04
Codename:       focal
$ java -version
openjdk version "14.0.2" 2020-07-14
OpenJDK Runtime Environment (build 14.0.2+12-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 14.0.2+12-Ubuntu-120.04, mixed mode, sharing)

ModuleNotFoundError: No module named 'importlib.metadata'

ModuleNotFoundError: No module named 'importlib.metadata'

$ python -V
Python 3.7.6
$ pip install --upgrade -r requirements.txt
Requirement already up-to-date: dask in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 1)) (2021.1.1)
Requirement already up-to-date: dask-sql in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 3)) (0.3.0)
Requirement already up-to-date: distributed in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 4)) (2021.1.1)
Requirement already up-to-date: importlib-metadata in ./.venv/lib/python3.7/site-packages (from -r requirements.txt (line 5)) (3.4.0)
Requirement already satisfied, skipping upgrade: pyyaml in ./.venv/lib/python3.7/site-packages (from dask->-r requirements.txt (line 1)) (5.4.1)
Requirement already satisfied, skipping upgrade: jpype1>=1.0.2 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (1.2.1)
Requirement already satisfied, skipping upgrade: tzlocal>=2.1 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (2.1)
Requirement already satisfied, skipping upgrade: fastapi>=0.61.1 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (0.63.0)
Requirement already satisfied, skipping upgrade: pygments in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (2.7.4)
Requirement already satisfied, skipping upgrade: uvicorn>=0.11.3 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (0.13.3)
Requirement already satisfied, skipping upgrade: pandas<1.2.0 in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (1.1.5)
Requirement already satisfied, skipping upgrade: prompt-toolkit in ./.venv/lib/python3.7/site-packages (from dask-sql->-r requirements.txt (line 3)) (3.0.14)
Requirement already satisfied, skipping upgrade: zict>=0.1.3 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (2.0.0)
Requirement already satisfied, skipping upgrade: psutil>=5.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (5.8.0)
Requirement already satisfied, skipping upgrade: click>=6.6 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (7.1.2)
Requirement already satisfied, skipping upgrade: sortedcontainers!=2.0.0,!=2.0.1 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (2.3.0)
Requirement already satisfied, skipping upgrade: setuptools in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (41.2.0)
Requirement already satisfied, skipping upgrade: tornado>=5; python_version < "3.8" in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (6.1)
Requirement already satisfied, skipping upgrade: toolz>=0.8.2 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (0.11.1)
Requirement already satisfied, skipping upgrade: msgpack>=0.6.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (1.0.2)
Requirement already satisfied, skipping upgrade: cloudpickle>=1.5.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (1.6.0)
Requirement already satisfied, skipping upgrade: tblib>=1.6.0 in ./.venv/lib/python3.7/site-packages (from distributed->-r requirements.txt (line 4)) (1.7.0)
Requirement already satisfied, skipping upgrade: typing-extensions>=3.6.4; python_version < "3.8" in ./.venv/lib/python3.7/site-packages (from importlib-metadata->-r requirements.txt (line 5)) (3.7.4.3)
Requirement already satisfied, skipping upgrade: zipp>=0.5 in ./.venv/lib/python3.7/site-packages (from importlib-metadata->-r requirements.txt (line 5)) (3.4.0)
Requirement already satisfied, skipping upgrade: pytz in ./.venv/lib/python3.7/site-packages (from tzlocal>=2.1->dask-sql->-r requirements.txt (line 3)) (2020.5)
Requirement already satisfied, skipping upgrade: pydantic<2.0.0,>=1.0.0 in ./.venv/lib/python3.7/site-packages (from fastapi>=0.61.1->dask-sql->-r requirements.txt (line 3)) (1.7.3)
Requirement already satisfied, skipping upgrade: starlette==0.13.6 in ./.venv/lib/python3.7/site-packages (from fastapi>=0.61.1->dask-sql->-r requirements.txt (line 3)) (0.13.6)
Requirement already satisfied, skipping upgrade: h11>=0.8 in ./.venv/lib/python3.7/site-packages (from uvicorn>=0.11.3->dask-sql->-r requirements.txt (line 3)) (0.12.0)
Requirement already satisfied, skipping upgrade: numpy>=1.15.4 in ./.venv/lib/python3.7/site-packages (from pandas<1.2.0->dask-sql->-r requirements.txt (line 3)) (1.19.5)
Requirement already satisfied, skipping upgrade: python-dateutil>=2.7.3 in ./.venv/lib/python3.7/site-packages (from pandas<1.2.0->dask-sql->-r requirements.txt (line 3)) (2.8.1)
Requirement already satisfied, skipping upgrade: wcwidth in ./.venv/lib/python3.7/site-packages (from prompt-toolkit->dask-sql->-r requirements.txt (line 3)) (0.2.5)
Requirement already satisfied, skipping upgrade: heapdict in ./.venv/lib/python3.7/site-packages (from zict>=0.1.3->distributed->-r requirements.txt (line 4)) (1.0.1)
Requirement already satisfied, skipping upgrade: six>=1.5 in ./.venv/lib/python3.7/site-packages (from python-dateutil>=2.7.3->pandas<1.2.0->dask-sql->-r requirements.txt (line 3)) (1.15.0)
$ java --version
openjdk 14.0.2 2020-07-14
OpenJDK Runtime Environment (build 14.0.2+12-Ubuntu-120.04)
OpenJDK 64-Bit Server VM (build 14.0.2+12-Ubuntu-120.04, mixed mode, sharing)
$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 20.04.1 LTS
Release:        20.04
Codename:       focal

Error: Unable to instantiate java compiler

Hi! @nils-braun,

As you already know, I mistakenly opened this issue on the Dask-Docker repo and you were kindly alerted by @jrbourbeau

I will copy/paste my original post here as well as your initial answer (Thank you for your quick reply)

Here is my original post:

####################################################################

What happened:

After installing Java and dask-sql using pip, whenever I try to run a SQL query from my python code I get the following error:

...
File "/home/vquery/.local/lib/python3.8/site-packages/dask_sql/context.py", line 378, in sql
    rel, select_names, _ = self._get_ral(sql)
  File "/home/vquery/.local/lib/python3.8/site-packages/dask_sql/context.py", line 515, in _get_ral
    nonOptimizedRelNode = generator.getRelationalAlgebra(validatedSqlNode)
java.lang.java.lang.IllegalStateException: java.lang.IllegalStateException: Unable to instantiate java compiler
...
...
File "JaninoRelMetadataProvider.java", line 426, in org.apache.calcite.rel.metadata.JaninoRelMetadataProvider.compile
  File "CompilerFactoryFactory.java", line 61, in org.codehaus.commons.compiler.CompilerFactoryFactory.getDefaultCompilerFactory
java.lang.java.lang.NullPointerException: java.lang.NullPointerException

What you expected to happen:

I should get a dataframe as a result.

Minimal Complete Verifiable Example:

# The cluster/client setup is done first, in another module not the one executing the SQL query
# Also tried other cluster/scheduler types with the same error
from dask.distributed import Client, LocalCluster
cluster = LocalCluster(
    n_workers=4,
    threads_per_worker=1,
    processes=False,
    dashboard_address=':8787',
    asynchronous=False,
    memory_limit='1GB'
    )
client = Client(cluster)

# The SQL code is executed in its own module
import dask.dataframe as dd
from dask_sql import Context

c = Context()
df = dd.read_parquet('/vQuery/files/results/US_Accidents_June20.parquet') 
c.register_dask_table(df, 'df')
df = c.sql("""select ID, Source from df""") # This line fails with the error reported

Anything else we need to know?:

As mentioned in the code snippet above, due to the way my application is designed, the Dask client/cluster setup is done before dask-sql context is created.

Environment:

  • Dask version:
    • dask: 2020.12.0
    • dask-sql: 0.3.1
  • Python version:
    • Python 3.8.5
  • Operating System:
    • Ubuntu 20.04.1 LTS
  • Install method (conda, pip, source):
    • pip
  • Application Framework
    • Jupyter Notebook/Ipywidgets & Voila Server

Install steps

$ sudo apt install default-jre

$ sudo apt install default-jdk

$ java -version
openjdk version "11.0.10" 2021-01-19
OpenJDK Runtime Environment (build 11.0.10+9-Ubuntu-0ubuntu1.20.04)
OpenJDK 64-Bit Server VM (build 11.0.10+9-Ubuntu-0ubuntu1.20.04, mixed mode, sharing)

$ javac -version
javac 11.0.10

$ echo $JAVA_HOME
/usr/lib/jvm/java-11-openjdk-amd64

$ pip install dask-sql

$ pip list | grep dask-sql
dask-sql               0.3.1

Unify the API of dask-sql and blazing-sql

After a very nice discussion with @felipeblazing (thanks again for reaching out!), we decided to unify the APIs of dask-sql and blazing-sql, so that users can switch between the different packages more easily (and therefore switch between "normal" CPU dask and GPU-enabled rapids). As blazing has been around much longer already, I think it would make sense to use blazing's API as the target for dask-sql.

The entrypoint to all applications with dask-sql or blazing is to instantiate a context and then call methods on it.
blazing currently has the following methods on the context:

File System:
	.s3
	.localfs
	.hdfs
	.gs
	.show_filesystems

SQL:
	.explain
	.create_table
	.drop_table
	.sql

The SQL methods can be implemented quite quickly as they are basically already present (just with different names).
The FS methods are very interesting to have (see #28), so they definitely should come next.
There is another method, .log, which however only makes sense in the blazing context and is very hard to port.

Apart from the context API, there is of course also the very important question if the same SQL queries are understood.
That should be handled next.

Unnamed Exprs in column names after aggregation

import dask
import dask_sql
context = dask_sql.Context()
df = dask.datasets.timeseries()
context.register_dask_table(df, "timeseries")
context.sql("SELECT sum(x), sum(y) from timeseries").compute()
        EXPR$0      EXPR$1
0 -1072.053022  762.750336
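As a workaround, the aggregations can be given explicit aliases with plain SQL AS, which dask-sql understands:

context.sql("SELECT sum(x) AS sum_x, sum(y) AS sum_y FROM timeseries").compute()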

Add binder badge

It would be useful to have a cluster available for someone who wants to try it quickly. I'll be happy to help here.

Implement IS NOT NULL

Reminder for later. Currently
SELECT * from $table WHERE col_b IS NOT NULL
results in:

try:
            operation = self.OPERATION_MAPPING[operator_name]
        except KeyError:
>           raise NotImplementedError(f"{operator_name} not (yet) implemented")
E           NotImplementedError: IS NOT NULL not (yet) implemented

Avoid scary tracebacks in malformed query

I mistakenly missed the quotes around a name in my query

import dask
import dask_sql
context = dask_sql.Context()
df = dask.datasets.timeseries()
context.register_dask_table(df, "timeseries")
context.sql("SELECT * from timeseries where name=Tim")

And I got a scary traceback

---------------------------------------------------------------------------
org.apache.calcite.sql.validate.SqlValidatorExceptionTraceback (most recent call last)
org.apache.calcite.sql.validate.SqlValidatorException: org.apache.calcite.sql.validate.SqlValidatorException: Column 'Tim' not found in any table

The above exception was the direct cause of the following exception:

org.apache.calcite.runtime.CalciteContextExceptionTraceback (most recent call last)
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getRelationalAlgebra()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getValidatedNode()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.prepare.PlannerImpl.validate()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validate()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlSelect.validate()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.AbstractNamespace.validate()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SelectNamespace.validateImpl()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.validateWhereClause()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.expand()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlCall.accept()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlScopedShuttle.visit()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlScopedShuttle.visit()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visitScoped()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlOperator.acceptCall()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.util.SqlShuttle$CallCopyingArgHandler.visitChild()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlIdentifier.accept()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.DelegatingScope.fullyQualify()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlUtil.newContextException()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.sql.SqlUtil.newContextException()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.runtime.Resources$ExInstWithCause.ex()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in java.lang.reflect.Constructor.newInstance()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in java.lang.reflect.Constructor.newInstanceWithCaller()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0()

org.apache.calcite.runtime.CalciteContextException: org.apache.calcite.runtime.CalciteContextException: From line 1, column 37 to line 1, column 39: Column 'Tim' not found in any table

The above exception was the direct cause of the following exception:

Exception                                 Traceback (most recent call last)
~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getRelationalAlgebra()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in com.dask.sql.application.RelationalAlgebraGenerator.getValidatedNode()

~/miniconda/lib/python3.7/site-packages/_jpype.cpython-37m-x86_64-linux-gnu.so in org.apache.calcite.prepare.PlannerImpl.validate()

Exception: Java Exception

The above exception was the direct cause of the following exception:

org.apache.calcite.tools.ValidationExceptionTraceback (most recent call last)
<ipython-input-8-8095ddfbf467> in <module>
----> 1 context.sql("SELECT * from timeseries where name=Tim")

~/miniconda/lib/python3.7/site-packages/dask_sql/context.py in sql(self, sql, debug)
     71         """
     72         # TODO: show a nice error message if something is broken
---> 73         rel = self._get_ral(sql, debug=debug)
     74         df = RelConverter.convert(rel, tables=self.tables)
     75         return df

~/miniconda/lib/python3.7/site-packages/dask_sql/context.py in _get_ral(self, sql, debug)
     94         generator = RelationalAlgebraGenerator(schema)
     95 
---> 96         rel = generator.getRelationalAlgebra(sql)
     97         if debug:  # pragma: no cover
     98             print(generator.getRelationalAlgebraString(rel))

org.apache.calcite.tools.ValidationException: org.apache.calcite.tools.ValidationException: org.apache.calcite.runtime.CalciteContextException: From line 1, column 37 to line 1, column 39: Column 'Tim' not found in any table

I wonder if we maybe want to capture these exceptions and reraise them from a fresh, and much shorter stack.

Connect to Apache Atlas

  • Can dask-sql connect to an Intake catalog server and use its own Authorization Plugins?

I would love it if these two projects could merge their great features to become even more powerful tools 🙂

Implement descending sorting

Currently, dask-sql can only handle ascending sorting, as the set_index function can not handle anything else.
This might be possible to overcome, e.g. by using a clever way of column redefinitions or by implementing a custom function (instead of set_index).

Make it possible to interact with external data sources

Currently, all data frames need to be registered before using them in dask-sql.
However, it could also be interesting to have dataframes directly from S3 (or any other storage, such as hdfs), from the Hive metastore or by creating temporary views. In the background, one could create dask dataframes and use the normal registration process.

For this, we first need to come up with a good SQL syntax (which is supported by calcite) and/or an API for this.

$JAVA_HOME env var set to $CONDA_PREFIX/Library

When doing from dask_sql import Context it brings up UserWarning: You are running in a conda environment, but the JAVA_PATH is not using it. If this is by mistake, set $JAVA_HOME to $CONDA_PREFIX.. which comes from https://github.com/nils-braun/dask-sql/blob/02e2dad28741e4da84b3a957c0eedee40e262431/dask_sql/utils.py#L31

However, on the command line I see

>>> import os
>>> os.environ["CONDA_PREFIX"]
'C:\\Users\\131416\\AppData\\Local\\Continuum\\anaconda3\\envs\\ds-rm-obr-dev'
>>> os.environ["JAVA_HOME"]
'C:\\Users\\131416\\AppData\\Local\\Continuum\\anaconda3\\envs\\ds-rm-obr-dev\\Library'
>>> from dask_sql import Context
C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\ds-rm-obr-dev\lib\site-packages\dask_sql\utils.py:30: UserWarning: You are running in a conda environment, but the JAVA_PATH is not using it. If this is by mistake, set $JAVA_HOME to $CONDA_PREFIX.
  warnings.warn(

Perhaps the check in https://github.com/nils-braun/dask-sql/blob/main/dask_sql/utils.py#L29 should be something like os.environ["JAVA_HOME"] != os.path.join(os.environ["CONDA_PREFIX"], 'Library')

Note: I'm also on Windows, hence the backslashes in the paths.

Fill the type mappings

Many SQL types and their corresponding Python types are still missing from the SQL-to-Python mappings in mappings.py.
Those can be added one-by-one.

Implement windowing functions

So far, aggregation can only work without windowing (without the OVER keyword).

Unfortunately, I still do not know how to implement the very broad windowing techniques from SQL in dask.

Implement all complex operations

These operations need complex types (e.g. collections).
Taken from here and ff.

Value Constructors

  • ROW
  • ( )
  • [ ]
  • ARRAY
  • MAP

Collection Operations

  • ELEMENT
  • CARDINALITY
  • MEMBER OF
  • IS A SET
  • IS NOT A SET
  • IS EMPTY
  • IS NOT EMPTY
  • SUBMULTISET OF
  • NOT SUBMULTISET OF
  • MULTISET UNION [ ALL | DISTINCT ]
  • MULTISET INTERSECT [ ALL | DISTINCT ]
  • MULTISET EXCEPT [ ALL | DISTINCT ]

Period Operations

  • CONTAINS
  • OVERLAPS
  • EQUALS
  • PRECEDES
  • IMMEDIATELY PRECEDES
  • SUCCEEDS
  • IMMEDIATELY SUCCEEDS

Improve SQL server

Implement more advanced (but required concepts):

  • authentication and authorization
  • install libraries to enable usage of external filesystems (s3, HTTP, etc.) already in the docker image
  • a helm chart for deployment (e.g. together with a prepared BI tool)
  • easier possibilities to connect to an already running dask cluster
  • a nice CLI tool

Improve hue autocompletion

Apache Hue has an integration for dask-sql, where the commands are autocompleted and highlighted. Currently, this is out of sync due to the recently added new SQL features (e.g. CREATE MODEL).

Implement all simple expression operators

The RexCallPlugin currently can only handle the most common expressions (binary operations, OR and AND). There are many more of them that could and should be implemented.

Improve ML implementation

The ML implementation is still a bit experimental - we can improve on this:

  • SHOW MODELS and DESCRIBE MODEL
  • Hyperparameter optimizations, AutoML-like behaviour
  • @romainr brought up the idea of exporting models (#191, still missing: onnx - see discussion in the PR by @rajagurunath)
  • and some more showcases and examples

Failed read mapping datetime with timezone in postgresql

Descriptions:

  • When I connect/register a table from PostgreSQL into the dask-sql catalog (which registers successfully) and then try to query that data, I get this error message (a possible workaround is sketched below the notes):
    NotImplementedError: The python type datetime64[ns, UTC] is not implemented (yet) from this function /dask_sql/mappings.py", line 78, in python_to_sql_type

Reproduction steps:

from dask_sql import Context
import dask.dataframe as ddf

c = Context()
user_df = ddf.read_sql_table(table="auth_user", schema="public", uri="postgresql://username:password@localhost:5432/dbname", index_col='id')
c.create_table("pg.public.auth_user", user_df)
c.sql("SELECT * FROM pg.public.auth_user")

Notes:
I'm using this packages:

  • psycopg2-binary 2.8.6
  • dask 2.30.0
  • dask-sql 0.2.2 (I think this is because the PyPI release version is lagging behind?)
  • distributed 2.30.0
  • sqlalchemy 1.3.22
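Until the mapping is implemented, a possible workaround (a sketch; the column name date_joined is just an example) is to drop the timezone information before registering the table:

# Convert timezone-aware columns to naive datetimes, which dask-sql can already map
user_df = user_df.assign(date_joined=user_df["date_joined"].dt.tz_localize(None))
c.create_table("pg.public.auth_user", user_df)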

Test the usage with BI tools

As the server speaks the presto protocol, which is understood by most BI tools, it would be nice to test and showcase it with some of them.
For example, I have done a very quick test with both Hue and Metabase, which looks promising - but has also shown some additional quirks in the server implementation.

Improve the SQL server

The SQL server is definitely just a hacky proof-of-concept so far.
It can be improved in multiple ways:

  • at least add some documentation and tests
  • handle multiple connections at the same time
  • error handling and sending the error back to the user
  • authentication
  • is a socketserver really the best library to use?
  • possibility to define dataframes from SQL (e.g. specifying to read from S3, hdfs etc)
  • dockerizing the server, to make it usable in a cluster setup (maybe together with an autoscaling dask cluster or dask gateway)
  • ...

Boolean Where Clause Behavior

Starting from a random timeseries table I played a bit with the where clause and got some unexpected results:

from dask_sql import Context
import dask.datasets

df = dask.datasets.timeseries()
c = Context()
c.create_table("timeseries", df)
In [26]: c.sql("SELECT *\nFROM timeseries AS q48\nWHERE True")
Out[26]:
Dask DataFrame Structure:
                   id    name        x        y
npartitions=30
2000-01-01      int64  object  float64  float64
2000-01-02        ...     ...      ...      ...
...               ...     ...      ...      ...
2000-01-30        ...     ...      ...      ...
2000-01-31        ...     ...      ...      ...
Dask Name: getitem, 210 tasks

However with False, (0 = 1), and (1 = 1) there are failures:

c.sql("SELECT *\nFROM timeseries AS q48\nWHERE (0 = 1)")

...
pandas/_libs/tslibs/timestamps.pyx in pandas._libs.tslibs.timestamps.Timestamp.__new__()
pandas/_libs/tslibs/conversion.pyx in pandas._libs.tslibs.conversion.convert_to_tsobject()
TypeError: Cannot convert input [False] of type <class 'numpy.bool_'> to Timestamp

The above is not totally unexpected -- a pandas dataframe cannot perform df[True]. However, with SQL I believe WHERE Boolean is valid. Is it reasonable to expect dask-sql to handle this cleanly even though dask/pandas do not?

Wildcards in WHERE clause raises error when column contains NULLS

When I run the following code I get a TypeError:

from dask_sql import Context
import pandas as pd

df = pd.DataFrame({'name':['ifooi','ibooi','tbooi',None], 
                   'number':[1,2,3,4]})

c = Context()
c.register_dask_table(df, 'table')

c.sql("""
SELECT * FROM "table" as s
WHERE s.name NOT LIKE '%foo%'
""")
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-186-e48fca4ecf21> in <module>
----> 1 c.sql("""
      2 SELECT * FROM "table" as s
      3 WHERE s.name NOT LIKE '%foo%'
      4 """)

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql)
    261         try:
    262             rel, select_names = self._get_ral(sql)
--> 263             dc = RelConverter.convert(rel, context=self)
    264         except (ValidationException, SqlParseException) as e:
    265             logger.debug(f"Original exception raised by Java:\n {e}")

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     23     ) -> DataContainer:
     24         # Get the input of the previous step
---> 25         (dc,) = self.assert_inputs(rel, 1, context)
     26 
     27         df = dc.df

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
     81         from dask_sql.physical.rel.convert import RelConverter
     82 
---> 83         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     84 
     85     @staticmethod

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
     81         from dask_sql.physical.rel.convert import RelConverter
     82 
---> 83         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     84 
     85     @staticmethod

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/filter.py in convert(self, rel, context)
     26         # we just need to apply it here
     27         condition = rel.getCondition()
---> 28         df_condition = RexConverter.convert(condition, dc, context=context)
     29         df = df[df_condition]
     30 

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/convert.py in convert(cls, rex, dc, context)
     60         )
     61 
---> 62         df = plugin_instance.convert(rex, dc, context=context)
     63         logger.debug(f"Processed REX {rex} into {LoggableDataFrame(df)}")
     64         return df

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in convert(self, rex, dc, context)
    460             f"Executing {operator_name} on {[str(LoggableDataFrame(df)) for df in operands]}"
    461         )
--> 462         return operation(*operands)
    463 
    464         # TODO: We have information on the typing here - we should use it

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in __call__(self, *operands)
     28     def __call__(self, *operands) -> Union[dd.Series, Any]:
     29         """Call the stored function"""
---> 30         return self.f(*operands)
     31 
     32     def of(self, op: "Operation") -> "Operation":

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rex/core/call.py in not_(self, df)
    164         """
    165         if is_frame(df):
--> 166             return ~df
    167         else:
    168             return not df  # pragma: no cover

~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/generic.py in __invert__(self)
   1471             return self
   1472 
-> 1473         new_data = self._data.apply(operator.invert)
   1474         result = self._constructor(new_data).__finalize__(self)
   1475         return result

~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/internals/managers.py in apply(self, f, filter, **kwargs)
    438 
    439             if callable(f):
--> 440                 applied = b.apply(f, **kwargs)
    441             else:
    442                 applied = getattr(b, f)(**kwargs)

~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/internals/blocks.py in apply(self, func, **kwargs)
    388         """
    389         with np.errstate(all="ignore"):
--> 390             result = func(self.values, **kwargs)
    391 
    392         if is_extension_array_dtype(result) and result.ndim > 1:

TypeError: bad operand type for unary ~: 'NoneType'

If I remove "NOT", then I get a ValueError:

from dask_sql import Context
import pandas as pd

df = pd.DataFrame({'name':['ifooi','ibooi','tbooi',None], 
                   'number':[1,2,3,4]})

c = Context()
c.register_dask_table(df, 'table')

c.sql("""
SELECT * FROM "table" as s
WHERE s.name LIKE '%foo%'
""")
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-167-19ac24a5e2b5> in <module>
      7 c.register_dask_table(df, 'table')
      8 
----> 9 c.sql("""
     10 SELECT * FROM "table" as s
     11 WHERE s.name LIKE '%foo%'

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/context.py in sql(self, sql)
    261         try:
    262             rel, select_names = self._get_ral(sql)
--> 263             dc = RelConverter.convert(rel, context=self)
    264         except (ValidationException, SqlParseException) as e:
    265             logger.debug(f"Original exception raised by Java:\n {e}")

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/project.py in convert(self, rel, context)
     23     ) -> DataContainer:
     24         # Get the input of the previous step
---> 25         (dc,) = self.assert_inputs(rel, 1, context)
     26 
     27         df = dc.df

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in assert_inputs(rel, n, context)
     81         from dask_sql.physical.rel.convert import RelConverter
     82 
---> 83         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     84 
     85     @staticmethod

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/base.py in <listcomp>(.0)
     81         from dask_sql.physical.rel.convert import RelConverter
     82 
---> 83         return [RelConverter.convert(input_rel, context) for input_rel in input_rels]
     84 
     85     @staticmethod

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/convert.py in convert(cls, rel, context)
     54             f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
     55         )
---> 56         df = plugin_instance.convert(rel, context=context)
     57         logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
     58         return df

~/anaconda3/envs/py38/lib/python3.8/site-packages/dask_sql/physical/rel/logical/filter.py in convert(self, rel, context)
     27         condition = rel.getCondition()
     28         df_condition = RexConverter.convert(condition, dc, context=context)
---> 29         df = df[df_condition]
     30 
     31         cc = self.fix_column_to_row_type(cc, rel.getRowType())

~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/frame.py in __getitem__(self, key)
   2788 
   2789         # Do we have a (boolean) 1d indexer?
-> 2790         if com.is_bool_indexer(key):
   2791             return self._getitem_bool_array(key)
   2792 

~/anaconda3/envs/py38/lib/python3.8/site-packages/pandas/core/common.py in is_bool_indexer(key)
    134                 na_msg = "Cannot mask with non-boolean array containing NA / NaN values"
    135                 if isna(key).any():
--> 136                     raise ValueError(na_msg)
    137                 return False
    138             return True

ValueError: Cannot mask with non-boolean array containing NA / NaN values

Adding the clause 'OR s.name IS NULL' resolves the issue only in the second example.
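One possible user-side workaround (a sketch, assuming it is acceptable to replace the missing names by an empty string before registering the table):

# Replace the NULLs that break the boolean mask produced by LIKE / NOT LIKE
df = df.fillna({'name': ''})
c.register_dask_table(df, 'table')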

Check/Assign the type in fix_column_to_row_type and check_columns_from_row_type

The RelDataType also contains all the information on the types of the columns. Currently, we rely on dask to create the correct type - but we won't catch subtle differences such as int32 vs. int64 with that.
Therefore, a type conversion for each column should be implemented (and checked) in the fix_column_to_row_type and check_columns_from_row_type functions.

Implement more plugins

So far, basically all standard SELECT statements can be handled, because most of the RelNodes and RexNodes that Apache Calcite produces are already covered. However, there exist more of them, and more advanced use cases might trigger them (leading to a NotImplementedError so far).

I would suggest first finding SQL use cases where those other classes are triggered, and then using them as test cases for the implementation.

Here is the list of Java classes in the Apache Calcite project under rel/logical and whether they still need to be implemented:

  • LogicalAggregate
  • LogicalCalc -> a combination of project and filter, which is not in use as the corresponding optimization is not included (and also not needed)
  • LogicalCorrelate
  • LogicalExchange
  • LogicalFilter
  • LogicalIntersect
  • LogicalJoin
  • LogicalMatch
  • LogicalMinus
  • LogicalProject
  • LogicalRepeatUnion
  • LogicalSnapshot
  • LogicalSortExchange -> not needed with the current optimizations
  • LogicalSort
  • LogicalTableFunctionScan
  • LogicalTableModify -> not needed, as we do not want to support table edits
  • LogicalTableScan
  • LogicalTableSpool -> not needed, as we do not want to support table edits
  • LogicalUnion
  • LogicalValues
  • LogicalWindow

Add extended SQL documentation

With the most recent additions, the SQL statements that dask-sql understands have grown. It might make sense to add a page for every SQL statement instead of having them all on a single documentation page.

Implement <>

Currently
SELECT * from $table WHERE NOT(col_b = 123.0)
results in

    def convert(
        self, rex: "org.apache.calcite.rex.RexNode", df: dd.DataFrame
    ) -> Union[dd.Series, Any]:
        # Prepare the operands by turning the RexNodes into python expressions
        operands = [RexConverter.convert(o, df) for o in rex.getOperands()]
    
        # Now use the operator name in the mapping
        operator_name = str(rex.getOperator().getName())
    
        try:
>           operation = self.OPERATION_MAPPING[operator_name]
E           KeyError: '<>'

Implement all (non-window) aggregation functions

The LogicalAggregatePlugin currently can not handle all aggregation functions.

From the Calcite documentation, here are all aggregation functions that Calcite understands:

  • ANY_VALUE
  • APPROX_COUNT_DISTINCT
  • AVG
  • BIT_AND
  • BIT_OR
  • BIT_XOR
  • COUNT
  • COVAR_POP (implemented via REGR_COUNT)
  • COVAR_SAMP (implemented via REGR_COUNT)
  • EVERY
  • MAX
  • MIN
  • REGR_COUNT
  • REGR_SXX
  • REGR_SYY
  • SINGLE_VALUE
  • STDDEV
  • STDDEV_POP
  • STDDEV_SAMP
  • SUM
  • VAR_POP
  • VAR_SAMP
  • VARIANCE

Operations on collection types:

  • COLLECT
  • FUSION
  • INTERSECTION
  • LISTAGG

Unable to select a column without quotation from a table

Trying to select a column from a table results in a ParsingException.

Select * works.

>>> c.sql("""SELECT * from my_data""")
Dask DataFrame Structure:
              UUID_SAILING a b
npartitions=1
                    object            float64              float64
                       ...                ...                  ...
Dask Name: getitem, 6 tasks

But when trying to select a column (UUID_SAILING) I get

>>> c.sql("""SELECT UUID_SAILING from my_data""")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\ds-rm-obr-dev\lib\site-packages\dask_sql\context.py", line 346, in sql
    rel, select_names, _ = self._get_ral(sql)
  File "C:\Users\131416\AppData\Local\Continuum\anaconda3\envs\ds-rm-obr-dev\lib\site-packages\dask_sql\context.py", line 466, in _get_ral
    raise ParsingException(sql, str(e.message())) from None
dask_sql.utils.ParsingException: Can not parse the given SQL: org.apache.calcite.runtime.CalciteContextException: From line 1, column 8 to line 1, column 19: Column 'uuid_sailing' not found in any table; did you mean 'UUID_SAILING'?

The problem is probably somewhere here:

        SELECT UUID_SAILING from my_data
               ^^^^^^^^^^^^

I believe what is happening is that it expects quotation marks around the field name:

>>> c.sql("""SELECT * from my_data""").columns
Index(['UUID_SAILING', 'a', 'b'], dtype='object')
>>> c.sql("""SELECT 'UUID_SAILING' from my_data""")
Dask DataFrame Structure:
              'UUID_SAILING'
npartitions=1
                      object
                         ...
Dask Name: getitem, 5 tasks

Note: I'm on Windows and the table was created using
c.create_table("my_data", remote_folder + "databricks_out_tmp/file.parquet", storage_options=storage_options)

Also, this isn't reproducible using the timeseries dataset as a dataframe:

>>> from dask.datasets import timeseries
>>> df = timeseries()
>>> c.create_table("timeseries", df)
>>> c.sql("""SELECT * from timeseries""").columns
Index(['id', 'name', 'x', 'y'], dtype='object')
>>> c.sql("""SELECT * from timeseries""")
Dask DataFrame Structure:
                   id    name        x        y
npartitions=30
2000-01-01      int32  object  float64  float64
2000-01-02        ...     ...      ...      ...
...               ...     ...      ...      ...
2000-01-30        ...     ...      ...      ...
2000-01-31        ...     ...      ...      ...
Dask Name: getitem, 210 tasks
>>> c.sql("""SELECT id from timeseries""")
Dask DataFrame Structure:
                   id
npartitions=30
2000-01-01      int32
2000-01-02        ...
...               ...
2000-01-30        ...
2000-01-31        ...
Dask Name: getitem, 120 tasks
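A possible workaround (assuming the usual Calcite behaviour that double-quoted identifiers keep their case, while single quotes create a string literal as shown above) is to quote the column name with double quotes:

>>> c.sql('''SELECT "UUID_SAILING" from my_data''')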
