datopian / assembler
The DataHub data assembly line
License: MIT License
Choose a good number of rows for previewing the data (based on our current core datasets).
E.g. the DPP_ELASTICSEARCH variable (with an example to illustrate whether it should be fully qualified or not ...)
Problem: when you republish a dataset it takes a few minutes, even if only the metadata changed.
Proposal: split the pipeline into separate metadata and data pipelines.
Copying from openknowledge-archive/dpr-api#273
When we upload any file to the pkgstore, the content type is set to binary/octet-stream by default. But we want text/plain:
e.g. datapackage.json has binary/octet-stream but should be json. Try:
curl -I http://pkgstore-testing.datahub.io/core/finance-vix/latest/datapackage.json
Why is this a problem:
We want the right content type for JSON, and text/plain for everything else (or we could even just have text/plain for JSON too).
Add a Content-Type (e.g. text/plain) while publishing data to S3.
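A minimal sketch of what this could look like, assuming boto3 is the client used for the pkgstore upload; the bucket and key are taken from the curl example above:

import boto3

s3 = boto3.client('s3')
# Upload the descriptor with an explicit Content-Type instead of the
# binary/octet-stream default
with open('datapackage.json', 'rb') as f:
    s3.put_object(
        Bucket='pkgstore-testing.datahub.io',
        Key='core/finance-vix/latest/datapackage.json',
        Body=f.read(),
        ContentType='application/json',  # or 'text/plain' for other files
    )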
If you visit http://datahub.io/anuveyatsu/gdp/pipelines, you can see that it has a status of INVALID with these errors:
Dirty dependency: Cannot run until all dependencies are executed
Dependency unsuccessful: Cannot run until all dependencies are successfully executed
We need two new processors to support preview resources: one for getting the first N rows from a given resource, and one for adding views to the datapackage.json.
Parameters: limit (will default to 10000)
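A minimal sketch of the "first N rows" processor, assuming the datapackage-pipelines ingest/spew wrapper; the processor name and wiring are illustrative:

import itertools
from datapackage_pipelines.wrapper import ingest, spew

parameters, datapackage, resource_iterator = ingest()
limit = parameters.get('limit', 10000)

def limited(resources):
    for resource in resources:
        # Stop each resource stream after `limit` rows
        yield itertools.islice(resource, limit)

spew(datapackage, limited(resource_iterator))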
One of our core datasets, finance-vix, which should be updated daily, is stuck on the 2nd of October - the scheduled job is running but the data is never updated.
This is happening because only the "main" pipeline runs on schedule while the derived ones remain the same.
We need to export data to sqlite if requested.
Parameters:
rewrite (always creates new tables), append (append new rows) and update (update if row exists) - but let's keep it simple.
Path on S3: data/sqlite/data/{resourcename}.db
# request body from CLI
{
...
kind: sqlite
}
meta:
owner: <owner username>
ownerid: <owner unique id>
dataset: <dataset name>
version: 1
findability: <published/unlisted/private>
inputs:
- # only one input is supported atm
kind: datapackage
url: <datapackage-url>
parameters:
resource-mapping:
<resource-name-or-path>: <resource-url>
outputs:
-
kind: sqlite
When sqlite is in outputs, we need to add two processors:
# pipeline-spec
meta:
...
inputs:
-
kind: datapackage
...
outputs:
-
kind: sqlite
# generator.py in assembler
pipeline = [current_pipeline]
for output in outputs:
    if output['kind'] == 'sqlite':
        pipeline.append({
            'run': 'dump.to_sql',
            'parameters': {'engine': 'sqlite:///'}
        })
        # etc.
yield pipeline_id, {'pipeline': pipeline}
Questions:
What should the path for it be?
We are introducing private datasets for users. We need to test that the pipelines execute fine and that the links return 403.
When developing locally and running all pipelines, not all of them are executed - no errors, no warnings, they are marked as successful, but some of them are actually not executed at all. (I strongly believe this is happening not only locally, but in production as well - see the second comment.)
Recently we've implemented a new pipeline that generates the preview resource. This pipeline cannot be executed until derived/csv is finished. Also, "the main" pipeline, which depends on all the others being finished, cannot be executed either. So:
Running dpp run all the first time executes non-tabular plus all derived ones (including preview) successfully - there are no error messages or warnings. However, the final one (the one that should contain everything together plus the original) is not there.
Running dpp run all again, that final one gets there and we have the complete dataset.
For example, dpp lists all pipelines with their errors. Running dpp for the first time (none of the pipelines are executed yet) logs:
Available Pipelines:
INFO :Main :Skipping redis connection, host:None, port:6379
- ./core/finance-vix:non-tabular (*)
- ./core/finance-vix:vix-daily_csv (*)
- ./core/finance-vix:vix-daily_csv_preview (E)
Dirty dependency: Cannot run until all dependencies are executed
Dependency unsuccessful: Cannot run until all dependencies are successfully executed
- ./core/finance-vix:vix-daily_json (*)
- ./core/finance-vix (E)
Dirty dependency: Cannot run until all dependencies are executed
Dependency unsuccessful: Cannot run until all dependencies are successfully executed
Dirty dependency: Cannot run until all dependencies are executed
Dependency unsuccessful: Cannot run until all dependencies are successfully executed
Running dpp for the second time (after the first dpp run all finishes):
Available Pipelines:
INFO :Main :Skipping redis connection, host:None, port:6379
- ./core/finance-vix:non-tabular
- ./core/finance-vix:vix-daily_csv
- ./core/finance-vix:vix-daily_csv_preview (*)
- ./core/finance-vix:vix-daily_json
- ./core/finance-vix (E)
Dirty dependency: Cannot run until all dependencies are executed
Dependency unsuccessful: Cannot run until all dependencies are successfully executed
Running dpp now (after the second dpp run all finished) - now everything is fine:
Available Pipelines:
INFO :Main :Skipping redis connection, host:None, port:6379
- ./core/finance-vix:non-tabular
- ./core/finance-vix:vix-daily_csv
- ./core/finance-vix:vix-daily_csv_preview
- ./core/finance-vix:vix-daily_json
- ./core/finance-vix (*)
How dpp run all works: each pipeline starts with dirty=False.
In our case, two pipelines have dependencies: derived_preview and the main pipeline. So when iterating it goes like this: dirty=False, dirty=False ... but for some reason this change is not happening. I cannot find the reason why the status of a successfully executed pipeline is still dirty on the final iteration.
NOTE: this is not happening if you run dpp run dirty. All pipelines are executed fine in that case.
Feedback after reviewing export to zip:
out-file should default to dataset-name.zip instead of datahub.zip [0.5]

I have pushed a new source spec for one of the core datasets using the specstore API. It returned success=true.
The source spec I pushed (and currently in the DB):
meta:
dataset: finance-vix
findability: published
owner: core
ownerid: core
version: 2
inputs:
- kind: datapackage
parameters:
resource-mapping:
vix-daily: http://www.cboe.com/publish/ScheduledTask/MktData/datahouse/vixcurrent.csv
url: https://raw.githubusercontent.com/datasets/finance-vix/master/.datahub/datapackage.json
processing:
-
input: vix-daily
tabulator:
skip_rows: 2
headers:
- Date
- VIXOpen
- VIXHigh
- VIXLow
- VIXClose
output: vix-daily
schedule:
crontab: '0 0 * * *'
Source spec from dashboard http://api.datahub.io/pipelines/#anchor-ALL-core-finance-vix
inputs:
- kind: datapackage
parameters:
resource-mapping:
vix-daily: https://s3.amazonaws.com/rawstore.datahub.io/9c46b3948d297fa1c4e92ead714f0399
url: https://s3.amazonaws.com/rawstore.datahub.io/66387913a43fc2a04ca602ce0d529b1c
meta:
dataset: finance-vix
findability: unlisted
owner: core
ownerid: core
update_time: '2017-09-26T10:02:32.931873'
version: 1
Currently, we are using SourceSpecRegistry to read from the DB which pipelines need to be run. Since we extracted the planning part of assembling and changed the flow management, SourceSpecRegistry is not useful anymore.
Generated artifacts should be stored on the pkgstore S3 based on their hash, not their path or pipeline-id properties.
TODO: proper description and tasks
We have 2 core data packages that have not passed the pipeline. The problem is the geopoint type.
The first time, it errored with invalid geopoint type. Then I fixed both of them. Now we have a different error, probably in the jsontableschema library.
Error output from the pipeline:
AttributeError: 'list' object has no attribute 'split'
It seems the best way to handle the README is to store it inline in the datapackage.json in the rawstore.
This means the assembler pipeline must extract the readme from the readme property and dump it to README.md (and delete that property from the datapackage.json) when dumping to the pkgstore.
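A minimal sketch of that step, assuming the datapackage-pipelines process helper; the file handling is illustrative:

from datapackage_pipelines.wrapper import process

def modify_datapackage(datapackage, parameters, stats):
    # Pull the inline readme out of the descriptor and write it to disk
    readme = datapackage.pop('readme', None)
    if readme is not None:
        with open('README.md', 'w') as f:
            f.write(readme)
    return datapackage

process(modify_datapackage=modify_datapackage)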
How are we going to deal with sorting data that is time sensitive? E.g. for Finance VIX, which is updated daily, we need the last 200 rows (or maybe the first for other datasets) to display the latest data. We could add a {desc: true/false} option to the processing object of the source-spec.
cc @rufuspollock and @akariv - what do you think about this?
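A rough sketch of keeping only the last N rows of a streamed resource, using a bounded buffer; the function name and default are hypothetical:

from collections import deque

def last_n_rows(rows, limit=200):
    # A deque with maxlen keeps only the most recent `limit` rows
    buffer = deque(rows, maxlen=limit)
    yield from buffer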
As a Publisher I want to upload a 50Mb CSV and have the showcase page work
basic_nodes.py (#22)
node_collector.py with the new processing node
load_modified_resources.py to handle dp.json with views
We will need two new processors: generate_views (for adding the views) and one that just takes derived resources (json) and yields only the first 10000 rows.
Modifications for basic_nodes.py:
class DerivedFormatProcessingNode(BaseProcessingNode):
    def get_artifacts(self):
        for artifact in self.available_artifacts:
            if artifact.datahub_type == 'derived/json':  # or csv
                datahub_type = 'derived/preview'
                resource_name = artifact.resource_name + '_preview.json'
                output = ProcessingArtifact(
                    datahub_type, resource_name,
                    [artifact], [],
                    [],  # new processors + old go here
                    True
                )
                yield output

class DerivedPreviewProcessingNode(...):
    def __init__(self):
        ...  # (all same)
Inside node_collector.py we will have to add DerivedPreviewProcessingNode in the appropriate order (it should come after derived json, as it depends on that). And we will have to modify load_modified_resources.py to handle "views", as views are not yet supported there.
Example datapackage.json after running pipelines:
{
  "resources": [
    {
      "name": "vix-daily",
      "path": "data/vix-daily.csv",
      "format": "csv",
      "mediatype": "text/csv",
      "schema": {...}
    },
    {}, // derived csv
    {}, // derived json
    { // even though this is json data, we have a table schema
      "name": "vix-daily_csv_preview",
      "path": "data/preview/vix-daily.json",
      "datahub": {
        "derivedFrom": [
          "vix-daily_csv"
        ],
        "forView": [
          "datahub-preview-vix-daily_csv_preview"
        ],
        "type": "derived/preview" // data will be json ...
      }
    }
  ],
  "views": [
    {
      "name": "graph",
      "title": "VIX - CBOE Volatility Index",
      "specType": "simple",
      "spec": {
        "type": "line",
        "group": "Date",
        "series": [
          "VIXClose"
        ]
      }
    },
    {
      "name": "datahub-preview-vix-daily_csv_preview",
      "specType": "table",
      "datahub": {
        "type": "preview"
      },
      "transform": {
        "limit": 10000
      },
      "resources": [
        "vix-daily_csv_preview"
      ]
    }
  ]
}
We need a new processing node to define one more processing flow (preview datasets). For that, we need to implement a class inheriting from BaseProcessingNode, and add that class name to the ORDERED_NODE_CLASSES array. The new processing node will iterate over all resources and add processors that generate the first 10000 rows and views to the ones that have type derived/json.
Use load_previews and load_views from #23
Add class DerivedPreviewProcessingNode that will inherit from DerivedFormatProcessingNode
In DerivedPreviewProcessingNode, override get_artifacts and iterate over existing artefacts (resources)
Add DerivedPreviewProcessingNode to ORDERED_NODE_CLASSES in the correct position (last in the list)
Example of how the refactored basic_nodes.py may look:
class DerivedFormatProcessingNode(BaseProcessingNode):
    def get_artifacts(self):
        for artifact in self.available_artifacts:
            if artifact.datahub_type == 'derived/csv':
                datahub_type = 'derived/preview'
                resource_name = artifact.resource_name + '_preview.'
                output = ProcessingArtifact(
                    datahub_type, resource_name,
                    [artifact], [],
                    [],  # new processors + old go here
                    True
                )
                yield output

class DerivedPreviewProcessingNode(...):
    def __init__(self):
        ...  # (all same)
The resulting datapackage should have the bytes and rowCount set correctly:
rowCount should be the sum of all row counts of derived/csv resources.
bytes should be the sum of all bytes from all resources.
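A minimal sketch of this aggregation over a parsed descriptor; the key names follow the datahub conventions used elsewhere in this document:

def dataset_stats(descriptor):
    resources = descriptor.get('resources', [])
    # rowCount: only derived/csv resources count towards it
    row_count = sum(r.get('rowCount', 0) for r in resources
                    if r.get('datahub', {}).get('type') == 'derived/csv')
    # bytes: every resource counts
    total_bytes = sum(r.get('bytes', 0) for r in resources)
    return {'rowCount': row_count, 'bytes': total_bytes}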
If you try to download zip files, the data inside is placed in the following structure:
|- data/data.csv
|- data/filehash1/data_csv.csv
|- data/filehash2/data_json.csv
|- datapackage.json
We don't need the hashes - this should be fixed in dump.to_zip in outputNodes.
Currently, we are copying non-tabular resources across without any modification to the dp.json. We need to change this behaviour, as in some cases we have to reuse one and add it as a new resource to the dp.json. The add_resource processor searches for the resource in the dp.json by its name, and as non-tabular resources are copied across, their names are copied from the originals. So we are getting AssertionError: Failed to find a resource with the index or name matching 'non-tabular', as the planner is trying to add the resource using the artefact name ((add_resource, {resource: resource_info[required_artifact.resource_name]}) ...).
Fix NonTabularProcessingNode in basic_nodes.py to yield artifacts with correct resource names: {resource name}_non_tabular
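A hypothetical sketch of that change, following the ProcessingArtifact shape from the snippets above; the type check is an assumption:

class NonTabularProcessingNode(BaseProcessingNode):
    def get_artifacts(self):
        for artifact in self.available_artifacts:
            if artifact.datahub_type == 'non-tabular':  # assumed type name
                # Yield under a unique name so add_resource can find it
                resource_name = '{}_non_tabular'.format(artifact.resource_name)
                yield ProcessingArtifact(
                    artifact.datahub_type, resource_name,
                    [artifact], [], [], True)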
Besides metadata, we want to store events in Elasticsearch:
{
"timestamp": ...
// type of event
"event_entity": "flow|account|..."
"event_action": "create/finished/deleted/...
// event filters
"owner": ...
"dataset": ... // may not always be there ...
// results
"outcome": "ok/error", or "status": "good/bad/ugly",
"messsage": ... // mostly blank
"findability": ...
"payload": {
// anything you want add - customizing the event info per object
flow-id (aka pipeline-id)
}
}
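A hedged sketch of writing one such event with the official Python client; the index name, host and field values are assumptions:

from datetime import datetime
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])
event = {
    'timestamp': datetime.utcnow().isoformat(),
    'event_entity': 'flow',
    'event_action': 'finished',
    'owner': 'core',
    'dataset': 'finance-vix',
    'outcome': 'ok',
    'message': '',
    'findability': 'published',
    'payload': {'flow-id': 'core/finance-vix'},
}
es.index(index='events', doc_type='event', body=event)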
When writing datapackage.json to the pkgstore, pretty-print the JSON so it is nice for users to read (this is convenient when browsing directly).
Not sure this is that important ;-)
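A one-line sketch with the standard library; the descriptor and the two-space indent are illustrative:

import json

descriptor = {'name': 'finance-vix', 'resources': []}  # illustrative
with open('datapackage.json', 'w') as f:
    json.dump(descriptor, f, indent=2, ensure_ascii=False)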
stream_remote_resources: WARNING :Error while opening resource from url https://s3.amazonaws.com/rawstore-testing.datahub.io/sQqpgDlCdaDFdRjzxbZN9Q==: FormatError('Format "None" is not supported',)
stream_remote_resources: Traceback (most recent call last):
stream_remote_resources: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 144, in opener
stream_remote_resources: _stream.open()
stream_remote_resources: File "/usr/local/lib/python3.6/site-packages/tabulator/stream.py", line 132, in open
stream_remote_resources: raise exceptions.FormatError(message)
stream_remote_resources: tabulator.exceptions.FormatError: Format "None" is not supported
stream_remote_resources: During handling of the above exception, another exception occurred:
stream_remote_resources: Traceback (most recent call last):
stream_remote_resources: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 201, in
stream_remote_resources: rows = stream_reader(resource, url, ignore_missing or url == "")
stream_remote_resources: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 159, in stream_reader
stream_remote_resources: schema, headers, stream, close = get_opener(url, _resource)()
stream_remote_resources: File "/usr/local/lib/python3.6/site-packages/datapackage_pipelines/specs/../lib/stream_remote_resources.py", line 153, in opener
stream_remote_resources: _stream.close()
stream_remote_resources: File "/usr/local/lib/python3.6/site-packages/tabulator/stream.py", line 156, in close
stream_remote_resources: self.__parser.close()
stream_remote_resources: AttributeError: 'NoneType' object has no attribute 'close'
If you take a look at the paths of the resources in the datapackage.json, they look like this: "https://pkgstore.datahub.io/core/finance-vix:non-tabular/data/vix-daily.csv"
The structure on the S3 is the same:
| pkgstore.datahub.io
--| core
--|--| finance-vix:non-tabular
--|--|--| data
--|--|--|--| vix-daily.csv
@akariv Is this done on purpose, or should there be a / there (and this is a bug)?
Example dp.json: https://pkgstore.datahub.io/core/finance-vix/latest/datapackage.json
As an admin, I want to record most activities (sign up, sign in, push etc.) so that I know how frequently users are using the platform and how often they achieve expected results
As a Publisher, I want to see what has happened recently with my data (and in future what my team has done) so that I know things are working, what's changed etc
As a Viewer I want to see what a Publisher has been up to so that I know whether they are active, what's new and what's hot (so that ...)
In order to do statistical analyses, we need to record as many activities on the platform as possible. A good place for storing this info would be Elasticsearch.
What are the events?
{
"timestamp": ...
// type of event
"event_entity": "flow|account|..."
"event_action": "create/finished/deleted/...
// event filters
"owner": ...
"dataset": ... // may not always be there ...
// results
"outcome": "ok/error", or "status": "good/bad/ugly",
"messsage": ... // mostly blank
"findability": ...
"payload": {
// anything you want add - customizing the event info per object
flow-id (aka pipeline-id)
}
}
{
"timestamp":
"owner":
"payload": ...
}
For user dashboard:
SELECT * FROM events WHERE event.owner = userid ORDER BY timestamp DESC;
SELECT * FROM events WHERE ... AND ...
For user public profile
SELECT * FROM events WHERE event.owner = profileid AND findability = 'public' AND type != 'login'
Internally
SELECT count(*) FROM events WHERE type = 'dataset/push' AND outcome = 'failed'
Recently, we have implemented generating derived/preview resources, which are the first 10k rows of the original resource. It was decided to make them JSON format for convenience in the frontend. However, as we can see now, the JSON versions are usually larger than the CSV versions. E.g., for the finance-vix dataset, the original resource is ~228KB, while the derived/preview version is ~528KB. This becomes crucial when loading resources that are several MB in size.
Another point: we could skip generating derived/preview if the number of rows is <10k, as it does not make a lot of sense - or am I missing something? The frontend would use the original resource for the preview if derived/preview is not found. Note that we have to generate the preview views anyway.
In addition, we could reduce the number of rows for preview resources, e.g., to only the first 1k rows. Consider https://pkgstore.datahub.io/f512ef8d39ada702fde60efe1ca59c17/farm-url/latest/datapackage.json which has a resource with 4987 rows. What do you think?
If you check this dataset - https://pkgstore.datahub.io/f512ef8d39ada702fde60efe1ca59c17/farm-url/latest/datapackage.json - it has bytes: 51811260, which is ~52MB, but in fact it is ~17MB.

We have a failing test on Travis. There are multiple revisions saved while there should be one per dataset. This is happening because we are erasing data from the DB only once (at the start) - we need to do that after each test.
@zelima commented on Thu Nov 09 2017
Depending on the config passed to the planner, we have different kinds of outputs. We need to test all of them.
As a user, I want to see datapackage.json prettified rather than on one line, so that it is easy for me to read.
Research and implement storing data in S3 so that it's downloaded using gzip compression (it might already be this way; need to check first).
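A hedged sketch of a gzip-compressed upload, assuming boto3; the bucket and key names are illustrative. S3 then serves the object with Content-Encoding: gzip so clients decompress transparently:

import gzip
import boto3

s3 = boto3.client('s3')
with open('data/vix-daily.csv', 'rb') as f:
    body = gzip.compress(f.read())
s3.put_object(
    Bucket='pkgstore.datahub.io',
    Key='core/finance-vix/latest/data/vix-daily.csv',
    Body=body,
    ContentType='text/csv',
    ContentEncoding='gzip',
)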
RDS is getting too many connections, because each new FlowRegistry object in the Generator creates a new connection to RDS. One such object should suffice. (See analysis.)
Current state:
class Generator(GeneratorBase):
    @classmethod
    def get_schema(cls):
        return json.load(open(SCHEMA_FILE))

    @classmethod
    def generate_pipeline(cls, source):
        registry = FlowRegistry(DB_ENGINE)
        count = 0
        for pipeline in registry.list_pipelines():  # type: Pipelines
            yield pipeline.pipeline_id, pipeline.pipeline_details
            count += 1
        logging.error('assembler sent %d pipelines', count)
Solution:
REGISTRY = FlowRegistry(DB_ENGINE)

class Generator(GeneratorBase):
    @classmethod
    def get_schema(cls):
        return json.load(open(SCHEMA_FILE))

    @classmethod
    def generate_pipeline(cls, source):
        count = 0
        for pipeline in REGISTRY.list_pipelines():  # type: Pipelines
            yield pipeline.pipeline_id, pipeline.pipeline_details
            count += 1
        logging.error('assembler sent %d pipelines', count)
We need to export data into a zip if requested. Set the dpp:streamedFrom property to the absolute path of the resource, like it is done in load_modified_resources: https://github.com/datahq/assembler/blob/master/datapackage_pipelines_assembler/processors/load_modified_resources.py#L41

NTS: I think we are starting to want a method that "in-directories" data: given a datapackage.json, it downloads it and retrieves all data files, converting path URLs into local relative paths.
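A rough sketch of such a helper, using only the standard library; the function name and behaviour are illustrative and error handling is omitted:

import json
import os
import urllib.request

def indirectory(dp_url, dest='data'):
    # Download the descriptor, fetch each remote resource and rewrite
    # its path to a local relative one
    descriptor = json.load(urllib.request.urlopen(dp_url))
    os.makedirs(dest, exist_ok=True)
    for resource in descriptor.get('resources', []):
        path = resource.get('path', '')
        if path.startswith('http'):
            local = os.path.join(dest, os.path.basename(path))
            urllib.request.urlretrieve(path, local)
            resource['path'] = local
    with open('datapackage.json', 'w') as f:
        json.dump(descriptor, f, indent=2)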
Full datapackage.json
datapackage.json
resources - are in the data directory
...
=> zip that directory ...
"Good" DataPackage - local data etc
datapackage.json
data/
// only the "primary" data (good csv if csv, excel if an excel etc)
archive/
// original data ...
=> zipped
Original DataPackage
datapackage.json
# if path was url it remains url, but we do reinline from rawstore
+ whatever files actually came with it in their correct locations
What do we want to be zipped? We have several different options:
What should the datapackage.json look like?
What about other output formats? E.g. sqlite - should it be zipped if both are present?
If zip occurs as a kind in the source-spec, we should append the new dump.to_zip processor to the pipeline, then add it as a resource and export to S3.
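A sketch of the generator change, mirroring the sqlite snippet earlier; pipeline, outputs and dataset_name are assumed to come from the surrounding generator context:

for output in outputs:
    if output['kind'] == 'zip':
        # out-file defaults to {dataset-name}.zip, per the parameter below
        out_file = output.get('parameters', {}).get(
            'out-file', '{}.zip'.format(dataset_name))
        pipeline.append({
            'run': 'dump.to_zip',
            'parameters': {'out-file': out_file}
        })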
Available parameter(s):
out-file: optional #defaults to {dataset-name}.zip
meta:
owner: <owner username>
ownerid: <owner unique id>
dataset: <dataset name>
version: 1
findability: <published/unlisted/private>
inputs:
-
kind: datapackage
url: <datapackage-url>
parameters:
resource-mapping:
<resource-name-or-path>: <resource-url>
outputs:
-
kind: zip
parameters:
out-file: my path
After a dataset is published, if you try to download any file in JSON format you'll get something like this: https://pkgstore-testing.datahub.io/0e9b59cd50f1884058c1aa242d71a228/finance-vix/latest/.datahub/json/data/vix-daily.csv.json
Plus, if you try to open the file, it is still CSV.
This also impacts resource paths in datapackage.json and crashes graphs and tables:
...
resources: [{
name: ...
path: ".datahub/json/data/vix-daily.csv.json"
}]
As I understand it, we're prepending the json version of each csv resource in a data package. However, in the views property of a descriptor we reference resources using their initial indexes. So now our 0th resource is not the expected csv file but the json version of it.
Questions:
Could we put the json versions at the end of resources? This way it would work as expected.
What about json (or geojson, topojson etc.)?

As a developer I want to have the whole system under test so that whenever I make new changes I am sure that nothing else is broken.
As a product owner I want to be sure that site will work fine before some new feature is launched to production, so that our users have a good experience
As a developer I want to be able to add new features quickly which means e.g. I don't have to worry I've broken other stuff or do manual testing on the site all the time so that our users get new features faster
We are missing stats (especially bytes) attributes for packages AND for resources, see http://pkgstore.datahub.io/examples/geojson-tutorial/latest/datapackage.json