tspurway / hustle

A column oriented, embarrassingly distributed relational event database.

License: Other
This feature could be used to clean up a query's data once it is no longer needed, much like the "purge" option for Disco jobs. It would keep your Disco cluster lean and neat.
Here is a typical join in Hustle:
select(t1.x, t2.y, where=(t1, t2), join=(t1.jcol, t2.jcol))
This could be more simply represented as:
select(t1.x, t2.y, where=(t1, t2), join='jcol')
if the name of the Column is the same in both Tables (which happens often). We will keep the first form for the case where you are joining on a Column that has a different name in each Table.
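The shorthand could be normalized internally into the explicit pair form. A minimal sketch, assuming a helper named normalize_join and a toy stand-in for the Table class (both invented here, not Hustle's actual internals):

```python
class _Table(object):
    """Minimal stand-in for a Hustle Table, for illustration only."""
    def __init__(self, name, columns):
        self.name = name
        for c in columns:
            # real Tables expose Column objects; strings suffice here
            setattr(self, c, '%s.%s' % (name, c))

def normalize_join(join, t1, t2):
    """Expand the join='jcol' shorthand into the explicit
    (t1.jcol, t2.jcol) pair form; pass explicit pairs through."""
    if isinstance(join, str):
        return (getattr(t1, join), getattr(t2, join))
    return join

t1 = _Table('t1', ['x', 'jcol'])
t2 = _Table('t2', ['y', 'jcol'])
```

With this, select() could accept either form and resolve both to the same internal representation.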
I have a problem with the db path when running the version from git:
cd /home/mike/github/hustle
DISCO_ROOT=/home/mike/github/disco bin/hustle
>>> pixels = Table.create('pixels',
columns=['index string token', 'index uint8 isActive', 'index site_id', 'uint32 amount',
'index int32 account_id', 'index city', 'index trie16 state', 'index int16 metro',
'string ip', 'lz4 keyword', 'index string date'], partition='date', force=True)
>>> insert(pixels, streams=[[{'token': 'a'}]], decoder=lambda d: d)
>>> select(pixels.token, where=pixels)
fails
>>> select(pixels.token,where=pixels)
Traceback (most recent call last):
File "<console>", line 1, in <module>
File "/home/mike/github/hustle/hustle/__init__.py", line 586, in select
blobs = job.wait()
File "/home/mike/.local/lib/python2.7/site-packages/disco/core.py", line 369, in wait
timeout, poll_interval * 1000)
File "/home/mike/.local/lib/python2.7/site-packages/disco/core.py", line 329, in check_results
raise JobError(Job(name=jobname, master=self), "Status {0}".format(status))
JobError: Job select_from_pixels@59f:b7b65:10962 failed: Status dead
with following logs in master
22:35:24.928 [info] Job select_from_pixels@59f:b7ad7:dcb05 failed: <<"Traceback (most recent call last):
File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/__init__.py\", line 345, in main
job.worker.start(task, job, **jobargs)
File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/github/hustle/hustle/core/pipeworker.py\", line 203, in start
self.run(task, job, **jobargs)
File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/pipeline/worker.py\", line 228, in run
self.run_stage(task, stage, params)
File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/github/hustle/hustle/core/pipeworker.py\", line 254, in run_stage
stage.process(interface, state, label, inp, task)
File \"/home/mike/github/hustle/hustle/core/pipeline.py\", line 516, in process_restrict
for key, value in islice(inp, 0, limit):
File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/__init__.py\", line 582, in __iter__
for item in iter:
File \"/home/mike/github/disco/root/data/localhost/5d/select_from_pixels@59f:b7ad7:dcb05/home/mike/.local/lib/python2.7/site-packages/disco/worker/__init__.py\", line 538, in next
self.last, item = next(self.iter)
File \"/home/mike/github/hustle/hustle/core/pipeline.py\", line 123, in hustle_input_stream
otab = MarbleStream(fle)
File \"/home/mike/github/hustle/hustle/core/marble.py\", line 566, in __init__
self.marble = Marble.from_file(local_file)
File \"/home/mike/github/hustle/hustle/core/marble.py\", line 175, in from_file
mdb.MDB_NOSUBDIR | mdb.MDB_NOLOCK)
File \"/db.pyx\", line 2210, in mdb.mdb_read_handle (liblmdb/db.c:36219)
File \"/db.pyx\", line 252, in mdb.Env.__init__ (liblmdb/db.c:5167)
Exception: Error opening environment: No such file or directory
">>
After some debugging, I see the wrong file in hustle_input_stream; see below:
fle=u'/home/mike/github/disco/root/data/mike/github/disco/root/ddfs/vol0/blob/f7/hustleuq5Uwn$59f-b190d-b6730'
It has '/mike/github' twice. This is either a bug in the util.localize function or a misconfiguration on my part; I can't figure out which exactly.
Could you please advise what I did wrong?
Although the purge option is set by default, these tests are not going to purge the created jobs. We will need a mechanism for purging jobs at teardown.
Right now, select() blocks until the query result is returned. This is painful for long-running queries. I imagine an option to the select() function, or a separate version of the function, that returns a future-type object which could be queried for progress and also be 'waited' on. Note that the underlying Disco jobs are already implemented like this, so it should be fairly easy to implement.
future = select_no_block(tab.col, where=tab)
future.progress() # could return [(39,39), (27,32), (0,0)] for 3 stage query (complete, total) label indication
future.wait()
Bonus points for implementing a wait() capability that can wait on many queries.
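The shape of such a future could look like the sketch below. This is purely illustrative: the real object would wrap the Disco job's own polling and waiting; here a thread stands in for the job, and the class and function names are invented.

```python
import threading

class QueryFuture(object):
    """Sketch of a non-blocking select() result. In Hustle this would
    wrap the underlying Disco job; a thread simulates it here."""
    def __init__(self, fn):
        self._result = None
        self._thread = threading.Thread(target=self._run, args=(fn,))
        self._thread.start()

    def _run(self, fn):
        self._result = fn()

    def done(self):
        # True once the underlying work has finished
        return not self._thread.is_alive()

    def wait(self, timeout=None):
        # block until the result is available, then return it
        self._thread.join(timeout)
        return self._result

def wait_all(futures):
    """The 'bonus points' helper: wait on many queries at once."""
    return [f.wait() for f in futures]
```

A progress() method reporting per-stage (complete, total) pairs would hang off the Disco job's status in the same way.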
This issue just covers the h_cardinality() aggregation function, which will allow for UNION operations across HLL tables.
The Column currently doesn't support the in operator, which would allow you to easily test the column against a collection of values. You currently have to do this with the | operator.
Ideally, we would have a Column Expression like:
table.column in ['hello', 'there', 'you', 'guys']
However, in Python this is dispatched to the __contains__ method of the list class, which is not something we want to override. We want to override operators only on the Column class.
In the fine tradition of overloading bitwise operators for operations in our DSL, I would suggest overloading the << operator on the Column class as follows:
table.column << ['hello', 'there', 'you', 'guys']
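A toy version of the overload, to show the mechanics. The real Hustle Column builds query expression trees; here, as a stand-in, the result is just a predicate over dict-shaped rows:

```python
class Column(object):
    """Toy Column demonstrating the proposed << membership test.
    Illustrative only; not Hustle's actual Column class."""
    def __init__(self, name):
        self.name = name

    def __lshift__(self, values):
        # column << [v1, v2, ...] reads as "column in (v1, v2, ...)"
        vals = set(values)
        return lambda row: row.get(self.name) in vals

keyword = Column('keyword')
in_greeting = keyword << ['hello', 'there', 'you', 'guys']
```

Because __lshift__ is defined on our own class, we control the return value entirely, unlike the in operator, whose result is coerced to a boolean.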
A boolean type would save a lot of space. We wouldn't need to retain the column db, only the index db in LMDB. To get the value for that column for a particular rowid, just check one of the values (either true or false) in the ix:db for that column. If the rowid isn't in one, it must be in the other.
This method would save at least 5 bytes per row per column, plus whatever LMDB overhead for the actual db column.
The current implementation of 'limit' first fetches all the rows that meet the conditions in the where clause, then emits the number of rows specified by 'limit'. If the where clause matches a very wide range, the first stage is extremely time-consuming.
To speed up query execution, we could pass the limit down to the data-fetching stage, i.e. stop fetching more rows as soon as the limit is reached. Note that this only works if no 'distinct' or 'order by' is specified.
We need special jobs to perform unions and intersections on HLL/Minhash type tables.
We have a column:db and an ix:db in the marble mdbs for partition columns. By definition, the column:db will have N rows with exactly the same value, where N is the number of rows in the marble, and the ix:db will have exactly one entry with an "all ones" bitmap. The column:db is clearly wasteful: it can be removed, and the value of the partition for the marble stored in the meta db instead.
Hustle uses a rather obtuse language for specifying Column data types:
pixels = Table.create(PIXELS,
fields=['=$token', '+@1isActive', '+%site_id', '@amount', '+#account_id',
'+%city', '+%2state', '+#2metro', '$ip', '*keyword', '+$date'],
partition='date',
force=True)
This notation is fine for storage in the underlying tags and meta-dbs, but the language for specifying this information should be changed for the 'user facing' Table.create() function. Here is a possible alternative:
pixels = Table.create(PIXELS,
fields=['wide index string token', 'index uint8 isActive', 'index trie32 site_id',
'uint32 amount', 'index int32 account_id', 'index trie32 city',
'index trie16 state', 'index int16 metro', 'string ip', 'lz4 keyword',
'index string date'],
partition='date',
force=True)
insert(...)
======= attempting to resize mmap ======
Error writing to MDB: Error putting data: Invalid argument
Traceback (most recent call last):
File "hustle/core/marble.py", line 528, in _insert
subindexdb.put(txn, val, bitmap.dumps())
File "hustle/core/marble.py", line 315, in put
self.subindexdb.put(self.txn, key, val)
File "db.pyx", line 1185, in mdb.IntStrDB.put (liblmdb/db.c:19491)
Exception: Error putting data: Invalid argument
Currently, aggregation columns inherit the type of their corresponding columns. That results in wrong column types when dumping query results to another Hustle marble.
For example, if the type of column "name" is string, h_count(name)'s type should be "int" instead of "string".
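One way to fix this is a type-resolution table consulted when building the result schema: count-like aggregates always yield integers, while value-preserving aggregates keep the source column's type. The names and type strings below are illustrative assumptions, not Hustle's actual API:

```python
# Hypothetical mapping: aggregates with a fixed output type override
# the source column's type; everything else inherits it.
AGGREGATE_RESULT_TYPES = {
    'h_count': 'uint64',
    'h_cardinality': 'uint64',
}

def result_type(agg_name, column_type):
    """Resolve the output type of an aggregation column."""
    return AGGREGATE_RESULT_TYPES.get(agg_name, column_type)
```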
Currently, we write a single value from each column to LMDB for each record during insert. The way LMDB works, this will cause our storage to have row orientation in physical memory/disk. To remedy this, we will cache column values in memory during insertion and write column data to LMDB in single bursts.
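The buffering step could look like the sketch below, where flush stands in for the actual LMDB write (the real insert path is more involved, and all names here are invented):

```python
from collections import defaultdict

def columnar_insert(records, flush):
    """Buffer values per column during insertion, then hand each
    column to `flush` in a single burst, so data lands
    column-contiguous in storage instead of row-oriented."""
    buffers = defaultdict(list)
    for rowid, record in enumerate(records, start=1):
        for column, value in record.items():
            buffers[column].append((rowid, value))
    for column, pairs in buffers.items():
        # one burst per column, rather than one write per row
        flush(column, pairs)
```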
Hi,
In a star schema OLAP design, a small dimension table needs to be replicated across nodes to prevent joins across the network.
How do I create a replicated (not partitioned) table in Hustle?
-Aris
This metadata key holds the Trie data for VIDs in each marble. It is possible to compress this by 20% using lz4 compression.
The first stage of h_count() can always be executed in restrict-select by simply getting the length of the resulting bitmap generated by the where clause, obviating the need to actually process a counting accumulator.
Currently, h_count() doesn't take advantage of the count information in the bitmap; instead, it loads up all the columns and counts them in memory. That is inefficient and wasteful.
The insert function fails when the input dataset has a large number of partitions (say, 1000+). It has something to do with lmdb's lock table.
Since lmdb's readers and writers never run at the same time in Hustle, turning on MDB_NOLOCK could fix this.
Probably we can implement this as an aggregate function. The ƒGH framework should get this done painlessly.
Currently you can have either 0 or 1 partition columns. We need the ability to have more.
The latest version of Disco supports processing multiple stages concurrently. Hustle could benefit from this new feature to speed up query execution.
The tricky part is that in Hustle every stage needs its input to be sorted, which means a stage cannot run until all of the output from the previous stage is available and has been sorted. In some cases sorting the input is unnecessary and wasteful; worse, it prevents Disco from running the following stage concurrently, since it must wait for the whole input to be available.
The following query is a typical one that could benefit a lot from the Disco concurrent model:
select(h_sum(foo.cost), where=foo.date=="2014-06-02")
Almost all of the aggregating functions can be optimized if we are operating on a bit column:
They should also take string columns, as the only requirement is that the columns be range comparable
We need to support the following type of column aliasing:
select(table.column.alias('newname'), table.col2, h_sum(table.col3).alias('anothername'), ...
In the result schema, the columns will be called: newname, col2, anothername
Seeing wrong indexes for the "bit" columns.
The following returns multiple rows but should return one:
select(h_count(), where=t.c1 == 'hello')
Currently, a nested select() will return a Table named 'sub-tablename' where tablename is the table the select() was performed on.
If we try to join two nested tables that were from the same table, they will both have the same name, and that will cause a number of issues in identifying which table is which.
For nested queries, select a unique name for the returned Table.
For example:
a = select(t.c1, t.c2, where=t.c3 > 5, nest=True)
b = select(t.c1, t.c2, where=t.c4 == 'bizness', nest=True)
c = select(a.c1, b.c2, where=(a, b), join=(a.c5, b.c5))
will complain:
Table sub-t occurs twice in the where clause.
The following query is currently illegal in Hustle, as we don't allow Columns to be right side operands in Column expressions:
select(t.c1, where=t.c2 > t.c3)
Note that it is a much different issue when the two columns are from different Tables, as that comparison would have to be done at a later stage than restrict-select, where all of the current Column expressions are evaluated.
For aggregation functions, we currently don't take advantage of duplicate group by keys during the first stage of the query. It is possible to 'combine' the keys by storing the aggregated keys in memory during the input processing to the restrict-select stage. Note that this should only be done for low cardinality keys, as the higher the cardinality, the more memory will be used as well as the fact that high cardinality keys don't benefit from a combiner at all.
We should also take a look at whether or not the separate 'group-combine' stage is really required any more. Benchmark this with both high and low cardinality group keys to see if it in fact saves any time.
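The in-memory combining step could be sketched as follows. The function name, the 10000-key threshold, and the pass-through fallback are assumptions for illustration, not Hustle's implementation:

```python
def combine(pairs, reduce_fn, max_keys=10000):
    """In-memory combiner for the restrict-select stage: merge
    duplicate group-by keys as input is processed. Once more than
    `max_keys` distinct keys accumulate (high cardinality, where a
    combiner buys nothing), further new keys pass through unmerged."""
    acc = {}
    passthrough = []
    for key, value in pairs:
        if key in acc:
            acc[key] = reduce_fn(acc[key], value)
        elif len(acc) < max_keys:
            acc[key] = value
        else:
            passthrough.append((key, value))
    return sorted(acc.items()) + passthrough
```

The cap bounds memory use: low-cardinality keys collapse in the dict, while a flood of distinct keys simply streams through.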
Add a partition filter for inserts when given a list of files; this is useful for single-day reloads if the partition is the day.
a = select(t.c1, t.c2, where=t, nest=True)
dump(a._blobs)
will incur a Disco exception: "DataError: ... Corrupted input: Could not parse a value length at 0 bytes.". We shouldn't use result_iterator() to fetch the result in this case.
We should be able to set 'save=tablename' and have the results of a query saved back out to DDFS. It would have to use the 'hustle_output_stream' much like 'nest=True'.
In the marble's value db (the rowid -> value db for a column), we can compress the data by skipping consecutive rows that have the same value. For example, currently we store value dbs like this:
RID | VID |
---|---|
1 | 12 |
2 | 12 |
3 | 12 |
4 | 15 |
5 | 12 |
6 | 12 |
7 | 18 |
8 | 18 |
9 | 18 |
With Duplicate Adjacent Column Optimization, this db becomes:
RID | VID |
---|---|
1 | 12 |
4 | 15 |
5 | 12 |
7 | 18 |
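The compression pass itself is a one-liner over (rid, vid) pairs; a minimal sketch, with the function name invented here:

```python
def compress_adjacent(rows):
    """Duplicate Adjacent Column Optimization: keep a (rid, vid)
    entry only where the value differs from the previous row's."""
    out = []
    last = object()  # sentinel that equals no real vid
    for rid, vid in rows:
        if vid != last:
            out.append((rid, vid))
            last = vid
    return out
```

A lookup for rowid r then finds the greatest stored rid less than or equal to r, which maps naturally onto an LMDB range cursor.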
During a query like:
select(tab.c1, tab.c2, where=(tab.c1 == 'hello') & (tab.c3 > 2))
we can avoid fetching tab.c1 during the input processing (hustle_input_stream), saving some cycles, because it will always have the value of 'hello'.
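Detecting such columns is a matter of scanning the where clause's conjuncts for equality tests. The (op, column, value) triple shape below is an invented stand-in for Hustle's expression representation:

```python
def constant_columns(conjuncts):
    """Columns pinned to a single value by an equality conjunct in
    the where clause never need to be fetched: their value is known
    up front and can be filled into the result directly."""
    return {col: val for op, col, val in conjuncts if op == '=='}
```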
The LZ4 homepage lists several Big Data projects using LZ4:
https://code.google.com/p/lz4/
but sadly, hustle isn't amongst them.
Query
select(department.name, where=~(department.id > 10))
returns 0 partitions.
We currently don't have a way of dropping a table and all of its partitions.
We have 2(ish) input decoders right now and only one is really solid (the json_decoder). Here's a good start:
Also, it would be nice to gather some good basic ETL preprocessors for various popular web and application servers, proxies, etc.
Storing IP addresses in Hustle turns out to be quite common. Directly storing IP strings like "127.0.0.1" wastes space and is inefficient to query. A better encoding scheme would be helpful.
Option 1: convert the address to binary form before inserting into the database, and convert it back to a string when fetching (inet_aton, inet_ntoa).
Option 2: radix trie encoding. The current 32-bit Trie can't hold the whole IP collection; a binary radix trie could be used here.
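Option 1 is a few lines with the standard library, using the inet_aton/inet_ntoa pair the issue mentions; the helper names are ours:

```python
import socket
import struct

def ip_to_uint32(ip):
    """Pack a dotted-quad IPv4 string into a 4-byte unsigned
    integer (network byte order) before insertion."""
    return struct.unpack('!I', socket.inet_aton(ip))[0]

def uint32_to_ip(n):
    """Unpack it back into the dotted-quad string when fetching."""
    return socket.inet_ntoa(struct.pack('!I', n))
```

This stores each address in a fixed 4 bytes (a uint32 column) instead of a 7-15 byte string, and makes range queries over address blocks cheap.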
I saw that you are using lz4 for compression. Perhaps you are also interested in the Blosc metacodec:
It actually supports using lz4 under the hood and can yield speed and ratio improvements over plain lz4 on certain kinds of data (e.g. timeseries) due to multithreading and pre-conditioning.
We have some pretty well tuned python bindings too: