streams's Introduction

Channel API

Where Did All the Text Go?

We are in the process of transitioning this specification from a GitHub README into something a bit more palatable. The official-lookin' version is developed in the official-lookin branch's index.html file, which you can see on GitHub, or in its rendered glory at this long URL.

Right now, we've transferred over most of the concepts and text, but none of the algorithms or APIs. We'll be iterating on the APIs a bit more here, in Markdown format, until we feel confident in them. In the meantime, please check out the rendered spec for all of the interesting stage-setting text.

By the way, this transition is being tracked as #62.

FixedBuffer

The FixedBuffer class is used by buffered channels when the channel is instantiated with an argument of type number. FixedBuffer implements the general Buffer interface:

// The Buffer interface is used for buffers that can be passed
// into channels. They allow configurable buffering of
// the channel based on the data it will be handling.
interface Buffer {
  // If this buffer is empty this should return `true`,
  // otherwise it should return `false`.
  boolean isEmpty();
  // If this buffer is full this should return `true`
  // otherwise it should return `false`.
  boolean isFull();
  // If this buffer is empty this should throw an exception,
  // otherwise it should return a data chunk from the buffer
  // and adjust internal state as appropriate.
  any take();
  // If this `buffer` is full this should throw an exception
  // otherwise given `data` should be put into a buffer.
  put(any data);
}

Properties of the FixedBuffer prototype

constructor(size)
  1. Set this.[[size]] to size.
  2. Set this.[[buffer]] to [].
isEmpty()
  1. Return this.[[buffer]].length === 0.
isFull()
  1. Return this.[[buffer]].length === this.[[size]].
put(chunk)
  1. If this.isFull(), throw Error.
  2. Add chunk to a buffer by calling: this.[[buffer]].unshift(chunk).
take()
  1. If this.isEmpty(), throw Error.
  2. Return this.[[buffer]].pop().
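
The steps above can be sketched as a small JavaScript class. This is only an illustration: the plain `size` and `buffer` properties stand in for the `[[size]]` and `[[buffer]]` internal slots, and the error messages are made up for the example.

```javascript
// Sketch of the FixedBuffer steps; not the normative definition.
class FixedBuffer {
  constructor(size) {
    this.size = size;   // stands in for [[size]]
    this.buffer = [];   // stands in for [[buffer]]
  }
  isEmpty() {
    return this.buffer.length === 0;
  }
  isFull() {
    return this.buffer.length === this.size;
  }
  put(chunk) {
    if (this.isFull()) throw new Error("buffer is full");
    this.buffer.unshift(chunk); // newest chunk goes to the front
  }
  take() {
    if (this.isEmpty()) throw new Error("buffer is empty");
    return this.buffer.pop();   // oldest chunk comes off the back
  }
}

const buffer = new FixedBuffer(2);
buffer.put("a");
buffer.put("b");
const first = buffer.take(); // "a": chunks leave in the order they arrived
```

Because `put` uses `unshift` and `take` uses `pop`, the buffer behaves as a FIFO queue.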

Port

Port is a class that both InputPort and OutputPort derive from.

// Port interface is implemented by both input and output ports
// of the channel.
interface Port {
  // Closes this channel. Note that queued and buffered values
  // can still be taken off the closed channel. If there are
  // queued takes on the channel (which implies that the buffer and
  // the put queue are empty), those takes are resolved with `undefined`.
  void close();
}

Properties of the Port prototype

constructor(channel)
  1. Set this.[[channel]] to channel.
  2. Return this.
close()
  1. Let channel be this.[[channel]].
  2. Let result be the result of calling the [[close]] method of channel.
  3. Return result.

Operation

Objects implementing the Operation interface represent "take" and "put" operations on the channel. An operation is in one of two states, complete or pending, which can be checked via the isPending method. If the operation is complete, its result can be accessed via the valueOf method. If the operation is pending, calling valueOf throws an error. Operation derives from Promise, and its then method can be used to access its result.

interface Operation : Promise {
  // If operation is pending returns `true` otherwise
  // returns `false`.
  boolean isPending();
  // If operation is pending throws an error, otherwise
  // returns result of the operation.
  any valueOf();
}

Properties of the Operation prototype

isPending()
  1. Let choice be this.[[select]].[[choice]].
  2. Let result be true.
  3. If this is choice, set result to false.
  4. Return result.
valueOf()
  1. If this.isPending() is true throw an Error.
  2. Return this.[[result]].
[[isActive]]
  1. Let result be true.
  2. If this.[[select]].[[choice]] is not void 0, Set result to false.
  3. Return result.
[[complete]](result)
  1. If this.[[isActive]]() is false throw an Error.
  2. Set this.[[select]].[[choice]] to this.
  3. Resolve this promise with a result as a value.
  4. Set this.[[result]] to result.
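
A rough sketch of these steps follows. It makes two simplifying assumptions that are not in the text above: the class wraps a promise and exposes `then` rather than deriving from Promise, and the internal slots `[[select]]`, `[[choice]]` and `[[result]]` are modelled as ordinary properties. The `shared` object is a hypothetical stand-in for a Select.

```javascript
// Simplified stand-in for Operation; not the normative definition.
class Operation {
  constructor(select) {
    // Without a shared select the operation acts as its own select.
    this.select = select || this;
    this.choice = undefined; // only used when the operation is its own select
    this.result = undefined;
    this.promise = new Promise((resolve) => { this.resolve = resolve; });
  }
  isPending() {
    return this.select.choice !== this;
  }
  valueOf() {
    if (this.isPending()) throw new Error("operation is pending");
    return this.result;
  }
  isActive() {
    // Active until the select has settled on some operation.
    return this.select.choice === undefined;
  }
  complete(result) {
    if (!this.isActive()) throw new Error("operation is no longer active");
    this.select.choice = this;
    this.result = result;
    this.resolve(result);
  }
  then(onFulfilled, onRejected) {
    return this.promise.then(onFulfilled, onRejected);
  }
}

const shared = { choice: undefined }; // hypothetical stand-in for a Select
const a = new Operation(shared);
const b = new Operation(shared);
a.complete(1);
// Now a is complete, while b stays pending and is no longer active,
// so b can never be completed.
```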

OutputPort

An output port represents the end of the channel into which data can be put to make it available to the input port of the same channel.

[NamedConstructor=OutputPort(Channel channel)]
interface OutputPort : Port {
  // If this channel is closed the operation is ignored and
  // a promise resolved with `undefined` is returned
  // (Promise.resolve(void 0)).

  // If the `value` put on a channel is `undefined` it is ignored,
  // the same as it is in JSON.

  // Channels can be buffered or unbuffered. Putting values
  // onto a buffered channel returns promises resolved with
  // `true` (`Promise.resolve(true)`) until the buffer is full.

  // If this channel is unbuffered or the buffer is full, put values
  // are queued. In that case the returned value is a promise that will
  // be resolved with `true` once the put value makes it into
  // the buffer or is taken off the channel.
  Operation <boolean> put(any value);
}

Properties of the OutputPort prototype

constructor(channel)
  1. Call the [[Call]] internal method of Port, with this as thisArgument and channel as an argument.
get [[Prototype]]
  1. Return Port.prototype.
put(data)
  1. Let channel be this.[[channel]].
  2. Return channel.[[put]](this, data, void 0).

InputPort

An input port represents the end of the channel from which data can be taken after it was put there through the output port of the same channel.

[NamedConstructor=InputPort(Channel channel)]
interface InputPort : Port {
  // If this channel has no queued "puts" and has already
  // been closed, returns a promise resolved with `undefined`;
  // think of it as reading an undefined property.
  //
  // If the channel has no queued or buffered "puts" then the
  // "take" is enqueued and a promise is returned that
  // will be resolved with a value whenever one is "put"
  // on the channel.
  //
  // If this channel has buffered data it's taken off
  // the buffer and a promise resolved with it is returned.
  // If the channel has queued "put" data it will be moved
  // over to the buffer.
  //
  // If the channel is unbuffered and it has queued "put" data,
  // a promise resolved with the "put" data is returned, also
  // causing the pending put promise to resolve.
  Operation <any> take();
}

Properties of the InputPort prototype

constructor(channel)
  1. Call the [[Call]] internal method of Port, with this as thisArgument and channel as an argument.
get [[Prototype]]
  1. Return Port.prototype.
take()
  1. Let channel be this.[[channel]].
  2. Return channel.[[take]](this, void 0).

Channel

[NamedConstructor=Channel(float n),
 NamedConstructor=Channel(Buffer buffer)]
interface Channel {
  attribute InputPort input;
  attribute OutputPort output;
}

Properties of the Channel prototype

constructor(buffer)

The constructor can be passed an optional argument that implements the Buffer interface. If such an argument is passed, then the resulting channel is buffered and the given buffer is used for buffering data. If the argument is a number, then a buffered channel is created with a fixed-size buffer of the given size. Buffered channels allow separation of data handling (delegated to the buffer) from data transfer. Buffered channels won't block (they return pre-resolved promises) when data is put on them until the buffer is full, which gives a producer and a consumer more freedom to keep their own work schedules.

Data can still be put on the channel even if the buffer is full or the channel is unbuffered; in that case the put operation is enqueued until it can be completed.

  1. If buffer is instance of Buffer, Let buffer be buffer.
  2. If buffer is type of number, Let buffer be new FixedBuffer(buffer).
  3. If buffer is undefined, let buffer be undefined.
  4. Set this.[[buffer]] to buffer.
  5. Set this.[[puts]] to [].
  6. Set this.[[takes]] to [].
  7. Set this.[[closed]] to false.
  8. Set this.[[in]] to new InputPort(this).
  9. Set this.[[out]] to new OutputPort(this).
get input()
  1. Return this.[[in]].
get output()
  1. Return this.[[out]].
[[close]]()
  1. If this.[[closed]] is false,
    1. Set this.[[closed]] to true.
    2. While this.[[takes]].length > 0:
      1. Let take be this.[[takes]].pop().
      2. If take.[[isActive]]() is true, call take.[[complete]](void 0).
  2. Return void 0.
[[put]](port, data, select)
  1. If port isn't instance of OutputPort throw TypeError.
  2. Let puts be this.[[puts]].
  3. Let takes be this.[[takes]].
  4. Let buffer be this.[[buffer]].
  5. Let put be a newly-created pending operation.
  6. If select is void 0,
    1. Set put.[[select]] to put.
  7. If select is instance of Select,
    1. Set put.[[select]] to select.
  8. If put.[[isActive]]() is true,
    1. If this.[[closed]] is true, call put.[[complete]](void 0).
    2. If this.[[closed]] is false,
      1. If data is void 0,
        1. call put.[[complete]](true).
      2. If data is not void 0,
        1. If buffer is void 0,
          1. Let take be takes.pop().
          2. While take is object and take.[[isActive]]() is false,
            1. Set take to takes.pop().
          3. If take is object and take.[[isActive]]() is true,
            1. Call put.[[complete]](true).
            2. Call take.[[complete]](data).
          4. If take is void 0,
            1. Set put.[[value]] to data.
            2. Call puts.unshift(put).
        2. If buffer is instance of Buffer,
          1. If buffer.isFull() is true,
            1. Set put.[[value]] to data.
            2. Call puts.unshift(put).
          2. If buffer.isFull() is false,
            1. Call buffer.put(data).
            2. Call put.[[complete]](true).
            3. If buffer.isEmpty() is false,
              1. Let take be takes.pop().
              2. While take is object and take.[[isActive]]() is false,
                1. Set take to takes.pop().
              3. If take is object and take.[[isActive]]() is true,
                1. Call take.[[complete]](buffer.take()).
  9. Return put.
[[take]](port, select)
  1. If port isn't instance of InputPort throw TypeError.
  2. Let puts be this.[[puts]].
  3. Let takes be this.[[takes]].
  4. Let buffer be this.[[buffer]].
  5. Let take be a newly-created pending operation.
  6. If select is void 0,
    1. Set take.[[select]] to take.
  7. If select is instance of Select,
    1. Set take.[[select]] to select.
  8. If take.[[isActive]]() is true,
    1. If buffer is not void 0,
      1. Let isEmpty be buffer.isEmpty().
      2. If isEmpty is false
        1. Let data be buffer.take().
        2. Call take.[[complete]](data).
        3. If buffer.isFull() is false,
          1. Let put be puts.pop().
          2. While buffer.isFull() is false and put is object
            1. If put.[[isActive]]() is true,
              1. Call put.[[complete]](true).
              2. Call buffer.put(put.[[value]]).
            2. Set put to puts.pop().
      3. If isEmpty is true,
        1. If this.[[closed]] is true,
          1. Call take.[[complete]](void 0).
        2. If this.[[closed]] is false,
          1. Call takes.unshift(take).
    2. If buffer is void 0,
      1. Let put be puts.pop().
      2. While put is object and put.[[isActive]]() is false,
        1. Set put to puts.pop().
      3. If put is object,
        1. Call put.[[complete]](true).
        2. Call take.[[complete]](put.[[value]]).
      4. If put is void 0,
        1. If this.[[closed]] is true,
          1. Call take.[[complete]](void 0).
        2. If this.[[closed]] is false,
          1. Call takes.unshift(take).
  9. Return take.
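
To make the put and take behaviour easier to follow, here is a heavily simplified sketch. `SimpleChannel` is a hypothetical name, and the sketch assumes several shortcuts: no ports, no Select, plain promises instead of Operation objects, a plain array instead of a Buffer, and no special handling of `undefined` values. It is meant to show the queueing rules only.

```javascript
// Simplified channel sketch; not the normative algorithm.
class SimpleChannel {
  constructor(size) {
    this.size = size;   // undefined means unbuffered
    this.buffer = [];   // buffered values, oldest first
    this.puts = [];     // queued puts: { value, resolve }
    this.takes = [];    // queued takes: resolve functions
    this.closed = false;
  }
  put(value) {
    if (this.closed) return Promise.resolve(undefined);
    if (this.takes.length > 0) {
      // A take is already waiting: hand the value over directly.
      this.takes.shift()(value);
      return Promise.resolve(true);
    }
    if (this.size !== undefined && this.buffer.length < this.size) {
      this.buffer.push(value);
      return Promise.resolve(true);
    }
    // Unbuffered, or buffer full: queue the put until a take arrives.
    return new Promise((resolve) => this.puts.push({ value, resolve }));
  }
  take() {
    if (this.buffer.length > 0) {
      const value = this.buffer.shift();
      if (this.puts.length > 0) {
        // Move one queued put into the freed buffer slot.
        const put = this.puts.shift();
        this.buffer.push(put.value);
        put.resolve(true);
      }
      return Promise.resolve(value);
    }
    if (this.puts.length > 0) {
      const put = this.puts.shift();
      put.resolve(true);
      return Promise.resolve(put.value);
    }
    if (this.closed) return Promise.resolve(undefined);
    return new Promise((resolve) => this.takes.push(resolve));
  }
  close() {
    this.closed = true;
    // Queued takes can no longer be satisfied.
    while (this.takes.length > 0) this.takes.shift()(undefined);
  }
}

const channel = new SimpleChannel(1);
channel.put("a");   // fits in the buffer, resolves right away
channel.put("b");   // buffer is full, so this put is queued
channel.take().then((value) => console.log("took", value));
```

The first `take` frees the buffer slot, which lets the queued put of `"b"` move into the buffer and resolve.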

Select

Select allows making a single choice between several channel operations (put / take). The choice is made in favor of the operation that completes first. If more than one operation is ready to complete at the same time, the choice is made in favor of the operation that was requested first.

Interface

[NamedConstructor=Select()]
interface Select {
  Operation <any> take(InputPort input);
  Operation <boolean> put(OutputPort output, any value);
}

Properties of the Select prototype

constructor()
  1. Let this.[[choice]] be undefined.
  2. Return this.
put(port, data)
  1. Let channel be port.[[channel]]
  2. Return channel.[[put]](port, data, this).
take(port)
  1. Let channel be port.[[channel]]
  2. Return channel.[[take]](port, this).
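
The "first operation to complete wins" rule can be illustrated with a small model. Everything here other than the `Select` name is an assumption made for the example: `TinyChannel` is a hypothetical unbuffered channel without ports, only takes can be selected, and `put` returns nothing.

```javascript
// Simplified model of Select over takes; not the normative algorithm.
class Select {
  constructor() {
    this.choice = undefined; // the operation that won, once there is one
  }
  take(channel) {
    return channel.take(this);
  }
}

class TinyChannel {
  constructor() {
    this.puts = [];   // queued values
    this.takes = [];  // queued takes: { select, resolve }
  }
  put(value) {
    // Skip takes whose select has already settled on another operation.
    let take = this.takes.shift();
    while (take && take.select.choice !== undefined) take = this.takes.shift();
    if (take) {
      take.select.choice = take;
      take.resolve(value);
    } else {
      this.puts.push(value);
    }
  }
  take(select) {
    const take = { select, resolve: null };
    const promise = new Promise((resolve) => { take.resolve = resolve; });
    if (select.choice === undefined) {
      if (this.puts.length > 0) {
        select.choice = take;
        take.resolve(this.puts.shift());
      } else {
        this.takes.push(take);
      }
    }
    return promise;
  }
}

const left = new TinyChannel();
const right = new TinyChannel();
const select = new Select();
const fromLeft = select.take(left);
const fromRight = select.take(right);
right.put("r"); // completes fromRight, so the choice is made
left.put("l");  // fromLeft is no longer active, so "l" stays queued
```

In this model `fromLeft` never resolves, and the value put on `left` remains available to a later take.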

streams's People

Contributors

domenic, gozala, marcoscaceres-remote, raynos, tyoshino, tzik

Forkers

grncdr

streams's Issues

Write socket example with proper error handling

spawn(function*() {
  var chunk = void(0)
  try {
    while (chunk = yield socket.read()) {
      yield socket.write(chunk + " " + chunk)
    }
  } catch(error) {
    console.error(error)
  }
})


Socket.prototype.read = function() {
  var select = new Select()
  var data = select.take(this.input)
  var error = select.take(this.error).then(function(reason) {
    // Return the rejection so that the race below rejects with the socket error.
    return Promise.reject(reason)
  })
  return Promise.race([data, error])
}

Join efforts to promote CSP style channels to standard bodies

Hi Folks,

If I have mentioned you here, that is because you have either written a CSP style channel library of some form or expressed interest in this subject.

As you may know, the WHATWG is working on a DOM standard Stream API which has been mainly designed after node's streams. The problem is that node streams are not a good foundation, as they're in their third rewrite phase and are still very complex. I assume you would agree that CSP style channels would be a better foundation, which is what I plan to prove to them & I hope to get some help in doing this from people who may have a common interest - You. In the worst case scenario our efforts will not end up in a DOM standard, but we could still end up with an open standard like Promises/A+.

Current status of our efforts is:

If you have time and interest, consider working with me & others to build up an undeniable case for the standards group, proving that CSP style channels would be a better choice for driving innovation.

@odf @BagosGiAr @snetz @coreh @Raynos @fitzgen @Benvie @jlongster @Schuck-Johnson @thehydroimpulse @gyson @avoidwork @brentburgoyne @LightSpeedWorks @brophdawg11 @eugeneware @cojs @ubolonton @jspipe @joubertnel @rads @odf @olahol

Write a parser example

Platform ppl at Mozilla have been eager to see a real parser example where data is passed via a channel. They are not convinced that streams / channels without unread / unput are a good fit (namely, the HTML parser was thought to be very hard to port from the current Gecko implementation).

Servo, written in Rust, would probably have an example of that.

Ability to abort a channel

Let's say you can no longer put() to a channel's output because the party taking from it has hit an error.

Let's say you're putting to a channel to write to a file, and the file implementation is taking and saving to the file.

The implementation will want to emit some kind of "disk is full" error. All pending puts should probably be aborted / rejected with an error.

Add Writer examples where channel is written to disk

It is important to illustrate that WriteStream is a bad design, as errors triggered by it can't really be reasonably handled by anyone, which caused node to come up with domains. We also need to illustrate that a good design will have something like a Writer that writes each chunk, either succeeding or failing to do so. We should also have a realistic example of how to recover from "disk is full" style errors, and explain that passing responsibility to the data producer is wrong.
