
event-stream's Introduction

EventStream

Streams are node's best and most misunderstood idea, and EventStream is a toolkit to make creating and working with streams easy.

Normally, streams are only used for IO, but in event stream we send all kinds of objects down the pipe. If your application's input and output are streams, shouldn't the throughput be a stream too?

The EventStream functions resemble the array functions, because Streams are like Arrays, but laid out in time, rather than in memory.

All the event-stream functions return instances of Stream.

event-stream creates 0.8 streams, which are compatible with 0.10 streams.

NOTE: I shall use the term "through stream" to refer to a stream that is writable and readable.

NOTE for Gulp users: Merge will not work for gulp 4. merge-stream should be used.

//pretty.js

if(!module.parent) {
  var es = require('event-stream')
  var inspect = require('util').inspect

  process.stdin                        //connect streams together with `pipe`
    .pipe(es.split())                  //split stream to break on newlines
    .pipe(es.map(function (data, cb) { //turn this async function into a stream
      cb(null
        , inspect(JSON.parse(data)))   //render it nicely
    }))
    .pipe(process.stdout)              // pipe it to stdout !
}

run it ...

curl -sS registry.npmjs.org/event-stream | node pretty.js

node Stream documentation

through (write?, end?)

Re-emits data synchronously. Easy way to create synchronous through streams. Pass in optional write and end methods. They will be called in the context of the stream. Use this.pause() and this.resume() to manage flow. Check this.paused to see current flow state. (write always returns !this.paused)

this function is the basis for most of the synchronous streams in event-stream.

es.through(function write(data) {
    this.emit('data', data)
    //this.pause() 
  },
  function end () { //optional
    this.emit('end')
  })
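
For example, a minimal sketch (not from the original docs), assuming the incoming chunks are strings: a through stream that upper-cases each chunk and drops empty ones.

es.through(function write (data) {
    var line = String(data)                   //assumes chunks are strings
    if (line.length)
      this.emit('data', line.toUpperCase())   //empty chunks are simply not re-emitted
  },
  function end () {
    this.emit('end')
  })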

map (asyncFunction)

Create a through stream from an asynchronous function.

var es = require('event-stream')

es.map(function (data, callback) {
  //transform data
  // ...
  callback(null, data)
})

Each map MUST call the callback. It may callback with data, with an error, or with no arguments:

  • callback() drops this data.
    This makes the map work like a filter.
    Note: callback(null, null) is not the same, and will emit null.

  • callback(null, newData) turns data into newData.

  • callback(error) emits an error for this item.

Note: if a callback is not called, map will assume that chunk is still being processed;
every call must be answered or the stream will not know when to end.

Also, if the callback is called more than once, every call but the first will be ignored.
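
As a sketch of these rules (the values are illustrative), the following map drops odd numbers by calling the callback with no arguments and doubles the rest:

es.map(function (data, callback) {
  if (data % 2 !== 0) return callback() //drop the chunk, like a filter
  callback(null, data * 2)              //emit the transformed value
})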

mapSync (syncFunction)

Same as map, but takes a synchronous function that returns the transformed data instead of calling a callback. Based on es.through
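
A minimal sketch of the synchronous form, returning the transformed value directly:

var es = require('event-stream')

es.mapSync(function (data) {
  //transform data synchronously
  // ...
  return data
})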

flatmapSync (syncFunction)

Map elements nested: each incoming chunk is treated as an array, the function is applied to each element, and the results are emitted as individual chunks.

var es = require('event-stream')

es.flatmapSync(function (data) {
  //transform data
  // ...
  return data
})

filterSync (syncFunction)

Filter elements.

var es = require('event-stream')

es.filterSync(function (data) {
  return data > 0
})

split (matcher)

Break up a stream and reassemble it so that each line is a chunk. matcher may be a String, or a RegExp

Example, read every line in a file ...

fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split())
  .pipe(es.map(function (line, cb) {
    //do something with the line 
    cb(null, line)
  }))

split takes the same arguments as string.split except it defaults to '\n' instead of ',', and the optional limit parameter is ignored (see String#split).
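
For instance, a sketch of splitting on a custom separator (the file name is illustrative):

fs.createReadStream('records.txt')
  .pipe(es.split(';'))                 //one chunk per ';'-separated record
  .pipe(es.map(function (record, cb) {
    cb(null, record.trim() + '\n')
  }))
  .pipe(process.stdout)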

NOTE - Maintaining Line Breaks
If you want to process each line of the stream, transform the data, reassemble it, and KEEP the line breaks, the example will look like this:

fs.createReadStream(file, {flags: 'r'})
  .pipe(es.split(/(\r?\n)/))
  .pipe(es.map(function (line, cb) {
    //do something with the line 
    cb(null, line)
  }))

This technique is mentioned in the underlying documentation for the split npm package.

join (separator)

Create a through stream that emits separator between each chunk, just like Array#join.

(for legacy reasons, if you pass a callback instead of a string, join is a synonym for es.wait)
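
A minimal sketch, combining join with readArray and wait from further down this page (the values are illustrative):

es.readArray(['a', 'b', 'c'])
  .pipe(es.join(','))
  .pipe(es.wait(function (err, text) {
    //text is 'a,b,c'
  }))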

merge (stream1,...,streamN) or merge (streamArray)

concat → merge

Merges streams into one and returns it. Incoming data will be emitted as soon as it arrives - no ordering is applied (for example: data1 data1 data2 data1 data2, where data1 and data2 are data from two streams). It counts how many streams were passed to it and emits 'end' only when all of the streams have emitted 'end'.

es.merge(
  process.stdout,
  process.stderr
).pipe(fs.createWriteStream('output.log'));

It can also take an Array of streams as input like this:

es.merge([
  fs.createReadStream('input1.txt'),
  fs.createReadStream('input2.txt')
]).pipe(fs.createWriteStream('output.log'));

replace (from, to)

Replace all occurrences of from with to. from may be a String or a RegExp.
Works just like string.split(from).join(to), but streaming.
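
For example, a sketch that normalises Windows line endings while streaming (the file names are illustrative):

fs.createReadStream('input.txt')
  .pipe(es.replace('\r\n', '\n'))   //turn every CRLF into LF
  .pipe(fs.createWriteStream('output.txt'))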

parse

Convenience function for parsing JSON chunks. For newline-separated JSON, use with es.split. By default it logs parsing errors with console.error; for different behaviour, transforms created with es.parse({error: true}) will emit 'error' events for exceptions thrown from JSON.parse, unmodified.

fs.createReadStream(filename)
  .pipe(es.split()) //defaults to lines.
  .pipe(es.parse())
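
And a sketch of the {error: true} behaviour described above, handling malformed lines yourself:

fs.createReadStream(filename)
  .pipe(es.split())
  .pipe(es.parse({error: true}))
  .on('error', function (err) {
    //a chunk failed JSON.parse; handle or log it here
  })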

stringify

convert javascript objects into lines of text. The text will have whitespace escaped and have a \n appended, so it will be compatible with es.parse

objectStream
  .pipe(es.stringify())
  .pipe(fs.createWriteStream(filename))

readable (asyncFunction)

create a readable stream (that respects pause) from an async function.
while the stream is not paused,
the function will be polled with (count, callback),
and this will be the readable stream.

es.readable(function (count, callback) {
  if(streamHasEnded)
    return this.emit('end')
  
  //...
  
  this.emit('data', data) //use this way to emit multiple chunks per call.
      
  callback() // you MUST always call the callback eventually.
             // the function will not be called again until you do this.
})

you can also pass the data and the error to the callback, i.e. callback(error, data).
you may only call the callback once;
calling the same callback more than once will have no effect.
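
A minimal sketch of the callback form, emitting one chunk per call and ending after five (the bound and text are illustrative):

es.readable(function (count, callback) {
  if (count > 4) return this.emit('end')
  callback(null, 'chunk ' + count)  //passing data to the callback emits it
})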

readArray (array)

Create a readable stream from an Array.

Just emit each item as a data event, respecting pause and resume.

  var es = require('event-stream')
    , reader = es.readArray([1,2,3])

  reader.pipe(...)

If you want the stream to behave like a 0.10 stream, you will need to wrap it using the Readable.wrap() function. Example:

	var s = new stream.Readable({objectMode: true}).wrap(es.readArray([1,2,3]));

writeArray (callback)

create a writable stream from a callback.
All data events are stored in an array, which is passed to the callback when the stream ends.

  var es = require('event-stream')
    , reader = es.readArray([1, 2, 3])
    , writer = es.writeArray(function (err, array){
      //array deepEqual [1, 2, 3]
    })

  reader.pipe(writer)

pause ()

A stream that buffers all chunks when paused.

  var ps = es.pause()
  ps.pause() //buffer the stream, also do not allow 'end' 
  ps.resume() //allow chunks through

duplex (writeStream, readStream)

Takes a writable stream and a readable stream and makes them appear as a readable writable stream.

It is assumed that the two streams are connected to each other in some way.

(This is used by pipeline and child.)

  var cp = require('child_process')
    , grep = cp.exec('grep Stream')

  es.duplex(grep.stdin, grep.stdout)

child (child_process)

Create a through stream from a child process ...

  var cp = require('child_process')

  es.child(cp.exec('grep Stream')) // a through stream

wait (callback)

waits for stream to emit 'end'. joins chunks of a stream into a single string or buffer. takes an optional callback, which will be passed the complete string/buffer when it receives the 'end' event.

also, emits a single 'data' event.

readStream.pipe(es.wait(function (err, body) {
  // have complete text here.
}))

Other Stream Modules

These modules are not included as a part of EventStream but may be useful when working with streams.

stream-reduce: Like Array.prototype.reduce, but for streams. Given a sync reduce function and an initial value, it will return a through stream that emits a single data event with the reduced value once the input stream ends.

var reduce = require("stream-reduce");
process.stdin.pipe(reduce(function(acc, data) {
  return acc + data.length;
}, 0)).on("data", function(length) {
  console.log("stdin size:", length);
});

event-stream's People

Contributors

andrejewski, blopker, detailyang, dominictarr, felixrabe, feross, floatdrop, gdw2, holdinghandsfeedingducks, jeffbski, johnw424, justphil, kevinsawicki, krisajenkins, magomogo, mattsmith14, mhart, nantas, natlownes, parshap, psalaets, raynos, rolias, rubytuesdaydono, samolsen, sorrycc, sukima, tjmehta, treyhunner, xecycle


event-stream's Issues

exception in mapSync causes fatal error

An error thrown in the middle of mapSync() causes an application-level exception. Shouldn't mapSync handle that? Also, do I need to attach on('error') after each stream transformation?

return new Promise(function (resolve, reject) {
    var s = fs.createReadStream(srcFile)
        .on('error', reject)
        .pipe(es.split())
        .on('error', reject)
        .pipe(es.mapSync(function (line) {
            // ...
            throw new Error();
            // ...
        }))
        .on('error', reject)
        .pipe(fs.createWriteStream(outputFile))
        .on('error', reject)
        .on('finish', function () {
            resolve(outputFile);
        });
});

Option to follow Node 0.10 readable stream emit behavior?

Right now, when I create a new stream (e.g. through es.readArray()) it starts emitting data in the nextTick.

I'd like it to follow the Node 0.10 behavior, that is, start the stream paused by default and automatically resume it once an .on('data') listener/.pipe() is attached.

Would this be possible/viable to implement for all methods that return a readable stream?

npm install event-stream --> npm ERR! code EJSONPARSE

$ npm install --save https://github.com/dominictarr/event-stream/tarball/master
npm WARN package.json [email protected] keywords should be an array of strings
npm ERR! Windows_NT 6.1.7601
npm ERR! argv "c:\dev\iojs\node.exe" "c:\dev\iojs\node_modules\npm\bin\npm-cli.js" "install
" "--save" "https://github.com/dominictarr/event-stream/tarball/master"
npm ERR! node v1.8.1
npm ERR! npm v2.8.3
npm ERR! file C:\Users\ahausden\AppData\Roaming\npm-cache\stream-combiner\0.0.4\package\package.json

npm ERR! code EJSONPARSE

npm ERR! Failed to parse json
npm ERR! No data, empty input at 1:1
npm ERR!
npm ERR! ^
npm ERR! File: C:\Users\ahausden\AppData\Roaming\npm-cache\stream-combiner\0.0.4\package\package.jso
n
npm ERR! Failed to parse package.json data.
npm ERR! package.json must be actual JSON, not just JavaScript.
npm ERR!
npm ERR! This is not a bug in npm.
npm ERR! Tell the package author to fix their package.json file. JSON.parse

npm ERR! Please include the following file with any support request:

Property 'read' of object #<Socket> is not a function

The simple script below fails reading a text file with one JavaScript object per line. Why?

var fs = require("fs"),
    es = require("event-stream");
es.pipeline(
    fs.createReadStream("./file.txt", {flags: 'r'}),
    es.split('\n'),
    process.stdout
);

Some sample content from the input file is below:

{"JsonTimetableV1":{"classification":"public","timestamp":1397171668,"owner":"Network Rail","Sender":{"organisation":"Rockshore","application":"NTROD","component":"SCHEDULE"},"Metadata":{"type":"full","sequence":666}}}
{"JsonAssociationV1":{"transaction_type":"Create","main_train_uid":"W20936","assoc_train_uid":"W20815","assoc_start_date":"2013-12-14T00:00:00Z","assoc_end_date":"2014-05-17T00:00:00Z","assoc_days":"0000010","category":"NP","date_indicator":"S","location":"RAMSGTE","base_location_suffix":null,"assoc_location_suffix":null,"diagram_type":"T","CIF_stp_indicator":"P"}}
(...)

The error is:

_stream_readable.js:745
    while (!paused && (null !== (c = stream.read())))
                                            ^
TypeError: Property 'read' of object #<Socket> is not a function
    at Socket.<anonymous> (_stream_readable.js:745:45)
    at Socket.EventEmitter.emit (events.js:92:17)
    at emitDataEvents (_stream_readable.js:771:10)
    at Socket.Readable.on (_stream_readable.js:692:5)
    at proxyStream (/Users/giacecco/Projects/stomp-test/node_modules/event-stream/node_modules/duplexer/index.js:65:16)
    at Array.forEach (native)
    at forEach (/Users/giacecco/Projects/stomp-test/node_modules/event-stream/node_modules/duplexer/index.js:11:20)
    at duplex (/Users/giacecco/Projects/stomp-test/node_modules/event-stream/node_modules/duplexer/index.js:27:5)
    at Object.module.exports (/Users/giacecco/Projects/stomp-test/node_modules/event-stream/node_modules/stream-combiner/index.js:8:17)
    at Object.<anonymous> (/Users/giacecco/Projects/stomp-test/test3.js:4:4)

I am using node v0.10.26. Thanks,

Giacecco

npm install: failed to parse package.json

Just wanted to install event-stream via npm with command:
npm install event-stream -g
but i only got the same error:

`$ npm install event-stream -g
npm ERR! Failed to parse json
npm ERR! Unexpected end of input
npm ERR! File: C:\Users\myUser\AppData\Roaming\npm-cache\stream-combiner\0.0.4\package\package.jso
n
npm ERR! Failed to parse package.json data.
npm ERR! package.json must be actual JSON, not just JavaScript.
npm ERR!
npm ERR! This is not a bug in npm.
npm ERR! Tell the package author to fix their package.json file. JSON.parse

npm ERR! System Windows_NT 6.1.7601
npm ERR! command "c:\dev\nodejs\node.exe" "c:\dev\nodejs\node_modules\npm\bin\npm-cli.js" "
install" "event-stream" "-g"
npm ERR! cwd z:\myService
npm ERR! node -v v0.10.29
npm ERR! npm -v 1.4.14
npm ERR! file C:\Users\myUser\AppData\Roaming\npm-cache\stream-combiner\0.0.4\package\package.json

npm ERR! code EJSONPARSE
npm ERR!
npm ERR! Additional logging details can be found in:
npm ERR! z:\myService\npm-debug.log
npm ERR! not ok code 0`

More full example please

I know this is a great library, but I feel it is underestimated by the community.
The reason, I think, is the lack of demonstrations.
The author @dominictarr probably has the complete picture in his mind, but readers cannot follow the concepts well.

Actually, I git cloned the repo and tried the example, but had no idea what was going on.

The other thing is that I feel this could be a great alternative to the very popular caolan/async library, and if possible I would like to use it; however, due to my poor understanding of streams and this library, I can't.

Please add more concise examples and documentation.

Thanks for the great work!

Question: how to stream array of urls into request?

I am trying to get my head around streams. I have an array of urls and, for each url, I want to call request, which returns a CSV file that I would then like to process.
I have tried to do it like this:

var es = require('event-stream'),
    request = require('request');

var items =['item1', 'item2',..,'itemN']

var reader = es.readArray(items);

reader
    .pipe(es.map(function (data, cb) {
        var csv = request(data);
        cb(null, csv);
    }))
    .pipe(process.stdout)

but it doesn't work. What would be the correct way to implement this? Thanks

de kitchen-sink

You've started this process already with pause-stream and through.

For example I use es.duplex the most out of all the event-stream modules.

It would be nice if that was a separate duplex module that had that API.

Eventually it would be nice if I never have to require event-stream again.

If you don't want to do this, then I will take pieces out of event-stream and make them separate modules.

simple example pretty.js - split

That example is not instructive of concepts and is buggy.

Curl fetches only one line without an EOL, so it works. But mapping just one line of input to JSON is not a good way to explain the concept of the 'map' function.

As soon as I tried mapping more than one line, pretty.js crashed in the JSON parser with a syntax error, because es.split() correctly generates an empty string after the last EOL terminating the last line.

readArray has a problem with stream items?

Trying to work with a stream of streams:

es.readArray([
  es.readArray([1, 2, 3]),
  es.readArray([4, 5, 6]),
  es.readArray([7, 8])
]).on('data', function (stream) {
  stream.pipe(process.stdout);
});

I would expect this to print 1 through 8 but I get nothing. Could this be an issue with the library or am I thinking about this wrong?

es.merge. node 5.1.0. Does not work.

Use example at https://www.npmjs.com/package/event-stream

es.merge(
  process.stdout,
  process.stderr
).pipe(fs.createWriteStream('output.log'));

I have error:

events.js:141
      throw er; // Unhandled 'error' event
      ^

Error: read ENOTCONN
    at exports._errnoException (util.js:856:11)
    at WriteStream.Socket._read (net.js:394:21)
    at WriteStream.Readable.read (_stream_readable.js:328:10)
    at WriteStream.Socket.read (net.js:283:43)
    at resume_ (_stream_readable.js:718:12)
    at doNTCallback2 (node.js:452:9)
    at process._tickCallback (node.js:366:17)
    at Function.Module.runMain (module.js:459:11)
    at startup (node.js:138:18)
    at node.js:974:3

Could you offer how to fix it?

`readArray` and `writeArray` are confusing names?

What do you think of aliasing them to head or prime and tail or collect, respectively?

It's not clear from the method names or the docs what these do, but they are common use cases:

  • start this stream with a bunch of data I have right now
  • collect all data in a results array and wait for done/end event

I'm catching a lot of authors still attempting to manage .on('data', function (data) { results.push(data); }) and similar 'end' stanzas by hand instead of using these friendly methods and simply pipelining appropriately configured streams. My proposal is that somehow making these two methods clearer in particular would help people.
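
For reference, a minimal sketch of that second use case with the current name (the source stream is illustrative):

var es = require('event-stream')

someReadableStream
  .pipe(es.writeArray(function (err, results) {
    //results holds every chunk, in order, once 'end' has fired
  }))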

UTF-8 encoding

I am using your event-stream module in combination with json-csv to create a CSV file from an array of objects. Some elements contain text with utf-8 characters. When using readArray, these characters are converted. I am using the following code to save the file:

var es = require('event-stream');
var out = fs.createWriteStream(csvFile, {encoding: 'utf8'});
var readable = es.readArray(results);
readable.pipe(jsoncsv.csv(options)).pipe(out);

how to use wait?

I tried:

  rs = fs.create-read-stream in-file
  ws = fs.create-write-stream out-file

  es.pipe rs,                                 # load inventory file
    es.replace //^[\r\n]+|\.|[\r\n]+$//g, ''  # remove leading and trailing newlines
    es.split '\r'                             # separate records
    to-obj in-columns                         # convert record to object
    transform!                                # add records, change as needed
    to-csv out-columns                        # convert back to comma separated values
    es.replace //\u001d//g, ''                # remove unicode group separator
    es.replace //^[\r\n]+|\.|[\r\n]+$//g, ''  # remove leading and trailing newlines
    ws                                        # write to new file
    es.wait ->
      console.log 'done'

The file writes fine, but console.log never gets called. I know this must be something simple, but it escapes me.

Readable stream gets called infinitely?

Hi, forgive me for my noobness. I am trying to figure out what is happening: I created a readable stream and it gets called infinitely.

es.readable(function(count, next) {
  next(null, 'hello world');
}).pipe(es.log('test'));

Why does it do that?

Feature request - add tee support

For piping readable streams to multiple writable streams, not only to the console and to a file as in the shell, but to as many as you like.

es.merge broke my data

I have 12 files to handle, so I want to create 12 ReadStreams and merge to one.

In each file, each line is a JSON string.

My code is like this:

  fileNames = fileNames.filter(Boolean);
  es.merge(fileNames.map(function(item, index) {
               return fs.createReadStream(item);
             }))
    .pipe(es.split())
    .pipe(es.parse())

After running it, I got some JSON.parse errors like this:

SyntaxError: Unexpected token

and I found the offending JSON looks like this:

{"ts":1452700832306,"segments":{"sex":1,"age":5,"os":1,"deviceprice":5},"appkey":"560129b88bda20a5270f88da","token":"jXL29pROAnEGng","deviceid":"D4FB58CD-39{"ts":1452700810077,"appkey":"560129b88bda20a5270f88da","token":"jXL29pROAnEGng","deviceid":"B8DA0F64C613E896B412788B9BDB06D4"}

I grepped for 1452700832306 and 1452700810077, and found they come from two different files:

in file1, test1.log:

{"ts":1452700832306,"segments":{"sex":1,"age":5,"os":1,"deviceprice":5},"appkey":"560129b88bda20a5270f88da","token":"jXL29pROAnEGng","deviceid":"D4FB58CD-39CF-4445-AD26-C85194C0D032"}

in file2, test2.log:

{"ts":1452700810077,"appkey":"560129b88bda20a5270f88da","token":"jXL29pROAnEGng","deviceid":"B8DA0F64C613E896B412788B9BDB06D4"}

So it looks like some of my data has been broken.

Node version: v4.2.3
OS: CentOS Linux release 7.0.1406 (Core)
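
(A sketch of the usual workaround, assuming the goal is to keep each line intact: split every file's stream into lines before merging, so merge interleaves whole lines rather than raw chunks.)

es.merge(fileNames.map(function (item) {
    return fs.createReadStream(item).pipe(es.split())
  }))
  .pipe(es.parse())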

flatMap?

It seems like this module could use a flatMap implementation. I've even looked for it in other modules, and can't seem to find a flatMap for node streams (which seems crazy!), only libraries like bacon.js and highland which use their own stream types.

Would you entertain a PR adding flatMap?

update map-stream dependency

es still uses map-stream 0.0.2, while you've already created 0.0.3 which solves out of order mapping.

Been cracking my head around this for a while :)

Access to stream combiner

Just wondering, the stream-combiner module is accessible via es.pipe but it's not exposed in the documentation. Is it safe to use es.pipe in our code?

Thanks in advance.

es.concat - if any stream is ended it doesn't work

If passing an already ended stream to es.concat / es.merge, it won't work unless an "end" event is manually provoked for that stream.

A possible fix would be to check whether there is an 'ended' property on the stream object and increment the counter during stream registration:

es.merge = function (/*streams...*/) {
  var toMerge = [].slice.call(arguments)
  var stream = new Stream()
  var endCount = 0
  stream.writable = stream.readable = true

  toMerge.forEach(function (e) {
    e.pipe(stream, {end: false})
    var ended = false
    //BEGIN FIX
    if(e.ended) {
      ended = true
      endCount ++
    }
    //END FIX
    e.on('end', function () {
      if(ended) return
      ended = true
      endCount ++
      if(endCount == toMerge.length)
        stream.emit('end')
    })
  })
  stream.write = function (data) {
    this.emit('data', data)
  }
  stream.destroy = function () {
    toMerge.forEach(function (e) {
      if(e.destroy) e.destroy()
    })
  }
  return stream
}

compatibility with through and native streams in pipeline/pipe

Using es.pipeline to join together several es.pipes of different streams seems to have some event-based issues when more than one of the streams in the pipe is created with through or es.through and the rest are subclasses of node's native stream.Stream (Streams 1).

The pipeline will work with all of the streams being subclasses of stream.Stream, and only one through stream, but where I put that through stream in the pipe seems to matter (has to be last). If I add more than one in that pipe, it breaks - the last node stream never gets an 'end' event.

I was wondering if there was a known compatibility issue with how these interact with each other - though I don't understand why this would be the case. Is it preferable that everything that goes through the pipeline be of one type of stream? i.e. all node streams or all through?

Simultaneous pipelines does not work...last one wins?

I have a need to process multiple files at once, each with its own stream configuration. I'm new to NodeJS and EventStream, so I'm probably doing something dumb...but maybe not?

Here is a simplified version of my code...

for file in files
  console.log "Open file stream: #{file}"
  input = fs.createReadStream "#{PATH}#{file}"

  extractor = es.pipeline es.split(),
    es.map( (data, callback) ->   # callback(err, data)
...
          if (parsed) 
            return callback null, JSON.stringify(result) + '\n'  # pass along first match
      callback()  # drop data
    )
    es.map tapJSON

  console.log "Extracting facts from: #{file}"
  input.pipe extractor

The output I get only includes results from the last file...

Open file stream: EventLog_System.txt
Extracting facts from: EventLog_System.txt
Open file stream: EventLog_Application.txt
Extracting facts from: EventLog_Application.txt
{ category: 'VSS_ERROR',
  machine_name: 'Win7Pro-x64-VM1',
  date_time: '12/15/2012 3:07:00 PM',
  code: '22',
  none: 'None',
  error_message: 'Volume Shadow Copy Service error: A critical component required by the Volume Shadow Copy service is not registered.  This might happened if an error occurred during Windows setup or during installation of a Shadow Copy provider.  The error returned from CoCreateInstance on class with CLSID {e579ab5f-1cc4-44b4-bed9-de0991ff0623} and Name Coordinator is [0x80040154, Class not registered  ].    ' }

Reversing the order of files shows that the last pipeline wins.

Is it possible that the way event-stream, stream-combiner, and duplex require each other results in a singleton? I'm still looking, but I thought I'd ask the experts...

Nathan

es.map doesn't fire 'end' event?

I have a stream that's piped to es.map and it doesn't seem to fire 'end', 'close' or 'finish'. Below is the map function.

function installFilterDataHandle(install, cb){
  var install_obj = JSON.parse(install);
  var origin_key = key_map[install_obj.api_key];

  if(!_.isUndefined(origin_key)){
    idfa_map[origin_key] = install_obj.om_idfa;
    cb(null,null);
  }
  cb(null, null);
}


  install_pipe = install_filter.stdout
              .pipe(es.split())
              .pipe(es.map(installFilterDataHandle));

  install_pipe.on('data', function(data){
    console.log("install pipe data",data);
  });

  install_pipe.on('end', function(){
    console.log('done mapping installs');
  });

  install_pipe.on('close', function(){
    console.log('done mapping installs');
  });

  install_pipe.on('finish', function(){
    console.log('done mapping installs');
  });

Error in parsing when using arrow functions

I'm getting this error when trying to es.merge my JavaScript files.

  stream.js:75
     throw er; // Unhandled stream error in pipe.

The error message:
Error: Error in parsing: "modules/example/controllers/example-controller.js", Line 8: Unexpected token )

My Controller file:

        self.toggleSidenav = () => {
            return $mdSidenav('left').toggle();
        };

Is there some solution for it?

Add es.filter

There should be an equivalent of array.prototype.filter. I have used this simple wrapper around es.map:

es.filter = function(func){
  return es.map(function(data, next){
    func(data, function(result){
      if(result){
        next(null, data);
      }else{
        next();
      }
    })
  });
}

It can be used like this:

stream.pipe(es.filter(function(data, keep){
  keep(data.length > 0);
})).pipe(output);

Add a es.chain?

Currently I have something like this (es6):

function chain(...streams) {
  var [head] = streams;
  var tail = streams.reduce((prev, curr) => prev.pipe(curr));
  return es.duplex(head, tail);
}

so that I can write

function formatter() {
  return chain(
    es.split(),
    es.mapSync(...),
    es.stringify()
  );
}

someStream.pipe(formatter());

Would you like this added to event-stream? If yes please also tell me what behaviour you want, e.g. multiple arguments or single array argument, and what to do with empty array, maybe also other corner cases. I can go PR.

readable-stream

Node 0.9 will use the new readable-stream interface.

"data" events, pause and resume will be deprecated.

We need some kind of upgrade strategy for all these streaming modules. Including scuttlebutt, mux-demux, shoe, etc.

pipeline error logic

The docs clearly say that using 'pipeline' will emit errors from all the streams in the pipeline. In addition, though, the error is emitted for the 'pipeline' being created, which means the error is re-emitted by the main pipeline. So this test indicates every error is duplicated:

    describe('demonstrateerror', function(done) {
      return it('should error once', function() {
        var errors, pipe, thrower, thru;
        errors = 0;
        thrower = es.through(function(data) {
          return this.emit('error', new Error(data));
        });
        thru = es.through(function(data) {
          return this.emit('data', data);
        });
        pipe = es.pipeline(thru, thrower);
        pipe.on('error', function(err) {
          errors++;
          console.log('error count', errors);
          if (errors.length > 1) {
            return done();
          }
        });
        return pipe.write('meh');
      });
    });

My question is why the 'pipeline' isn't considered to be more transparent. Duplicating errors causes issues for handlers, since error-handling code will inevitably be fired at least twice for each error, which is IMO counterintuitive: the error handler now has to ensure it only fires once (yes, I could use 'once'). Am I misunderstanding the use of this, or is this issue valid?

Best pattern to prepend and append element to a series while streaming?

I am using event-stream and request to convert a >2Gb file of records from one JSON 'schema' to another, to bulk upload to CouchDB. The code looks like:

fs.createReadStream(inFile, {flags: 'r'})
    .pipe(es.split('\n'))
    .pipe(es.parse())
    .pipe(es.map(function (data, callback) {
            // transform the thing here     
            callback(null, data);       
    }))
    .pipe(es.stringify())
    .pipe(es.join(','))
    .pipe(request({ 
        'url': 'http://localhost:5984/dbname/_bulk_docs',
        'method': 'POST',
        'json': true,
    }, function (err, response, body) { 
        console.log(body);
    }));

The problem is that for the above to work I need to wrap all the output in another JSON object, so that every original item becomes a record in an array called 'docs'. In other words, I need to prepend { "docs": [ and append ] }, but I have no idea how to do that. I know I could load the whole file into memory, but that is not good practice for something this big.

How can I achieve that with event-stream? Thanks,

Giacecco

es.split breaks pipe chain?

I have a custom stream that looks like this

function QueryFormatStream(){}
util.inherits(QueryFormatStream, Stream);

QueryFormatStream.prototype.write = function(query){
  console.log("we got query", query);
  this.emit('data', query);
};

QueryFormatStream.prototype.end = function(){
  this.emit('end');
};

module.exports = QueryFormatStream;

I have a pipe chain that looks like:

  var stream = fs.createReadStream(file_path)
                 .pipe(zlib.createGunzip())
                 .pipe(es.split())
                 .pipe(new QueryFormatStream())
                 .pipe(process.stdout)

running the stream like this produces no output and the console log statement in QueryFormatStream never gets called.

However, calling it like so, without the QueryFormatStream, correctly pipes to process.stdout:

  var stream = fs.createReadStream(file_path)
                 .pipe(zlib.createGunzip())
                 .pipe(es.split())
                 .pipe(process.stdout)

You sure "array" works on merge

I've tried that: https://github.com/dominictarr/event-stream#merge-stream1streamn-or-merge-streamarray

with an array and without an array. Every time I use the array, it tells me:

[15:57:32] TypeError: undefined is not a function
    at /Users/kud/Projects/_playmedia/desktop/node_modules/event-stream/index.js:40:7
    at Array.forEach (native)
    at Object.es.concat.es.merge (/Users/kud/Projects/_playmedia/desktop/node_modules/event-stream/index.js:39:11)
    at Gulp.module.exports (/Users/kud/Projects/_playmedia/desktop/gulp/scripts.js:68:34)
    at module.exports (/Users/kud/Projects/_playmedia/desktop/node_modules/gulp/node_modules/orchestrator/lib/runTask.js:34:7)
    at Gulp.Orchestrator._runTask (/Users/kud/Projects/_playmedia/desktop/node_modules/gulp/node_modules/orchestrator/index.js:273:3)
    at Gulp.Orchestrator._runStep (/Users/kud/Projects/_playmedia/desktop/node_modules/gulp/node_modules/orchestrator/index.js:214:10)
    at /Users/kud/Projects/_playmedia/desktop/node_modules/gulp/node_modules/orchestrator/index.js:279:18
    at finish (/Users/kud/Projects/_playmedia/desktop/node_modules/gulp/node_modules/orchestrator/lib/runTask.js:21:8)
    at cb (/Users/kud/Projects/_playmedia/desktop/node_modules/gulp/node_modules/orchestrator/lib/runTask.js:29:3)

Build 3.0.20 failed and is breaking packages dependent on event-stream

Just tried to run npm update this morning on a barebones project containing browserify, debowerify, and connect. Browserify errored out when npm tried to grab it, so I looked deeper and got the following error:

npm http 404 https://registry.npmjs.org/event-stream/-/event-stream-3.0.20.tgz
npm ERR! fetch failed https://registry.npmjs.org/event-stream/-/event-stream-3.0.20.tgz
npm ERR! Error: 404 Not Found
npm ERR!     at WriteStream.<anonymous> (/usr/local/lib/node_modules/npm/lib/utils/fetch.js:57:12)
npm ERR!     at WriteStream.EventEmitter.emit (events.js:117:20)
npm ERR!     at fs.js:1596:14
npm ERR!     at /usr/local/lib/node_modules/npm/node_modules/graceful-fs/graceful-fs.js:103:5
npm ERR!     at Object.oncomplete (fs.js:107:15)
npm ERR! If you need help, you may report this log at:
npm ERR!     <http://github.com/isaacs/npm/issues>
npm ERR! or email it to:
npm ERR!     <[email protected]>

npm ERR! System Linux 3.11.0-14-generic
npm ERR! command "/usr/local/bin/node" "/usr/local/bin/npm" "update"
npm ERR! cwd /home/[path to node project directory]
npm ERR! node -v v0.10.22
npm ERR! npm -v 1.3.14

The CI server also indicates event-stream v3.0.20 fails across all browsers.

Add option to split() to remove matching characters from emitted chunks

This would bring it closer to String.split behaviour (at the moment you're trimming each line in test/split.asynct.js to do the comparison - this wouldn't account for leading spaces for example)

It would also allow for splitting on Regex patterns, because the matched characters wouldn't need to be re-emitted.

Happy to do a pull request for this, but figured I'd wait to see what you thought of #7 first.

es.fork

I would like to be able to fork a stream so it can be piped to two places at the same time, is there an easy way to do this already? In consuming a stream I'd like to fork it one direction to save the data in a temporary place until the other fork has been successfully processed.

Travis CI Integration

This is an enhancement request:

Integrate with Travis CI.

Don't make the tests a sub-module. Put them in the main repo. This would make it easier to run the tests, catch merge issues, etc.

readArray stream must be used on the same `tick`

es.readArray([1,2,3]).on('data', function (n) { console.log(n); });

works as expected while

var stream = es.readArray([1,2,3]);
process.nextTick(function () {
   stream.on('data', function (n) { console.log(n); });
});

does not work (nothing is printed)

This happens since the contents of the array is emitted to the stream on first stream.resume in a while loop (https://github.com/dominictarr/event-stream/blob/master/index.js#L108), which means delaying the use of the stream a tick makes it unusable.

Is this intended behaviour?

Please more examples!

Dear dominictarr,
Thank you for your work with event-stream. I have been using it for a few days now, but I still feel like I am just scratching the surface. There are things that I am sure are feasible using event-stream, but the documentation is not sufficient for me to understand how to put the pieces together. Please provide more examples with it!

My current issue in particular is that I need to transform a series of 'events' into a readable stream. The code should look something like the one below, where 'Emitter' is the constructor for a third-party object calling the function I give as a parameter every time there is new data to process.

var outStream = require('fs').createWriteStream("foo.txt", { 'encoding': 'utf-8' });
var inStream = require('event-stream').????;
var emitter = new Emitter(function (data) {
    // there's new data to push as part of the readable stream!
    inStream.????(data);
});
inStream.pipe(outStream);

Can you give me a hint? Thanks,

Giacecco

event-stream.readable overcalls async function

Running the following:

var es = require("event-stream");

var source = es.readable(function(count, cb){
  setTimeout(function(){
    cb(null, {ok: true, n: count});
  }, 1000);
});

var destination = es.map(function(data, cb){
  console.info(data); cb();
});

es.connect(source, destination);

I think the above should, once a second, write out a new line looking something like {ok: true, n: 1} where n increments every time. Instead it works correctly momentarily, but begins to 'double up' with every call. It will print n = 1, then two lines n = 2, 3. Followed with four lines, n = 4, 5, 6, 7, then eight, sixteen, etc etc.

I looked into the event-stream index.js file and I'm having some trouble tracking down why this behavior is occurring.. It looks like the .readable(fn) function has an embedded get() method which gets passed to the function passed into the original call, kicking off the next cycle... That is, I'm not quite sure how get() or func() is being called twice.
