
node-etl's Introduction


ETL is a collection of stream-based components that can be piped together to form a complete ETL pipeline with buffering, bulk inserts and concurrent database streams. See the test directory for live examples.

npm install etl

Introductory example: csv -> elasticsearch

fs.createReadStream('scores.csv')
  // parse the csv file
  .pipe(etl.csv())
  // map `date` into a javascript date and set unique _id
  .pipe(etl.map(d => {
    d._id = d.person_id;
    d.date = new Date(d.date);
    return d;
  }))
  // collect 1000 records at a time for bulk-insert
  .pipe(etl.collect(1000))  
  // upsert records to elastic with max 10 concurrent server requests
  .pipe(etl.elastic.index(esClient,'scores','records',{concurrency:10}))
  // Switch from stream to promise chain and report done or error
  .promise()
  .then( () => console.log('done'), e => console.log('error',e));

API Reference

Parsers

# etl.csv([options])

Parses incoming csv text into individual records. For parsing options see csv-parser. If options contains a transform object containing functions, those functions will be applied to the values of any matching keys in the data. If a key in the transform object is set to null, the value with that key will not be included in the downstream packets. If the option sanitize is set to true, headers will be trimmed, converted to lowercase, spaces converted to underscores and any blank values (empty strings) set to undefined.

A header event will be emitted when headers have been parsed. An event listener can change the headers in-place before the stream starts piping out parsed data.

Example

// Here the test.csv is parsed but field dt is converted to date.  Each packet will 
// contain the following properties:  __filename, __path, __line and csv fields
etl.file('test.csv')
  .pipe(etl.csv({
    transform: {
      dt : function(d) {
        return new Date(d);
      }
    }
  }))
  .pipe(...)
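A minimal sketch of adjusting headers via the header event; the assumption that the listener receives the parsed header array (and can mutate it in place) follows from the description above:

// normalize header names before any parsed data is piped out
var parser = etl.csv();
parser.on('header', function(headers) {
  headers.forEach(function(h, i) {
    headers[i] = h.trim().toLowerCase();
  });
});

etl.file('test.csv')
  .pipe(parser)
  .pipe(...)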

# etl.fixed(layout)

Parses incoming text into objects using a fixed width layout. The layout should be an object where each key is a field name to be parsed, containing an object with start, end and/or length. Alternatively each key can just have a number, which will be treated as the length.

If a key contains a transform function, it will be applied to the parsed value of that key.

The layout can also be supplied as an array where instead of an object key the fieldname is defined using property field in each element.

The length of a single record will be determined by the highest end or start+length position.

Each packet will contain __line, a number signifying the sequential position of the record.

Example

// Reads the file and parses into individual records.  Each packet will contain the
// following properties:  __filename, __path, __line, firstCol, nextCol, lastCol.
// nextCol values are coerced into numbers here

var layout = {
  firstCol: {start: 0, end:10},
  nextCol: {length:5, transform: Number},
  lastCol: 5
}

etl.file('test.txt')
  .pipe(etl.fixed(layout))
  .pipe(...)

If needed, you can get the current line number as a second parameter in the transform function.

var layout = {
  firstCol: {
    start: 0, end: 10,
    transform: function (value, line) {      
      console.log(`firstCol on line ${line} is: ${value}`); // firstCol on line 1 is: test
      return value;
    }
  }
}

Transforms

# etl.map(fn)

The base streamz object is exposed as etl.map to provide a quick way to do mappings on the fly. Anything pushed inside the custom function will go downstream. Additionally, if the function has a return value (or returns a promise resolving to a value) that is !== undefined, that return value will be pushed as well.

Example

// In this example names and ages are normalized into fresh objects pushed
// downstream.  (If we wanted to retain metadata we would use Object.create(d))

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.map(function(d) {
    this.push({name: d.name_1, age: d.age_1});
    this.push({name: d.name_2, age: d.age_2});
    this.push({name: d.name_3, age: d.age_3});
  }))

# etl.split([symbol])

Splits the text of an incoming stream by the provided symbol into separate packets. The default symbol is a newline, i.e. splitting incoming text into individual lines. If the supplied symbol is never found, the incoming stream will be buffered until the end, when all content is sent in one chunk. The prototype of each packet is the first incoming packet for each buffer.

Example

// Reads the file and splits `text` by newline
etl.file('text.txt')
  .pipe(etl.split())

# etl.cut(maxLength)

Cuts incoming text into text snippets of a given maximum length and pushes downstream.
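A minimal sketch based on the description above (the input string and snippet length are illustrative):

// cut an incoming string into snippets of at most 10 characters
etl.toStream(['abcdefghijklmnopqrstuvwxyz'])
  .pipe(etl.cut(10))
  .pipe(etl.inspect());
// Should show: 'abcdefghij'  'klmnopqrst'  'uvwxyz'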

# etl.collect(count [,maxDuration] [,maxTextLength])

Buffers incoming packets until they reach a specified count and then sends the array of buffered packets downstream. At the end of the incoming stream, any remaining buffered items are shipped off even if the count has not been reached. This functionality comes in handy when preparing to bulk-insert into databases. An optional maxDuration can be supplied to set the maximum amount of time (in ms) that can pass from the start of a new collection until the contents are pushed out. This is useful when processing sporadic real-time data, where we want the collection to flush early even if the count has not been reached. Finally, defining an optional maxTextLength will cause the stream to keep track of the stringified length of the buffer and push when it goes over the limit.

Example:

var data = [1,2,3,4,5,6,7,8,9,10,11],
    collect = etl.collect(3);

data.forEach(collect.write.bind(collect));
collect.end();

collect.pipe(etl.inspect());
// Should show 4 packets: [1,2,3]   [4,5,6]    [7,8,9]   [10,11]

If the first argument (count) is a function, it will be used as a custom collection function. This function can add elements to the buffer with this.buffer.push(data) and push the buffer downstream with this._push(). When the stream ends, any remaining buffer is pushed automatically.
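A hedged sketch of a custom collection function using the this.buffer.push and this._push hooks described above (the flush condition is illustrative):

// flush the buffer whenever the collected numbers sum to 10 or more
var collect = etl.collect(function(d) {
  this.buffer.push(d);
  var total = this.buffer.reduce(function(sum, x) { return sum + x; }, 0);
  if (total >= 10) this._push();
});

[1,2,3,4,5,6,7,8,9].forEach(collect.write.bind(collect));
collect.end();

collect.pipe(etl.inspect());
// Should show 4 packets: [1,2,3,4]   [5,6]   [7,8]   [9]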

# etl.chain(fn)

Allows a custom sub-chain of streams to be injected into the pipe using duplexer3. You must provide a custom function that takes the inbound stream as a first argument and, optionally, an outbound stream as the second argument. You can use the optional outbound stream directly to chain the two streams together, or you can return a stream or a Promise resolved with a stream or values (all of which will be piped downstream with etl.toStream).

Example 1: Simple return of the outbound stream

etl.file('test.csv')
  .pipe(etl.chain(function(inbound) {
    return inbound
      .pipe(etl.csv())
      .pipe(etl.collect(100));
  }))
  .pipe(console.log);

Example 2: Using the outbound stream from the arguments

etl.file('test.csv')
  .pipe(etl.chain(function(inbound,outbound) {
    inbound
      .pipe(etl.csv())
      .pipe(etl.collect(100))
      .pipe(outbound);
  }))
  .pipe(console.log);

# etl.prescan(size,fn)

Buffers the incoming data until the supplied size is reached (either number of records for objects, or buffer/string length). When the target size is reached, the supplied function will be called with the buffered data (array) as an argument. After the function has executed and the returned promise (if any) has resolved, all buffered data will be piped downstream, as will all subsequent data.

Prescan allows the user to make certain determinations from the incoming data before passing it down, such as inspecting data types across multiple rows.

Example:

// In this example we want to collect all columns from the first 10 rows of a
// newline-delimited JSON file to build a csv header row

let headers = new Set();
fs.createReadStream('data.json')
  .pipe(etl.split()) // split on newline
  .pipe(etl.map(d => JSON.parse(d)))  // parse each line as json
  .pipe(etl.prescan(10,d => 
    // build up headers from the first 10 lines
    d.forEach(d => Object.keys(d).forEach(key => headers.add(key)))
  ))
  .pipe(etl.map(function(d) {
    // write the header row once, then one csv line per record
    this.firstline = this.firstline || this.push([...headers].join(',')+'\n');
    this.push([...headers].map(header => d[header]).join(',')+'\n');
  }))
  .pipe(fs.createWriteStream('data.csv'))

# etl.expand([convert])

Throughout the etl pipeline new packets are generated with incoming packets as prototypes (using Object.create). This means that inherited values are not enumerable and will not show up in stringification by default (although they are available directly). etl.expand() loops through all keys of an incoming packet and explicitly sets any inherited values as regular properties of the object.

Example:

// In this example the `obj` would only show property `c` in stringify
// unless expanded first
var base = {a:1,b:'test'},
    obj = Object.create(base),
    s = etl.streamz();

obj.c = 'visible';
s.end(obj);

s.pipe(etl.expand())
  .pipe(etl.inspect());

The optional convert argument will modify the keys of the new object. If convert is 'uppercase' or 'lowercase', the case of the keys will be adjusted accordingly. If convert is a function, the key name will be set to the function's output (and if the output is undefined that particular key will not be included in the new object).
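A hedged sketch of the convert argument as a function; that the function receives each key name is an assumption based on the description above:

// lowercase all keys and drop any key named 'INTERNAL'
var s2 = etl.streamz();
s2.end(Object.assign(Object.create({FIRST: 'a', INTERNAL: 'x'}), {Second: 'b'}));

s2.pipe(etl.expand(function(key) {
    if (key === 'INTERNAL') return undefined;
    return key.toLowerCase();
  }))
  .pipe(etl.inspect());
// Should show: { first: 'a', second: 'b' }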

# etl.stringify([indent] [,replacer] [,newline])

Transforms incoming packets into JSON stringified versions, with optional indent and replacer. If newline is true a \n will be appended to each packet.
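A minimal sketch producing newline-delimited JSON (the same arguments are used in the etl.toFile example further below):

etl.toStream([{a: 1}, {b: 2}])
  .pipe(etl.stringify(0, null, true))   // no indent, no replacer, append '\n'
  .pipe(process.stdout);
// {"a":1}
// {"b":2}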

# etl.inspect([options])

Logs incoming packets to console using util.inspect (with optional custom options)

# etl.timeout([ms])

A passthrough transform that emits an error if no data has passed through for at least the supplied milliseconds (ms). This is useful to manage pipelines that go stale for some reason and need to be errored out for further inspection.

Example:

// Here the pipeline times out if no data has been flowing to the file for at least 1 second
mongocollection.find({})
  .pipe(lookup)
  .timeout(1000)
  .pipe(etl.toFile('test.json'))

Databases

Mongo

# etl.mongo.insert(collection [,options])

Inserts incoming data into the provided mongodb collection. The supplied collection can be a promise resolving to a collection. The options are passed on to both streamz and the mongodb insert command. By default this object doesn't push anything downstream, but if pushResults is set to true in options, the results from mongo will be pushed downstream.

Example

// The following inserts data from a csv, 10 records at a time into a mongo collection
// ..assuming mongo has been promisified

var db = mongo.ConnectAsync('mongodb://localhost:27017/testdb');
var collection = db.then(function(db) {
  return db.collection('testcollection');
});

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.collect(10))
  .pipe(etl.mongo.insert(collection));

# etl.mongo.update(collection [,keys] [,options])

Updates incoming data by building criteria from an array of keys and the incoming data. The supplied collection can be a promise, and results can be pushed downstream by declaring pushResults: true. The options are passed to mongo, so defining upsert: true in options will ensure an upsert of the data.

Example

// The following updates incoming persons using the personId as a criteria (100 records at a time)

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.collect(100))
  .pipe(etl.mongo.update(collection,['personId']));

# etl.mongo.upsert(collection [,keys] [,options])

Syntactic sugar for mongo.update with {upsert: true}

By default, update and upsert will take each data object and wrap it within a $set. If you want full control of the mongo update verbs used, you can put them under $update in the data object.
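A hedged sketch of using $update to control the update verbs directly, assuming collection is defined as in the insert example above (the field names are illustrative):

etl.toStream([{personId: 1, score: 10}])
  .pipe(etl.map(function(d) {
    return {
      personId: d.personId,            // used to build the update criteria
      $update: {
        $set: {score: d.score},        // explicit verbs instead of the default $set wrap
        $inc: {updateCount: 1}
      }
    };
  }))
  .pipe(etl.mongo.upsert(collection, ['personId']));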

Mysql

# etl.mysql.upsert(pool, schema, table [,options])

Pipeline that scripts incoming packets into bulk sql commands (etl.mysql.script) and executes them (etl.mysql.execute) using the supplied mysql pool. When the size of each SQL command reaches maxBuffer (1mb by default) the command is sent to the server. Concurrency is managed automatically by the mysql poolSize.

Example:

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.mysql.upsert(pool,'testschema','testtable',{concurrency:4 }))

# etl.mysql.script(pool, schema, table [,options])

Collects data and builds up a mysql statement to insert/update data until the buffer exceeds maxBuffer (customizable in options). When the maxBuffer is reached, a full sql statement is pushed downstream. When the input stream has ended, any remaining sql statement buffer will be flushed as well.

The script stream first establishes the column names of the table being updated, and as data comes in - it uses only the properties that match column names in the table.

# etl.mysql.execute(pool [,options])

This component executes any incoming packets as sql statements using connections from the connection pool. The maximum concurrency is automatically determined by the mysql poolSize, using the combination of callbacks and Promises.

Example:

// The following bulks data from the csv into sql statements and executes them with 
// a maximum of 4 concurrent connections

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.mysql.script(pool,'testschema','testtable'))
  .pipe(etl.mysql.execute(pool,4))

Postgres

# etl.postgres.upsert(pool, schema, table [,options])

Pipeline that scripts incoming packets into bulk sql commands (etl.postgres.script) and executes them (etl.postgres.execute) using the supplied postgres pool. When the size of each SQL command reaches maxBuffer (1mb by default) the command is sent to the server. Concurrency is managed automatically by the postgres poolSize. If primary key is defined and an incoming data packet contains a primary key that already exists in the table, the record will be updated - otherwise the packet will be inserted.

Example:

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.postgres.upsert(pool,'testschema','testtable',{concurrency:4 }))

# etl.postgres.script(pool, schema, table [,options])

Collects data and builds up a postgres statement to insert/update data until the buffer exceeds maxBuffer (customizable in options). When the maxBuffer is reached, a full sql statement is pushed downstream. When the input stream has ended, any remaining sql statement buffer will be flushed as well.

The script stream first establishes the column names of the table being updated, and as data comes in - it uses only the properties that match column names in the table.

# etl.postgres.execute(pool [,options])

This component executes any incoming packets as sql statements using connections from the connection pool. The maximum concurrency is automatically determined by the postgres poolSize, using the combination of callbacks and Promises.

Example:

// The following bulks data from the csv into sql statements and executes them with 
// a maximum of 4 concurrent connections

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.postgres.script(pool,'testschema','testtable'))
  .pipe(etl.postgres.execute(pool,4))

Elasticsearch

# etl.elastic.bulk(action, client [,index] [,type] [,options])

Transmit incoming packets to elasticsearch, setting the appropriate meta-data depending on the default action. Each incoming packet can be an array of documents (or a single document). Each document should contain a unique _id. To bulk documents together use etl.collect(num) above the elastic adapter.

The results are not pushed downstream unless pushResults is defined in the options. The body of the incoming data is included in the results, allowing for easy resubmission upon version conflicts. By defining the option pushErrors as true, only errors will be pushed downstream. The maximum number of concurrent connections can be defined with the option concurrency. If maxRetries is defined in options, an error response from the server will result in retries up to the specified limit, after a wait of retryDelay (30 seconds by default). This can be useful for long-running upsert operations that might encounter occasional network or timeout errors along the way. If debug is set to true, the error message will be printed to the console before retrying. maxRetries should only be used for data with a user-supplied _id to prevent potential duplicate records on retry.

An exponential backoff is provided by defining backoffDelay in options. The backoff can be capped by defining maxBackoffDelay and variance can be applied by defining backoffVariance (should be between 0-1)
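A hedged sketch of the retry and backoff options described above (the values are illustrative):

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.collect(1000))
  .pipe(etl.elastic.upsert(esClient, 'testindex', undefined, {
    concurrency: 10,
    maxRetries: 5,          // retry a failed bulk request up to 5 times
    retryDelay: 10000,      // wait 10s between retries (defaults to 30s)
    backoffDelay: 1000,     // exponential backoff starting at 1s
    maxBackoffDelay: 60000, // cap the backoff at 60s
    backoffVariance: 0.2,   // apply variance to the backoff
    debug: true             // print the error message before retrying
  }));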

If index or type are not specified when the function is called, they will have to be supplied as _index and _type properties of each document. The bulk command first looks for _source in the document to use as a document body (in case the document originates from a scroll command), alternatively using the document itself as a body.

Available actions are also provided as separate api commands:

  • etl.elastic.index(client,index,type,[options])
  • etl.elastic.update(client,index,type,[options])
  • etl.elastic.upsert(client,index,type,[options])
  • etl.elastic.delete(client,index,type,[options])
  • etl.elastic.custom(client,index,type,[options])

Example

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.collect(100))
  .pipe(etl.elastic.index(esClient,'testindex','testtype'))

Another example shows how one index can be copied to another, retaining the _type of each document:

console.time('copy');
etl.elastic.scroll(esClient,{index: 'index.a', size: 5000})
  .pipe(etl.collect(1000))
  .pipe(etl.elastic.index(esClient,'index.b',null,{concurrency:10}))
  .promise()
  .then(function() {
    console.timeEnd('copy');
  });

If the custom action is selected, each packet must be the raw metadata to be sent to elasticsearch, with the optional second line stored in the property body.

Since _type is no longer allowed in elasticsearch > 7, it should be set as undefined for use in later versions.
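A hedged sketch of the custom action, where each packet is the raw bulk metadata with the optional document body under body (the exact packet shape and argument placement are assumptions based on the description above):

etl.toStream([
    { index: { _index: 'testindex', _id: '1' }, body: { field: 'value' } },
    { delete: { _index: 'testindex', _id: '2' } }
  ])
  .pipe(etl.collect(100))
  .pipe(etl.elastic.custom(esClient, null, null, {concurrency: 2}));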

BigQuery

# etl.bigquery.insert(table [,options])

Bulk inserts data into BigQuery. This function first downloads the field names for the table and then inserts the matching columns from the incoming data. The first argument needs to be an instance of the BigQuery table class. Options can specify the concurrency (i.e. how many concurrent insert connections are allowed).

Example:

const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery(config);
const dataset = bigquery.dataset('my_dataset');
const table = dataset.table('my_table');

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.collect(100))  // send 100 rows in each packet
  .pipe(etl.bigquery.insert(table, {concurrency: 5}));

Cluster

# etl.cluster.schedule(list [,num_threads] [,reportingInterval])

Schedules a list (array) of tasks to be performed by workers. Returns a promise for the completion of all the tasks. The number of threads defaults to the number of cpus. If a reporting interval is defined, progress will be reported via console.log. Should only be run from the master thread.

# etl.cluster.process(data [,callback])

This function should be overwritten in the worker to perform each task and either return a Promise that is resolved when the task is done or call the optional callback.

# etl.cluster.progress(num)

This function sends a numerical value representing progress up to the master (for reporting).
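A hedged sketch of how the master and workers might fit together; the use of Node's built-in cluster module for the master/worker check and the task contents are assumptions:

const cluster = require('cluster');
const etl = require('etl');

if (cluster.isMaster) {
  // master: schedule a list of tasks across 4 workers, reporting progress every 1000ms
  etl.cluster.schedule(['file1.csv', 'file2.csv', 'file3.csv'], 4, 1000)
    .then(() => console.log('all tasks done'));
} else {
  // worker: overwrite etl.cluster.process to handle a single task
  etl.cluster.process = function(filename) {
    // perform one task and return a promise that resolves when done
    return etl.file(filename)
      .pipe(etl.csv())
      .pipe(etl.map(d => d))   // transform the data here
      .promise();
  };
}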

Utilities

# etl.toStream(data)

A helper function that returns a stream that is initialized by writing every element of the supplied data (if array) before being ended. This allows for an easy transition from a known set of elements to a flowing stream with concurrency control. The input data can also be supplied as a promise or a function and the resulting values will be piped to the returned stream. If the resulting value from a supplied function or promise is a stream, it will be piped downstream.
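For example, a plain array can be turned into a flowing stream and piped through regular etl components:

etl.toStream([{name: 'a'}, {name: 'b'}, {name: 'c'}])
  .pipe(etl.map(d => Object.assign(d, {seen: true})))
  .pipe(etl.inspect());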

# etl.file(data [,options])

Opens up a fileStream on the specified file and pushes the content downstream. Each packet has a base prototype of either an optional info object provided in options or an empty object. The following properties are defined for each downstream packet: __filename, __path and text (containing the incremental contents of the file).

The optional info object allows setting generic properties that will, through inheritance, be available in any derived packets downstream.

Example:

// each packet will contain  properties context, __filename, __path and text
etl.file('text.txt',{info: {context: 'test'}})

# etl.toFile(filename)

This is a convenience wrapper for fs.createWriteStream that returns a streamz object. This allows appending .promise() to capture the finish event (or error) in a promise form.

Example:

etl.toStream([1,2,3,4,5])
  .pipe(etl.stringify(0,null,true))
  .pipe(etl.toFile('/tmp/test.txt'))
  .promise()
  .then(function() {
    console.log('done')
  })
  .catch(function(e) {
    console.log('error',e);
  })

# etl.keepOpen([timeout])

etl.keepOpen([timeout]) is a passthrough component that stays open after receiving an end, only closing down when no data has passed through for a period of [timeout]. This can be useful for pipelines where data from a lower part of the pipeline is pushed back higher up for reprocessing (for example when encountering version conflicts of database documents), as it avoids a 'write after end' error. The default timeout is 1000ms.

Testing

Testing environment is provided using docker-compose.

npm test starts docker-compose (if not already running) and executes the test suite.

You can run individual tests from inside the docker container directly. To enter the container, type npm run docker.


node-etl's Issues

How to Export to csv file?

p.stream('select * from users')
  .pipe(etl.stringify(0,null,true))
  .pipe(etl.toFile('txt/test.txt'))
  .promise()
  .then(function() {
    console.log('done')
  })
  .catch(function(e) {
    console.log('error',e);
  });

I'm trying it like this, but how do I do the next step?

__line

  • Operating System: Windows 10
  • Node Version: v14.16.0
  • NPM Version: 6.14.11
  • etl Version: 0.6.12

Expected Behavior / Situation

.pipe(csv({ originalLine: false }))
/*
a;b;c            [
1;2;3   --->     {a:1, b:2, c:3},
4;5;6               {a:4, b:5, c:6},
                   ]
*/

Actual Behavior / Situation

.pipe(csv())
/*
a;b;c            [
1;2;3   --->     {a:1, b:2, c:3, __line: 2},
4;5;6               {a:4, b:5, c:6, __line: 3},
                   ]
*/

Modification Proposal

Add an option ("originalLine" in the example) to inhibit the addition of the "__line" property to the resulting objects. It's bad for SQL inserts, for example.

Elasticsearch - pushResults and pushErrors don't push the data downstream

In the elasticsearch bulk operation, pushResults and pushErrors have no effect as the data is not being pushed to the stream.

  1. The option is documented as pushResults but internally the code checks for pushResult:
    if (!self.options.pushResult && !self.options.pushErrors) return;

  2. The results data is not being pushed to the stream, so it cannot be caught downstream:

.pipe(etl.elastic.index(esClient,index,'type', {
    concurrency : 5,
    pushResult : true,
    pushErrors : true
  }))
.pipe(etl.map(function (d) {
    console.log(d);
    return d
  }))

this doesn't work.

@ZJONSSON could you please take a look ?

More fixed support

Fixed length layouts are very efficient as the data can be determined immediately without having to regex through delimiters. Currently only parsing a fixed width file is supported; additional functionality should include:

  • create a fixed layout file (with or without newlines)
  • read specific records from a file (provided a fixed encoding such as UTF-32 or ASCII)

Close mysql stream on error

If an error occurs in the consumer of a mysql stream, the connection is returned to the connection pool. However, since the connection hasn't been explicitly closed, it's still actively trying to deliver the original results. Any subsequent request that gets this connection will be unable to submit a request. We need to explicitly destroy the connection so it doesn't get reallocated to the pool.

Parallel processing

Thanks for this excellent library. I have successfully used it to transform 40m records from Postgres to DynamoDb (I see there is an open issue for this so I will work on a PR). I am working on a new etl pipeline which takes a csv, enriches it with a call to a web service and then generates a new csv with a line for each record. The file is huge (22m lines) and I need to be able to make multiple service calls in parallel for this to be efficient (~40). I think I need to use the cluster/worker model but I can't work out how to use it. Would it be possible to add an example of how this should work? Many thanks.

Mongo "pushResults" option

Hi, and thank you for the package!
I observed a small typo in the readme. As per the docs, if you want to stream down the result of the mongo operation you need to pass the extra option {"pushResults": true}. However the correct option is {"pushResult": true}.

.pipe(etl.mongo.update(collection, ["_id"], {"pushResult": true}))
.pipe(etl.map(mongoResult => {
      
}))

Mongo insert is deprecated

Using mongo:

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.collect(10))
  .pipe(etl.mongo.insert(collection));

Will result in:

(node:62919) DeprecationWarning: collection.insert is deprecated. Use insertOne, insertMany or bulkWrite instead.

mongo ConnectAsync undefined

Hi

It seems that ConnectAsync has been deprecated, and I'm unable to run the following example.

var db = mongo.ConnectAsync('mongodb://localhost:27017/testdb');
var collection = db.then(function(db) {
  return db.collection('testcollection');
});

etl.file('test.csv')
  .pipe(etl.csv())
  .pipe(etl.collect(10))
  .pipe(etl.mongo.insert(collection));

I have tried using newer syntax, but I'm getting TypeError: Cannot read property 'insert' of undefined

const fs  = require('fs')
const etl = require('etl');

const MongoClient = require("mongodb").MongoClient;

var collection;

(async() => {
    await MongoClient.connect('mongodb://127.0.0.1:3001', function (err, client) {
        if (err) throw err;

        var db = client.db('meteor');
     
        collection = db.collection("Carrier")
    }); 
})()

fs.createReadStream("ins.csv")
  // parse the csv file
  .pipe(etl.csv())
  // map `date` into a javascript date and set unique _id
  .pipe(etl.map(d => {
    return d
  }))
  // collect 1000 records at a time for bulk-insert
  .pipe(etl.collect(10))
  .pipe(etl.mongo.insert(collection))
  .promise()
  .then(() => console.log("done"), e => console.log("error", e));

I'm seeing the following error:

node index.js
(node:97320) DeprecationWarning: current URL string parser is deprecated, and will be removed ina future version. To use the new parser, pass option { useNewUrlParser: true } to MongoClient.connect.
error TypeError: Cannot read property 'insert' of undefined
    at collection.then.collection (/Users/dck/Projects/csv-load/node_modules/etl/lib/mongo/insert.js:24:35)
    at tryCatcher (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/promise.js:512:31)
    at Promise._settlePromise (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/promise.js:569:18)
    at Promise._settlePromiseCtx (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/promise.js:606:10)
    at _drainQueueStep (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:142:12)
    at _drainQueue (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:131:9)
    at Async._drainQueues (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:147:5)
    at Immediate.Async.drainQueues [as _onImmediate] (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:17:14)
    at processImmediate (timers.js:632:19)
Unhandled rejection TypeError: Cannot read property 'insert' of undefined
    at collection.then.collection (/Users/dck/Projects/csv-load/node_modules/etl/lib/mongo/insert.js:24:35)
    at tryCatcher (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/util.js:16:23)
    at Promise._settlePromiseFromHandler (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/promise.js:512:31)
    at Promise._settlePromise (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/promise.js:569:18)
    at Promise._settlePromiseCtx (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/promise.js:606:10)
    at _drainQueueStep (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:142:12)
    at _drainQueue (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:131:9)
    at Async._drainQueues (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:147:5)
    at Immediate.Async.drainQueues [as _onImmediate] (/Users/dck/Projects/csv-load/node_modules/bluebird/js/release/async.js:17:14)
    at processImmediate (timers.js:632:19)

Any ideas?

Postgres upsert non-functional.

Feeding a stream containing existing Postgres pkeys to etl.postgres.upsert results in a duplicate key error, instead of an upsert statement. Manually adding the primary keys to the postgres script.js file and removing the "IF" statement around it (line 100) allows it to work. It would appear that primary keys are not properly being grabbed/fed to this function.

I may attempt to troubleshoot if I have time, but I'm just bringing it to your attention.

ETL Pipe:
etl.postgres.upsert(pool,'public','testdumps',{concurrency:4 });
Error when existing record piped to above:
ERROR: duplicate key value violates unique constraint "testdumps_pkey"

SQL Server

Can I use this package to insert data into SQL Server?

Migrating to class syntax

Will you be open to migrating to the ES6 class syntax? I would like to hear your thoughts on the following sample piece of code, taken from lib/postgres/execute.js, before I start a full-blown PR.

Eg:

Existing Code

const Postgres = require('./postgres');
const util = require('util');

function Execute(pool,options) {
  if (!(this instanceof Execute))
    return new Execute(pool,options);

  options = options || {};
  Postgres.call(this,pool,options);

}

util.inherits(Execute,Postgres);

Execute.prototype._fn = function(d,cb) {
  // TODO make transaction or use {maxBuffer:1} in options
  //console.log(d);
  return this.query(d,cb)
    .then(d => this.options.pushResult && d || undefined);
};

module.exports = Execute;

Proposed Changes

new code
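A hedged sketch of what the class-based equivalent of the code above might look like (note that the callable-without-new behaviour of the original function would be lost):

const Postgres = require('./postgres');

class Execute extends Postgres {
  constructor(pool, options = {}) {
    super(pool, options);
  }

  _fn(d, cb) {
    // TODO make transaction or use {maxBuffer:1} in options
    return this.query(d, cb)
      .then(d => this.options.pushResult && d || undefined);
  }
}

module.exports = Execute;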

Passes the test

node test/postgres-test.js
TAP version 13
# Subtest: postgres
    # Subtest: inserts
        ok 1 - returns correct length
        1..1
    ok 1 - inserts # time=27.359ms

    # Subtest: and records are verified
        ok 1 - records verified
        1..1
    ok 2 - and records are verified # time=3.427ms

    # Subtest: streaming
        ok 1 - work
        1..1
    ok 3 - streaming # time=3.507ms

    1..3
ok 1 - postgres # time=56.602ms

1..1
# time=63.281ms

Typescript typings

Would it be possible to get Typescript typings for this library?

I'll start making my own, but it probably won't be comprehensive.

MSSQL Support

Would it be difficult to implement a MSSQL database lib?

thanks

Brian

Missing implementation of `maxBuffer` in Postgres

@willfarrell @ZJONSSON

The documentation states

# etl.postgres.script(pool, schema, table [,options])

Collects data and builds up a postgres statement to insert/update data until the buffer exceeds maxBuffer (customizable in options). When the maxBuffer is reached, a full sql statement is pushed downstream. When the input stream has ended, any remaining sql statement buffer will be flushed as well.

However the code of Postgres is missing the maxBuffer implementation.

Postgres Upsert - Unique Field Support

Currently, postgres upsert only takes primary keys into consideration for generating ON CONFLICT statements. It would be helpful to also be able to upsert against unique fields. One solution would be to be able to pass in 'pkeys' via options.

Current code:
if (this.upsert) this.pkeys = this.getPrimaryKeys();

Suggested Change:
if (this.upsert) this.pkeys = this.options.pkeys ? Promise.resolve(this.options.pkeys) : this.getPrimaryKeys();

Support for MongoDb $setOnInsert

Currently, there is no way to add additional commands, such as $setOnInsert, to the Mongo bulk operation. Use case is to set a 'dateCreated' field when a document is inserted when upsert is set to true.

Postgres Upsert Requires All NOT NULL values to be included in the object.

Been fighting an issue today where I couldn't do a simple upsert of an object that looked like {id: 123, value: "myvalue"}

The primary key of the table is id but the import would fail with a "null value in column "required_value" violates not-null constraint" error.

As soon as I include the two required values, everything runs smoothly.

Digging into the script.js for postgres, I believe it is because of how the upsert logic works.

INSERT INTO public.mytable (
    "id",
    "required_value",
    "myvalue",
    ... long list of columns
  )
VALUES
  (
    123,
    DEFAULT
    "insertedvalue",
    ... long list of DEFAULT
  ) ON CONFLICT ("id") DO
UPDATE
SET
  "id" = 6,

Because "required_value" is NOT NULL and there is no default, the query fails. No idea how to fix it as SQL isn't my strength, but this appears to be a bug.

Elasticsearch bulk indexing module - warning being thrown

There is a warning being thrown in the elasticsearch bulk indexing module; maybe we don't need to return a promise from the execute function.

https://github.com/petkaantonov/bluebird/blob/master/docs/docs/warning-explanations.md#warning-a-promise-was-created-in-a-handler-but-was-not-returned-from-it

Warning: a promise was created in a handler but was not returned from it
    at execute (/node_modules/etl/lib/elasticsearch/bulk.js:77:20)
    at Bulk._fn (/node_modules/etl/lib/elasticsearch/bulk.js:119:10)
    at Bulk.Streamz._transform (/node_modules/etl/node_modules/streamz/streamz.js:97:16)
    at Bulk.Transform._read (_stream_transform.js:167:10)
    at Bulk.Transform._write (_stream_transform.js:155:12)
    at doWrite (_stream_writable.js:300:12)
    at clearBuffer (_stream_writable.js:407:7)
    at onwrite (_stream_writable.js:339:7)
    at WritableState.onwrite (_stream_writable.js:89:5)
    at afterTransform (_stream_transform.js:79:3)
    at TransformState.afterTransform (_stream_transform.js:54:12)
    at pop (/node_modules/etl/node_modules/streamz/streamz.js:79:29)
    at done (/node_modules/etl/node_modules/streamz/streamz.js:86:5)

Streaming tableA to tableB and updating streamed tableA records

Hi,

I am wondering about the following scenario in MySQL.

const query = db.query(`SELECT * FROM table_a WHERE is_migrated=?`, [0])

query
  .stream()
  .pipe(
    stream.Transform({
      objectMode: true,
      transform: (row, encoding, callback) => {
        callback(null, processRow(row))
      }
    })
    .on('error', err => console.log('transform pipe error', err))
    .on('end', () => console.log('transform pipe end'))
  )
  .pipe(etl.mysql.upsert(pool, 'db_name', 'table_b'))
  // ... for all migrated records from table_a set is_migrated=1 
  .promise()
  .then(data => console.log('done', data))
  .catch(err => console.error('err:', err))

  • select from tableA where is_migrated=0
  • do transformations for each row
  • insert them to tableB
  • update tableA for all inserted records to tableB and set is_migrated=1

I have an idea of building an array of IDs and then doing a separate UPDATE, but it seems hacky, especially since the array could grow very quickly due to a big data set. Is there a better way of doing it?

Also, is it possible to use INSERT ... ON DUPLICATE KEY UPDATE for upsert? REPLACE INTO becomes an issue in our use case where it would eat up auto_increment IDs very quickly.
