
hustle's People

Contributors

dinedal, ncloudioj, oldmantaiter, pombredanne, pooya, tspurway


hustle's Issues

Auto Purge Query's Result

This feature would clean up a query's result data once it is no longer needed, similar to the 'purge' option for Disco jobs. It would keep your Disco cluster lean and neat.

Simplified 'join' clause

Here is a typical join in Hustle:

select(t1.x, t2.y, where=(t1, t2), join=(t1.jcol, t2.jcol))

This could be more simply represented as:

select(t1.x, t2.y, where=(t1, t2), join='jcol')

if the name of the Column is the same in both Tables (which seems to happen often). We will keep the first form for the case where you are joining on Columns that have different names in each Table.

Exception: Error opening environment: No such file or directory

I have a problem with the db path when running the version from git:

cd /home/mike/github/hustle
DISCO_ROOT=/home/mike/github/disco bin/hustle
>>> pixels = Table.create('pixels',
      columns=['index string token', 'index uint8 isActive', 'index site_id', 'uint32 amount',
               'index int32 account_id', 'index city', 'index trie16 state', 'index int16 metro',
               'string ip', 'lz4 keyword', 'index string date'],
      partition='date',
      force=True)
>>> insert(pixels, streams=[[{'token': 'a'}]], decoder=lambda d: d)
>>> select(pixels.token, where=pixels)

fails

>>> select(pixels.token, where=pixels)
Traceback (most recent call last):
  File "<console>", line 1, in <module>
  File "/home/mike/github/hustle/hustle/__init__.py", line 586, in select
    blobs = job.wait()
  File "/home/mike/.local/lib/python2.7/site-packages/disco/core.py", line 369, in wait
    timeout, poll_interval * 1000)
  File "/home/mike/.local/lib/python2.7/site-packages/disco/core.py", line 329, in check_results
    raise JobError(Job(name=jobname, master=self), "Status {0}".format(status))
JobError: Job select_from_pixels@59f:b7b65:10962 failed: Status dead

with the following logs on the master:

22:35:24.928 [info] Job select_from_pixels@59f:b7ad7:dcb05 failed: <<"Traceback (most recent call last):
  File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/__init__.py\", line 345, in main
    job.worker.start(task, job, **jobargs)
  File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/github/hustle/hustle/core/pipeworker.py\", line 203, in start
    self.run(task, job, **jobargs)
  File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/pipeline/worker.py\", line 228, in run
    self.run_stage(task, stage, params)
  File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/github/hustle/hustle/core/pipeworker.py\", line 254, in run_stage
    stage.process(interface, state, label, inp, task)
  File \"/home/mike/github/hustle/hustle/core/pipeline.py\", line 516, in process_restrict
    for key, value in islice(inp, 0, limit):
  File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/__init__.py\", line 582, in __iter__
    for item in iter:
  File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/__init__.py\", line 538, in next
    self.last, item = next(self.iter)
  File \"/home/mike/github/hustle/hustle/core/pipeline.py\", line 123, in hustle_input_stream
    otab = MarbleStream(fle)
  File \"/home/mike/github/hustle/hustle/core/marble.py\", line 566, in __init__
    self.marble = Marble.from_file(local_file)
  File \"/home/mike/github/hustle/hustle/core/marble.py\", line 175, in from_file
    mdb.MDB_NOSUBDIR | mdb.MDB_NOLOCK)
  File \"/db.pyx\", line 2210, in mdb.mdb_read_handle (liblmdb/db.c:36219)
  File \"/db.pyx\", line 252, in mdb.Env.__init__ (liblmdb/db.c:5167)
Exception: Error opening environment: No such file or directory
">>

After some debugging I see a wrong file path in hustle_input_stream:
fle=u'/home/mike/github/disco/root/data/mike/github/disco/root/ddfs/vol0/blob/f7/hustleuq5Uwn$59f-b190d-b6730'

It has '/mike/github' twice. This is either a bug in the util.localize function or a misconfiguration on my side; I can't figure out which.


Could you please advise what I did wrong?

Non blocking select() function

Right now, select() blocks until the query result is returned. This is painful for long-running queries. I imagine an option to the select() function, or a separate version of the function, that returns a future-like object which can be queried for progress and can also be 'waited' on. Note that the underlying Disco jobs already work like this, so it should be fairly easy to implement.

future = select_no_block(tab.col, where=tab)
future.progress()  # could return [(39,39), (27,32), (0,0)] for 3 stage query (complete, total) label indication
future.wait()

Bonus points for implementing a wait() capability that can wait on many queries at once.
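
Until such a function exists, a minimal workaround sketch: run today's blocking select() on a thread pool and get a standard Python future back. This assumes the concurrent.futures backport on Python 2.7, and a Table bound to tab as in the example above:

    from concurrent.futures import ThreadPoolExecutor

    executor = ThreadPoolExecutor(max_workers=4)

    # submit() returns immediately; the query runs on a worker thread
    future = executor.submit(select, tab.col, where=tab)

    # result() blocks like select() does today, but only when we ask
    result = future.result()

This gives wait() semantics (and waiting on many queries via concurrent.futures.wait) but no per-stage progress, which is why a native future tied to the Disco job would still be better.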

'in' operator

The Column currently doesn't support the in operator, which would allow you to easily test the column against a collection of values. You currently have to do this with the | operator.

Ideally, we would have a Column Expression like:

table.column in ['hello', 'there', 'you', 'guys']

however, in Python, 'x in collection' is dispatched to the collection's __contains__ method, so supporting this syntax would mean changing behavior on the list class, which is not something we want to do. We want to override operators only on the Column class.

In the fine tradition of overloading bitwise operators for operations in our DSL, I would suggest overloading the << operator on the Column class thusly:

table.column << ['hello', 'there', 'you', 'guys']
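
Here is a toy sketch (not Hustle's actual Column class) showing how << can be made to mean "in a collection of values" by OR-ing together per-value equality expressions, which is exactly what you would otherwise write by hand with |:

    from functools import reduce

    class Expr(object):
        # stand-in for Hustle's column expression objects
        def __init__(self, desc):
            self.desc = desc
        def __or__(self, other):
            return Expr('(%s | %s)' % (self.desc, other.desc))
        def __repr__(self):
            return self.desc

    class Column(object):
        def __init__(self, name):
            self.name = name
        def __eq__(self, value):
            return Expr('%s == %r' % (self.name, value))
        def __lshift__(self, values):
            # build (c == v1) | (c == v2) | ... from the collection
            return reduce(lambda a, b: a | b, (self == v for v in values))

    print(Column('column') << ['hello', 'there', 'you', 'guys'])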

Boolean data type

A boolean type would save a lot of space. We wouldn't need to retain the column db, only the index db in LMDB. To get the value for that column for a particular rowid, just check one of the values (either true or false) in the ix:db for that column. If the rowid isn't in one, it must be in the other.

This method would save at least 5 bytes per row per column, plus whatever overhead LMDB adds for the column db itself.
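
A sketch of the proposed read path, assuming a hypothetical bitmap object that supports membership tests:

    def get_bool(rowid, true_bitmap):
        # only the 'true' bitmap needs consulting: a rowid absent from
        # it must, by the invariant above, be in the 'false' bitmap
        return rowid in true_bitmap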

Optimize queries with 'limit'

The current implementation of 'limit' first fetches all the rows that satisfy the where clause, then emits the number of rows specified by 'limit'. If the where clause matches a very wide range, that first stage is extremely time-consuming.

To speed up query execution, we could pass the limit down to the data fetching stage, i.e. stop fetching rows as soon as the limit is reached (see the sketch below). Note that this only works if there is no 'distinct' or 'order by' specified.
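
A sketch of the pushdown using itertools.islice, which stops consuming the upstream row iterator once the limit is reached (the function name is hypothetical):

    from itertools import islice

    def restrict(rows, limit=None):
        # islice pulls at most `limit` rows from the underlying iterator,
        # so fetching halts early instead of scanning all matching rows
        return rows if limit is None else islice(rows, limit)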

Compression of partition data

We have a column:db and an ix:db in the marble mdbs for partition columns. By definition, the column:db will contain N rows with exactly the same value, where N is the number of rows in the marble, and the ix:db will hold exactly one key with an "all ones" bitmap. The column:db is clearly wasteful: it can be removed, and the value of the partition for the marble stored in the meta db instead.

Schema Language

Hustle uses a rather obtuse language for specifying Column data types:

    pixels = Table.create(PIXELS,
                          fields=['=$token', '+@1isActive', '+%site_id', '@amount', '+#account_id', 
                                     '+%city', '+%2state', '+#2metro', '$ip', '*keyword', '+$date'],
                          partition='date',
                          force=True)

This notation is fine for storage in the underlying tags and meta-dbs, but the language for specifying this information should be changed for the 'user facing' Table.create() function. Here is a possible alternative:

    pixels = Table.create(PIXELS,
                          fields=['wide index string token', 'index uint8 isActive', 'index trie32 site_id',
                                     'uint32 amount', 'index int32 account_id', 'index trie32 city',
                                     'index trie16 state', 'index int16 metro', 'string ip', 'lz4 keyword', 
                                     'index string date'],
                          partition='date',
                          force=True)
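
For reference, lining the two examples up field by field suggests this mapping between the old sigils and the proposed keywords (inferred from the examples, not documented):

  • '+' -> index, '=' -> wide index
  • '$' -> string, '*' -> lz4
  • '@' -> uint ('@1' is uint8; bare '@' is uint32)
  • '#' -> int ('#2' is int16; bare '#' is int32)
  • '%' -> trie ('%2' is trie16; bare '%' is trie32)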

Hustle sometimes fails to insert data into tables.

insert(...)

======= attempting to resize mmap ======
Error writing to MDB: Error putting data: Invalid argument
Traceback (most recent call last):
  File "hustle/core/marble.py", line 528, in _insert
    subindexdb.put(txn, val, bitmap.dumps())
  File "hustle/core/marble.py", line 315, in put
    self.subindexdb.put(self.txn, key, val)
  File "db.pyx", line 1185, in mdb.IntStrDB.put (liblmdb/db.c:19491)
Exception: Error putting data: Invalid argument

Wrong Data Types for Aggregation Columns

Currently, aggregation columns inherit the type of their corresponding source columns. That results in wrong column types when dumping a query result to another Hustle marble.

For example, if the type of column "name" is string, then h_count(name)'s type should be "int" instead of "string".

Locality of reference for value column data in LMDB

Currently, we write a single value from each column to LMDB for each record during insert. The way LMDB works, this will cause our storage to have row orientation in physical memory/disk. To remedy this, we will cache column values in memory during insertion and write column data to LMDB in single bursts.
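
A hypothetical sketch of that buffering (names invented for illustration): values accumulate per column in memory, then each column's batch is written to LMDB in one burst so a column's values land adjacently on disk:

    from collections import defaultdict

    buffers = defaultdict(list)        # column name -> [(rowid, value)]

    def buffered_put(rowid, record):
        for column, value in record.items():
            buffers[column].append((rowid, value))

    def flush(txn, column_dbs):
        for column, pairs in buffers.items():
            db = column_dbs[column]
            for rowid, value in pairs:   # one contiguous burst per column
                db.put(txn, rowid, value)
        buffers.clear()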

_meta_._vid_kids optimization

This metadata key holds the Trie data for VIDs in each marble. It is possible to compress it by 20% using lz4 compression.

h_count() optimization

The first stage of h_count() can always be executed in restrict-select by simply getting the length of the resulting bitmap generated by the where clause, obviating the need to actually process a counting accumulator.
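
In other words (a sketch, with a hypothetical bitmap object exposing a set-bit count):

    def count_rows(where_bitmap):
        # number of set bits == number of matching rows, so no per-row
        # counting accumulator has to run
        return where_bitmap.length()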

Use bitmap to speed up the h_count()

Currently, h_count() doesn't take advantage of the count information in the bitmap; instead it loads all the columns and counts the rows in memory. That is inefficient and wasteful.

Can't insert data with a huge number of partitions

The insert function fails when the input dataset has a large number of partitions (say, 1000+). It has something to do with LMDB's lock table.

Since our LMDB readers and writers never run at the same time, turning on MDB_NOLOCK could fix this.

Wrap Hustle into the Disco concurrent pipeline processing model

The latest version of Disco supports processing multiple stages concurrently. Hustle could take advantage of this new feature to speed up query execution.

The tricky part is that in Hustle every stage needs its input sorted, which means a stage cannot run until all of the previous stage's output is available and has been sorted. In some cases sorting the input is unnecessary and wasteful; worse, it prevents Disco from running the following stage concurrently, since it has to wait for the whole input to be available.

The following query is a typical one that could benefit greatly from the Disco concurrent model:

select(h_sum(foo.cost), where=foo.date=="2014-06-02")

Aggregate operation optimizations on Bit type columns

Almost all of the aggregation functions can be optimized if we are operating on a bit column (see the sketch after this list):

  • h_sum = bitmap.length() -> the number of '1' bits in the bitmap
  • h_avg = bm.length() / bm.capacity()
  • h_min = 0 if bm.length() != bm.capacity() else 1 -- similar for h_max()
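
A sketch of those fast paths, assuming a bitmap object with length() (count of set bits) and capacity() (total number of rows):

    def h_sum_bits(bm):
        return bm.length()

    def h_avg_bits(bm):
        return float(bm.length()) / bm.capacity()

    def h_min_bits(bm):
        # a 0 exists somewhere unless every bit is set
        return 1 if bm.length() == bm.capacity() else 0

    def h_max_bits(bm):
        # a 1 exists somewhere unless no bit is set
        return 1 if bm.length() > 0 else 0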

Ability to alias column or aggregate functions

We need to support the following type of column aliasing:

select(table.column.alias('newname'), table.col2, h_sum(table.col3).alias('anothername'), ...

In the result schema, the columns will be called: newname, col2, anothername

Nested selects() result in poorly named Tables

Currently, a nested select() will return a Table named 'sub-tablename' where tablename is the table the select() was performed on.

If we try to join two nested tables that were from the same table, they will both have the same name, and that will cause a number of issues in identifying which table is which.

For nested queries, select a unique name for the returned Table.

For example:

a = select(t.c1, t.c2, where=t.c3 > 5, nest=True)
b = select(t.c1, t.c2, where=t.c4 == 'bizness', nest=True)
c = select(a.c1, b.c2, where=(a, b), join=(a.c5, b.c5))

will complain:

Table sub-t occurs twice in the where clause.
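
One possible fix (a sketch; the helper name is invented): make nested result names unique by appending a short random suffix instead of reusing the fixed 'sub-' prefix:

    import uuid

    def sub_table_name(base):
        # e.g. 'sub-t-3f2a9c1b' -- unique per nested select()
        return 'sub-%s-%s' % (base, uuid.uuid4().hex[:8])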

where clause only allows column expressions with literals

The following query is currently illegal in Hustle, as we don't allow Columns to be right side operands in Column expressions:

select(t.c1, where=t.c2 > t.c3)

Note that it is a much different issue when the two columns are from different Tables, as that would have to be handled at a later stage than restrict-select, which is where all of the current Column expressions are evaluated.

Combine optimization in restrict-select stage

For aggregation functions, we currently don't take advantage of duplicate group by keys during the first stage of the query. It is possible to 'combine' the keys by storing the aggregated keys in memory during the input processing to the restrict-select stage. Note that this should only be done for low cardinality keys, as the higher the cardinality, the more memory will be used as well as the fact that high cardinality keys don't benefit from a combiner at all.

We should also take a look at whether or not the separate 'group-combine' stage is really required any more. Benchmark this with both high and low cardinality group keys to see if it in fact saves any time.
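
A sketch of such a combiner, illustrated for a sum aggregate over (group_key, value) pairs; the key cap is the guard against high-cardinality blowup (all names hypothetical):

    from collections import defaultdict

    def combine(rows, max_keys=100000):
        acc = defaultdict(int)
        for key, value in rows:
            if key in acc or len(acc) < max_keys:
                acc[key] += value      # fold duplicate keys early
            else:
                yield key, value       # cardinality too high: pass through
        for pair in acc.items():
            yield pair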

Partition filter for insert

Add a partition filter for inserts when given a list of files; this is useful for single-day reloads if the partition is the day.

dump() can't handle the result of nested query

a = select(t.c1, t.c2, where=t, nest=True)
dump(a._blobs)

raises a Disco exception: "DataError: ... Corrupted input: Could not parse a value length at 0 bytes." dump() shouldn't use result_iterator() to fetch the result in this case.

Ability to save query results in DDFS

We should be able to set 'save=tablename' and have the results of a query saved back out to DDFS. It would have to use the 'hustle_output_stream' much like 'nest=True'.

Duplicate Adjacent Column Optimization

In the marble's value db (the rowid -> value db for a column), we can compress the data by skipping consecutive rows that have the same value. For example, currently we store value dbs like this:

RID  VID
  1   12
  2   12
  3   12
  4   15
  5   12
  6   12
  7   18
  8   18
  9   18

with Duplicate Adjacent Column Optimization, this db becomes:

RID  VID
  1   12
  4   15
  5   12
  7   18
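
The read path then changes: the value for a rowid is the entry with the greatest stored RID <= rowid. A sketch against a hypothetical LMDB-style cursor (set_range seeks to the first key >= its argument):

    def lookup(cursor, rowid):
        if cursor.set_range(rowid):
            if cursor.key() == rowid:  # exact hit: the start of a run
                return cursor.value()
            cursor.prev()              # overshot: step back one entry
        else:
            cursor.last()              # rowid is past the last run start
        return cursor.value()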

Invariant query columns optimization

During a query like:

select(tab.c1, tab.c2, where=(tab.c1 == 'hello') & (tab.c3 > 2))

we can avoid fetching tab.c1 during the input processing (hustle_input_stream), saving some cycles, because it will always have the value of 'hello'.

drop_table()

We currently don't have a way of dropping a table and all of its partitions.

New input decoders

We have 2(ish) input decoders right now and only one is really solid (the json_decoder). Here's a good start:

  • fixed_width (like csv, but with designated columns for each field)
  • csv
  • netCDF
  • HDF5
  • protocol buffer
  • cap'n proto
  • xml if there is still demand

Also, it would be nice to gather some good basic ETL preprocessors for various popular web and application servers, proxies, etc.
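
As a starting point, here is a sketch of a csv decoder in the shape insert() appears to expect (a callable that turns one raw input record into a dict of column values; the field names are hypothetical):

    import csv

    FIELDS = ['token', 'site_id', 'amount']

    def csv_decoder(line):
        # parse a single csv line and map its fields onto column names
        values = next(csv.reader([line]))
        return dict(zip(FIELDS, values))

    # insert(table, streams=..., decoder=csv_decoder)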

New Data Type IP

Storing IP addresses in Hustle turns out to be quite common. Directly storing IP strings like "127.0.0.1" wastes space and is inefficient to query. A better encoding scheme would be helpful.

Option 1: convert the address to binary before inserting into the database, and convert it back to a string when fetching (inet_aton, inet_ntoa); a sketch follows below.
Option 2: radix trie encoding. The current 32-bit Trie can't hold the whole IP collection; a binary radix trie could be used here.
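
A sketch of Option 1 using only the standard library: pack the dotted quad into an unsigned 32-bit integer on insert, unpack on read:

    import socket
    import struct

    def ip_to_uint32(ip):
        return struct.unpack('!I', socket.inet_aton(ip))[0]

    def uint32_to_ip(n):
        return socket.inet_ntoa(struct.pack('!I', n))

    assert ip_to_uint32('127.0.0.1') == 2130706433
    assert uint32_to_ip(2130706433) == '127.0.0.1'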

Consider using Blosc

I saw that you are using lz4 for compression. Perhaps you are also interested in the Blosc metacodec:

http://www.blosc.org

It actually supports using lz4 under the hood and can yield speed and ratio improvements over plain lz4 on certain kinds of data (e.g. timeseries) due to multithreading and pre-conditioning.

We have some pretty well tuned python bindings too:

https://github.com/Blosc/python-blosc
