Giter VIP home page Giter VIP logo

zen-observable's Introduction

zen-observable

An implementation of Observables for JavaScript. Requires Promises or a Promise polyfill.

Install

npm install zen-observable

Usage

import Observable from 'zen-observable';

Observable.of(1, 2, 3).subscribe(x => console.log(x));

API

new Observable(subscribe)

let observable = new Observable(observer => {
  // Emit a single value after 1 second
  let timer = setTimeout(() => {
    observer.next('hello');
    observer.complete();
  }, 1000);

  // On unsubscription, cancel the timer
  return () => clearTimeout(timer);
});

Creates a new Observable object using the specified subscriber function. The subscriber function is called whenever the subscribe method of the observable object is invoked. The subscriber function is passed an observer object which has the following methods:

  • next(value) Sends the next value in the sequence.
  • error(exception) Terminates the sequence with an exception.
  • complete() Terminates the sequence successfully.
  • closed A boolean property whose value is true if the observer's subscription is closed.

The subscriber function can optionally return either a cleanup function or a subscription object. If it returns a cleanup function, that function will be called when the subscription has closed. If it returns a subscription object, then the subscription's unsubscribe method will be invoked when the subscription has closed.

Observable.of(...items)

// Logs 1, 2, 3
Observable.of(1, 2, 3).subscribe(x => {
  console.log(x);
});

Returns an observable which will emit each supplied argument.

Observable.from(value)

let list = [1, 2, 3];

// Iterate over an object
Observable.from(list).subscribe(x => {
  console.log(x);
});
// Convert something 'observable' to an Observable instance
Observable.from(otherObservable).subscribe(x => {
  console.log(x);
});

Converts value to an Observable.

  • If value is an implementation of Observable, then it is converted to an instance of Observable as defined by this library.
  • Otherwise, it is converted to an Observable which synchronously iterates over value.

observable.subscribe([observer])

let subscription = observable.subscribe({
  next(x) { console.log(x) },
  error(err) { console.log(`Finished with error: ${ err }`) },
  complete() { console.log('Finished') }
});

Subscribes to the observable. Observer objects may have any of the following methods:

  • next(value) Receives the next value of the sequence.
  • error(exception) Receives the terminating error of the sequence.
  • complete() Called when the stream has completed successfully.

Returns a subscription object that can be used to cancel the stream.

observable.subscribe(nextCallback[, errorCallback, completeCallback])

let subscription = observable.subscribe(
  x => console.log(x),
  err => console.log(`Finished with error: ${ err }`),
  () => console.log('Finished')
);

Subscribes to the observable with callback functions. Returns a subscription object that can be used to cancel the stream.

observable.forEach(callback)

observable.forEach(x => {
  console.log(`Received value: ${ x }`);
}).then(() => {
  console.log('Finished successfully')
}).catch(err => {
  console.log(`Finished with error: ${ err }`);
})

Subscribes to the observable and returns a Promise for the completion value of the stream. The callback argument is called once for each value in the stream.

observable.filter(callback)

Observable.of(1, 2, 3).filter(value => {
  return value > 2;
}).subscribe(value => {
  console.log(value);
});
// 3

Returns a new Observable that emits all values which pass the test implemented by the callback argument.

observable.map(callback)

Returns a new Observable that emits the results of calling the callback argument for every value in the stream.

Observable.of(1, 2, 3).map(value => {
  return value * 2;
}).subscribe(value => {
  console.log(value);
});
// 2
// 4
// 6

observable.reduce(callback [,initialValue])

Observable.of(0, 1, 2, 3, 4).reduce((previousValue, currentValue) => {
  return previousValue + currentValue;
}).subscribe(result => {
  console.log(result);
});
// 10

Returns a new Observable that applies a function against an accumulator and each value of the stream to reduce it to a single value.

observable.concat(...sources)

Observable.of(1, 2, 3).concat(
  Observable.of(4, 5, 6),
  Observable.of(7, 8, 9)
).subscribe(result => {
  console.log(result);
});
// 1, 2, 3, 4, 5, 6, 7, 8, 9

Merges the current observable with additional observables.

observable.all()

let observable = Observable.of(1, 2, 3);
for (let value of await observable.all()) {
  console.log(value);
}
// 1, 2, 3

Returns a Promise for an array containing all of the values produced by the observable.

zen-observable's People

Contributors

abdonrd avatar araqnid avatar benoitzugmeyer avatar chocolateboy avatar esrefdurna avatar jamestalmage avatar jbarrington avatar jellelicht avatar jordalgo avatar ljharb avatar qubyte avatar zenparsing avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

zen-observable's Issues

Would you like to use a transpiler?

I have it as a dev dependency an I see it failing in older browsers at least because of "let" statement. Would you like to support any browser?

TypeScript?

Hey @zenparsing!

I have been happily using zen-observable for at least two years now, most recently for threads.js. Sometimes I run into some subtle edge-casy issues with TypeScript, though.

Would you be interested in merging a TypeScript port of zen-observable? It shouldn't have any impact on the JS users, but would be pretty nice for all TS users.

TypeError: from(x) must be observable, iterable, or array-like: [object Object]

Hi,

I've ran into this problem today and it took quite a long time to track down.

I'm using:
most: 1.2.2
zen-observable: 0.4.0
and running in node 7.6.0

The problem occurs because zen-observable relies on Symbol['observable'] to exist otherwise it will use @@observable instead.

mostjs uses symbol-observable which will define Symbol(observable) if the Symbol function exist.

So if you create a zen-observable before including most, then the observable will have @@observable instead of a Symbol and most.from(observable) will fail as a consequence.

Here's how to reproduce:

const Observable = require('zen-observable')
let observable = new Observable(observe => {})
const most = require('most')
most.from(observable)
// TypeError: from(x) must be observable, iterable, or array-like: [object Object]

While changing the order works:

const most = require('most')
const Observable = require('zen-observable')
let observable = new Observable(observe => {})
most.from(observable)
// works!

This really goes back to the decision in #9 .

I'm thinking changing hasSymbol and getSymbol to something along the lines of https://github.com/benlesh/symbol-observable/blob/master/es/ponyfill.js might be a solution.

What do you think?

Throwing an exception from within the forEach callback should cancel the subscription

Here is what I was trying to do:

"use strict";

const Observable = require("zen-observable");

Observable.of(1, 2, 3)
    .forEach(n => {
        console.log(n);

        // Since .forEach returns a promise I assumed that I should be
        // able to return it from here so that errors are propagated up.
        return Observable.of(4, 5, 6)
            .forEach(m => {
                console.log(m);
                throw new Error("ERROR!");
            });
    })
    .then(_ => console.log("completed."))
    .catch(err => console.error(err.stack));

Here is the actual result:

1          
2          
3          
completed. 
4          
5          ! ---------
6          !
4          !
5          !   How come both the forEach's survived an exception?
6          !
4          !
5          !
6          ! ---------

Observations:

  • Errors are silently ignored.
  • "completed." is logged before the inner list has completed.
  • 5 and 6 get logged to the output... I had expected the 'throw' statement to abort the forEach().

What is the best way to handle such a use case with this proposal?

.babelrc in npm

hi guys:

thanks for your great work, really like this simple and powerful library.
I found an issue (maybe I think this is an issue). You put .babelrc file in npm. If I want to compile packages from node_modules, your .babelrc will always has privilege and conflict with my plugins. now I have a workaround to get rid of this.
But I think .babelrc is unnessary in NPM.

Regards

How to combine 2 observables together.

Hi, I'm wondering how to combine observables together.

e.g.

const myObservableUsername = new Observable(...);
const githubObservable = new Observable(...);

const obs = Observable.from([myObservableUsername, githubObservable]).subscribe(console.log);

obs.next('lifeiscontent'); // JSON response of github from username.

if someone can point me to examples on how to do something like this, it would be ideal. Thanks!

Webpack build breaks

Because the require keyword is present in zen-observable.js as a parameter, webpack tries to bundle all of the files in the root folder. Could the project be changed do maybe instead of calling the variable require instead call it requireFn? That way require can be used in the way which build tools like webpack expect.

async function support

Hey apologies for a newbie question. Are async/await functions supported by observables?

Reduced Example

new Observable(async (observer) => {
  await (async () => {})
})

Error

TypeError: [object Promise] is not a function
    at Subscription (/Users/seb/Code/sebdeckers/http2live/node_modules/zen-observable/zen-observable.js:110:15)
    at Observable.subscribe (/Users/seb/Code/sebdeckers/http2live/node_modules/zen-observable/zen-observable.js:229:12)
    at Promise (/Users/seb/Code/sebdeckers/http2live/node_modules/listr/lib/task.js:132:13)
    at Promise (<anonymous>)
    at handleResult (/Users/seb/Code/sebdeckers/http2live/node_modules/listr/lib/task.js:131:14)
    at Promise.resolve.then.then.skipped (/Users/seb/Code/sebdeckers/http2live/node_modules/listr/lib/task.js:169:12)
    at <anonymous>

TypeError: Cannot read property 'call' of undefined

Hello,

I'm currently coding a microframework for Discord (command handler, etc.)

And I've got this error :

TypeError: Cannot read property 'call' of undefined
    at new Subscription (C:\TEMP\ig-bot\node_modules\zen-observable\lib\Observable.js:183:34)
    at CommandHandler.subscribe (C:\TEMP\ig-bot\node_modules\zen-observable\lib\Observable.js:262:14)
    at Object.<anonymous> (C:\TEMP\ig-bot\index.js:21:20)
    at Module._compile (module.js:652:30)
    at Object.Module._extensions..js (module.js:663:10)
    at Module.load (module.js:565:32)
    at tryModuleLoad (module.js:505:12)
    at Function.Module._load (module.js:497:3)
    at Function.Module.runMain (module.js:693:10)
    at startup (bootstrap_node.js:191:16)

Here's my code : https://git.kaki87.net/KaKi87/discord-microframework
(More precisely here : https://git.kaki87.net/KaKi87/discord-microframework/src/branch/master/lib/CommandHandler.js)

The error is triggered when Observable.subscribe is fired and I tried to identify the issue, for days, but I can't find out.

Thanks !!

Lua Port and License

I'd like to make a port of this library to lua. If that is alright with you I'd like to know exactly how to include the license. Should I copy it verbatim or could I add a copyright of my own at the top? Such as:
Port copyright (c) 2016 tylorr (Tylor Reynolds)

subscribe doesn't work

On npm i:

Observable.of(1,2,3).subscribe(function() {
  console.log(arguments);
});

doesn't log

.then doesn't seem to work

Running the following example from the README, then seems to be missing or not implemented.

Observable.of(0, 1, 2, 3, 4).reduce((previousValue, currentValue) => {
    return previousValue + currentValue;
}).then(result => {
    assert(result === 10);
});

All declarations of 'observable' must have identical modifiers

ERROR in node_modules/@types/zen-observable/index.d.ts(9,18): error TS2687: All declarations of 'observable' must have identical modifiers.

Other dependencies have had this problem around June 1. Apparently it is related to an RXJS update.

I need to roll back from the latest version. This is a dependency of one of my Angular dependencies.

Why?

Since there's no "discussions" tab for this project, I ask the question here.

Why zen-observable?
What's the difference with RxJS?

The readme says

An implementation of Observables for JavaScript

Where is the definition of Observables (outside RxJS)?

And last but not the least.. why zen? ๐Ÿง˜๐Ÿปโ€โ™‚๏ธ

Document reliance on `this.constructor` and `Symbol.species` for subclassing?

I've noticed that for .map and friends, you rely on this.constructor[Symbol.species] or this.constructor to get the constructor, so you can support subclassing. This is understandable, but could this be documented? I'm using these to implement various Subject-like classes, and I'd rather not have to read the source to figure out how to properly subclass something.

undefined is not an object (evaluating 'queue.length')

I got a crash error when I using apollo-link with react-native, and I found the problem coming from the package 'zen-observable'.

And I found the code here:

function flushSubscription(subscription) {
  var queue = subscription._queue;
  subscription._queue = undefined;
  subscription._state = 'ready';
  for (var i = 0; i < queue.length; ++i) {
    notifySubscription(subscription, queue[i].type, queue[i].value);
    if (subscription._state === 'closed') break;
  }
}

After I change it to:

function flushSubscription(subscription) {
  var queue = subscription._queue;
  subscription._queue = undefined;
  subscription._state = 'ready';
  for (var i = 0; i < (queue || []).length; ++i) {
    notifySubscription(subscription, queue[i].type, queue[i].value);
    if (subscription._state === 'closed') break;
  }
}

the crash dismissed.

Boxed/Static call needed?

If I do, for example, the following, nothing happens:

var obs = new Observable(observe => {/*... do set up here*/});
obs
  .filter(value => !value || !value.startsWith("//"))
  .reduce((collector,value) => collector.add(value), new Set())
  .forEach(set => console.log(set))

However, if I do:

var obs = new Observable(observe => {/*... do set up here*/});
Observable.from(obs)
  .filter(value => !value || !value.startsWith("//"))
  .reduce((collector,value) => collector.add(value), new Set())
  .forEach(set => console.log(set))

It works. However, that's obviously not ideal.

Please stop polyfilling `Symbol.observable` ๐Ÿ™

I don't use zen-observable directly, but I do import Apollo client which imports this library. Somewhat unexpectedly, this library polyfills Symbol.observable here: https://github.com/zenparsing/zen-observable/blob/master/src/Observable.js#L6-L8

This polyfill is breaking my code because, suddenly, the order of execution of modules accessing Symbol.observable matters. I'll need to add import "zen-observable" to the polyfill section of my code ๐Ÿ˜†. Even though it is a breaking change, it would be fantastic if zen-observable followed the pattern of other libraries accessing Symbol.observable in a non-side effecting manner.

E.g. (from rxjs)

export const observableSymbol: string | symbol = (() => (typeof Symbol === 'function' && Symbol.observable) || '@@observable')();

I think many developers would find the bugs caused by the current behavior just about impossible to diagnose, but since I'm mildly familiar with rxjs internals and have written some custom from() functions before, I got lucky and figured out what was going on.

bug in `combineLatest`

combineLatest can cause wrong order with its result.
This is caused by Map.
map.values() will produce a MapIterator according to assigning orders.

values = new Map()
values.set(1, 1)
values.set(0, 0)
Array.from(values.values())  // [1, 0]

But running orders of oberservables are uncertain.

shared multicast implementation?

As I mentioned in tc39/proposal-observable#66 (comment), without a single agreed-upon multicast implementation, multiple libraries that need multicast in their own internal observables might have redundant implementations that bloat a browser bundle.

Obviously I could make a centralized multicast implementation in separate package, but then someone else might do so too without finding about my package, and we would still have the same problem. So I think this package is really the best place for a multicast implementation to live. Would you be interested in a PR?

Async notification / discrepancy with tc39 implementation

Consider state observable:

import Observable from "zen-observable";

let state = { foo: "foo" };
let o = new Observable(observer => {
  observer.next(state);
});
o.subscribe(state => console.log(state.foo));
state.foo = "bar";

(sandbox)

The es-observable logs 'foo' to console, the zen-observable logs bar, because notification is asynchronous.
Is there a particular reason for Promise.resolve() here?

Uncaught TypeError: Cannot read property 'small' of undefined

Uncaught TypeError: Cannot read property 'small' of undefined

Using

Google Chrome | 74.0.3729.108ย (Official Build)ย (64-bit)
on

OS | Windowsย 10 OS Build 17134

Uncaught TypeError: Cannot read property 'small' of undefined
    at ItemPreviewContent.js:30
    at Object.children (Query.js:63)
    at t (react-apollo.esm.js:379)
    at a.render (react-apollo.esm.js:383)
    at Bi (react-dom.production.min.js:173)
    at Pi (react-dom.production.min.js:172)
    at Ei (react-dom.production.min.js:180)
    at Ur (react-dom.production.min.js:232)
    at qr (react-dom.production.min.js:233)
    at Bs (react-dom.production.min.js:249)

Please add tags

Hello,

to help packaging, could you add tags corresponding to npm releases ?

Cheers,
Xavier (Debian packager)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.