
callbag's Introduction

Callbag 👜

A standard for JS callbacks that enables lightweight observables and iterables

  • Minimal overhead streams, Iterables, Observables, AsyncIterables, etc
  • Modular (each operator is its own npm package)
  • Light (few memory allocations)
  • Not a library, just a standard (for real libraries, see callbag-basics or the wiki)
  • Easy to create your own utilities, read how here

Read also the announcement blog post and this introductory blog post.

Summary

  • Every producer of data is a function (type: number, payload?: any) => void
  • Every consumer of data is a function (type: number, payload?: any) => void
  • type === 0 means "start" (a.k.a. "subscribe" on Observables)
  • type === 1 means "data" (a.k.a. "next" on Observers)
  • type === 2 means "end" (a.k.a. "unsubscribe" on Subscriptions)
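The summary above can be illustrated with a minimal sketch. The names `fromArray`, `collect` and `received` are hypothetical and not part of the standard or of any callbag package; the only thing taken from the spec is the `(type, payload)` signature and the meaning of types 0, 1 and 2.

```javascript
// Hypothetical source: delivers each item of `items`, then terminates the sink.
const fromArray = items => (type, sink) => {
  if (type !== 0) return;            // a source only reacts to "start"
  let ended = false;
  // Talkback: lets the sink terminate the source (type 2).
  const talkback = t => {
    if (t === 2) ended = true;
  };
  sink(0, talkback);                 // greet the sink back (handshake)
  for (const item of items) {
    if (ended) return;               // the sink terminated us: stop delivering
    sink(1, item);                   // "data"
  }
  if (!ended) sink(2);               // "end", no error payload
};

// Hypothetical sink: records everything it is delivered.
const received = [];
const collect = (type, payload) => {
  if (type === 1) received.push(payload);
  if (type === 2) received.push('end');
};

fromArray([1, 2, 3])(0, collect);
console.log(received); // [ 1, 2, 3, 'end' ]
```

Both the producer and the consumer are plain functions with the same shape, which is the whole point of the standard.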

Specification

(type: number, payload?: any) => void

Definitions

  • Callbag: a function of signature (TypeScript syntax:) (type: 0 | 1 | 2, payload?: any) => void
  • Greet: if a callbag is called with 0 as the first argument, we say "the callbag is greeted", while the code which performed the call "greets the callbag"
  • Deliver: if a callbag is called with 1 as the first argument, we say "the callbag is delivered data", while the code which performed the call "delivers data to the callbag"
  • Terminate: if a callbag is called with 2 as the first argument, we say "the callbag is terminated", while the code which performed the call "terminates the callbag"
  • Source: a callbag which is expected to deliver data
  • Sink: a callbag which is expected to be delivered data

Protocol

The capitalized keywords used here follow IETF's RFC 2119.

Greets: (type: 0, cb: Callbag) => void

A callbag is greeted when the first argument is 0 and the second argument is another callbag (a function).

Handshake

When a source is greeted and given a sink as payload, the sink MUST be greeted back with a callbag payload that is either the source itself or another callbag (known as the "talkback"). In other words, greets are mutual. Reciprocal greeting is called a handshake.

Termination: (type: 2, err?: any) => void

A callbag is terminated when the first argument is 2 and the second argument is either undefined (signalling termination due to success) or any truthy value (signalling termination due to failure).

After the handshake, the source MAY terminate the sink. Alternatively, the sink MAY terminate the source after the handshake has occurred. If the source terminates the sink, then the sink SHOULD NOT terminate the source, and vice-versa. In other words, termination SHOULD NOT be mutual. A callbag MUST NOT be terminated more than once.
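As a rough sketch of these termination rules, the following shows the three cases: the sink terminating the source, the source terminating the sink on success, and the source terminating the sink with an error. `counter`, `makeSink` and the `failAt` parameter are hypothetical names invented for this illustration.

```javascript
// Hypothetical source: emits 0, 1, 2, ... up to `limit`, optionally failing at `failAt`.
const counter = (limit, failAt) => (type, sink) => {
  if (type !== 0) return;
  let terminated = false;
  sink(0, t => { if (t === 2) terminated = true; });
  for (let i = 0; i < limit && !terminated; i++) {
    if (i === failAt) {
      terminated = true;
      sink(2, new Error('failed at ' + i)); // truthy payload: failure
      return;
    }
    sink(1, i);
  }
  // If the sink already terminated us, do not terminate it back.
  if (!terminated) sink(2);                 // undefined payload: success
};

// Hypothetical sink factory: logs into `log`, terminates the source after `stopAfter` items.
const makeSink = (log, stopAfter) => {
  let talkback;
  return (type, payload) => {
    if (type === 0) talkback = payload;
    else if (type === 1) {
      log.push(payload);
      if (log.length === stopAfter) talkback(2);
    } else if (type === 2) {
      log.push(payload === undefined ? 'done' : 'error: ' + payload.message);
    }
  };
};

const stoppedEarly = [];
counter(5)(0, makeSink(stoppedEarly, 2));     // sink terminates the source

const completed = [];
counter(2)(0, makeSink(completed, Infinity)); // source terminates the sink (success)

const failed = [];
counter(5, 1)(0, makeSink(failed, Infinity)); // source terminates the sink (failure)

console.log(stoppedEarly); // [ 0, 1 ]
console.log(completed);    // [ 0, 1, 'done' ]
console.log(failed);       // [ 0, 'error: failed at 1' ]
```

Note that in the first run the sink never receives a type 2 call, because it was the one that ended the relationship.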

Data delivery: (type: 1, data: any) => void

Amount of deliveries:

  • A callbag (either sink or source) MAY be delivered data, once or multiple times

Window of valid deliveries:

  • A callbag MUST NOT be delivered data before it has been greeted
  • A callbag MUST NOT be delivered data after it has been terminated
  • A sink MUST NOT be delivered data after it terminates its source

Reserved codes

A callbag SHOULD NOT be called with any of these numbers as the first argument: 3, 4, 5, 6, 7, 8, 9. Those are called reserved codes. A callbag MAY be called with codes other than those in the range [0-9], but this specification makes no claims in those cases.

Legal

This project is offered to the Public Domain in order to allow free use by interested parties who want to create compatible implementations. For details see COPYING and CopyrightWaivers.txt.

CC0
To the extent possible under law, Callbag Standard Special Interest Group has waived all copyright and related or neighboring rights to Callbag Standard. This work is published from: Finland.

callbag's People

Contributors

acailly, andarist, bloodyknuckles, devanshj, dreid, johnrees, krawaller, loreanvictor, niieani, staltz, vinspee


callbag's Issues

The type 2 event is called too early after introducing new values into the stream

I created a callbag which exposes its sink to its operator. This allows me to introduce new values onto the stream. However, when I do this, the type 2 event seems to be called too early. In the example below, the "done" message is printed halfway through processing - instead of at the end of processing. What's the trick?

const {fromIter, iterate, interval, map, filter, take, pipe} = require('callbag-basics');
const subject = require('callbag-subject');
const forEach = require('callbag-for-each');

const map_async = function(fn) {
    
    return function(source) {
        return function(start, sink) {
            if (start === 0) {
                source(0, function(type, data) {
                    if (type == 1) {
                        var result = fn(data, function(value) {
                            sink(type, value);
                        });
                        if (result !== undefined) {
                            sink(type, result);
                        }
                    } else {
                        sink(type, data)
                    }
                });
            }
        }
    }
};

const done = function(operation) {
    
    return function(source) {
        return function(start, sink) {
            let talkback;
            if (start === 0) {
                source(0, function(type, data) {
                    if (type === 0) {
                        talkback = data;
                        sink(type, data);
                    } else if (type === 1) {
                        sink(type, data);
                    } else if (type === 2) {
                        operation();
                    }
                });
            }
        }
    }
};

pipe(
    fromIter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
    map_async(function(each, sink) {
        setTimeout(function() {
            sink(each);
            sink(each);
        }, 10);
    }),
    done(function() {
        console.log('done');
    }),
    iterate(function(each) {
        console.log(each)
    })
);
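One likely reason for the early "done" is that the source can finish while callbacks started by map_async are still pending, so the type 2 call passes through before the delayed values do. A possible approach, sketched below under that assumption, is to count outstanding work and hold back the end until it reaches zero. This changes the callback contract: `fn` receives a hypothetical `emit` and an explicit `finished` callback. `mapAsync`, `fromArray`, `queue` and `defer` are stand-ins written for this sketch (`defer` replaces setTimeout so the run is deterministic).

```javascript
const mapAsync = fn => source => (start, sink) => {
  if (start !== 0) return;
  let pending = 0;          // items handed to fn that have not called finished() yet
  let sourceEnded = false;  // the source ended while work was still pending
  let ended = false;        // the sink has been terminated (or has terminated us)
  const end = err => { if (!ended) { ended = true; sink(2, err); } };
  source(0, (t, d) => {
    if (t === 0) {
      // Relay the talkback, but remember when the sink terminates the chain.
      sink(0, (tt, dd) => { if (tt === 2) ended = true; d(tt, dd); });
    } else if (t === 1) {
      pending++;
      fn(d,
        value => { if (!ended) sink(1, value); },
        () => { pending--; if (sourceEnded && pending === 0) end(); });
    } else if (d !== undefined || pending === 0) {
      end(d);               // error, or nothing outstanding: end right away
    } else {
      sourceEnded = true;   // deliver the end once the last callback finishes
    }
  });
};

// Hypothetical synchronous source used only to drive the example.
const fromArray = items => (start, sink) => {
  if (start !== 0) return;
  let ended = false;
  sink(0, t => { if (t === 2) ended = true; });
  for (const item of items) { if (ended) return; sink(1, item); }
  if (!ended) sink(2);
};

const queue = [];
const defer = task => queue.push(task); // deterministic stand-in for setTimeout

const out = [];
mapAsync((each, emit, finished) => defer(() => { emit(each); emit(each); finished(); }))(
  fromArray([0, 1, 2])
)(0, (t, d) => {
  if (t === 1) out.push(d);
  if (t === 2) out.push('done');
});

while (queue.length) queue.shift()();   // run the deferred callbacks
console.log(out); // [ 0, 0, 1, 1, 2, 2, 'done' ]
```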

name of the pattern

The name you gave to this pattern is not intuitive. Network or Conversation Pattern would be better :)

Stricter types?

Would the Typescript community benefit from stricter types?
It saves a few headaches when your compiler can hold your hand a bit more :)

A few things I would be looking for:

  • Allow the error type to be specified
  • Stricter signal / parameters for each type of callbag

Here is something I have been using:

export enum Signal {
  START = 0,
  DATA = 1,
  END = 2,
}

export type TalkbackArgs = [signal: Signal.DATA] | [signal: Signal.END]
export type Talkback = (...op: TalkbackArgs) => void

export type SinkArgs<E, A> =
  | [signal: Signal.START, talkback: Talkback]
  | [signal: Signal.DATA, data: A]
  | [signal: Signal.END, error?: E]
export type Sink<E, A> = (...op: SinkArgs<E, A>) => void

export type SourceArgs<E, A> = [signal: Signal.START, sink: Sink<E, A>]
export type Source<E, A> = (...op: SourceArgs<E, A>) => void

export type Callbag<E, A> = Source<E, A> | Sink<E, A>

I'm missing something

This is not an issue, of course, but a question. I was building my own operator and ran into a problem, so I started to think I'm not getting something :)

My operator is named group and returns the data in chunks of n elements so you can run something like this.

const iterate = require('callbag-iterate');
const range = require('callbag-range');
const group = require('callbag-group');

const source = range(1, 10);
const groupedSource = group(5)(source);

iterate((x) => {
  console.log(x);
})(groupedSource);

This should print something like

[1,2,3,4,5]
[6,7,8,9,10]

but nothing happens actually. My operator implementation is the following

const group = n => source => (start, sink) => {
  let bunch = [];
  if (start !== 0) return;
  source(0, (t, d) => {
    if (t === 0) {
      sink(0, d);
    } else if (t === 1) {
      bunch.push(d);
      if (bunch.length === n) {
        sink(1, bunch);
        bunch = [];
      }
    } else if (t === 2) {
      if (bunch.length) {
        sink(1, bunch);
      }
      sink(t, d);
    }
  });
};

module.exports = group;

and you can find tests in its repository. What am I missing about the callbag specification?
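A possible explanation, assuming the source is pullable and the sink pulls once per delivered item: when group swallows an item without emitting, nobody asks the source for the next one, so the chain stalls. The sketch below pulls again from inside the operator in that case. `range` and `iterate` here are simplified stand-ins written for this example, not the published callbag-range and callbag-iterate packages.

```javascript
// Stand-in pullable source: delivers one number per pull (talkback(1)).
const range = (from, to) => (start, sink) => {
  if (start !== 0) return;
  let next = from;
  let ended = false;
  sink(0, t => {
    if (ended) return;
    if (t === 2) { ended = true; return; }
    if (t === 1) {
      if (next > to) { ended = true; sink(2); }
      else sink(1, next++);
    }
  });
};

// Stand-in pulling sink: asks for one item on greet and after each delivery.
const iterate = operation => source => {
  let talkback;
  source(0, (t, d) => {
    if (t === 0) talkback = d;
    if (t === 1) operation(d);
    if (t === 0 || t === 1) talkback(1);
  });
};

const group = n => source => (start, sink) => {
  if (start !== 0) return;
  let bunch = [];
  let talkback;
  source(0, (t, d) => {
    if (t === 0) {
      talkback = d;
      sink(0, d);
    } else if (t === 1) {
      bunch.push(d);
      if (bunch.length === n) {
        const full = bunch;
        bunch = [];
        sink(1, full);   // the sink will pull again after this delivery
      } else {
        talkback(1);     // item was swallowed: pull the next one ourselves
      }
    } else if (t === 2) {
      if (bunch.length) sink(1, bunch);   // flush a partial last chunk
      sink(2, d);
    }
  });
};

const chunks = [];
iterate(x => chunks.push(x))(group(5)(range(1, 10)));
console.log(chunks); // [ [ 1, 2, 3, 4, 5 ], [ 6, 7, 8, 9, 10 ] ]
```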

If this is not the right place to ask such a thing, please close this issue and ignore me 😄

Address Error Handling In Spec?

  1. This is beautiful. Elegant!

  2. You may have intentionally omitted error handling from the spec. Perhaps the intent is that errors and their handling are specific to the domain where callbags are being deployed, are a type of "data" to be passed, and are up to the user to recognize in that data stream.

  3. I obviously haven't given this as much thought as the creators, but I wonder if errors would be an appropriate use of one of the reserved types?

It would be nice to be able to write something like:

pipe(
  fromIter( ... ),
  forEach( ... ),
  catch( ... )
);
  4. Alternately, if (2) (or some variant) is the case, a brief statement of intent related to errors in the spec could answer this.

Thank you for this contribution to the community!

Why do we say sink is a callbag?

From the definition,

  • all callbags are (type: 0 | 1 | 2, payload?: any) => void
  • sink is a callbag

But in callbag-basics and this article we see forEach described as a sink, yet it receives only a source (source => void), so its signature is different from a callbag's. Why, then, do we say forEach is a sink (and thus a callbag)?

Versioning

Is this spec stable? If so, it should have a release pinned to a version number.

As a side note, it would also be helpful to have historic documentation published for previous versions and a statement about breaking change philosophy (i.e. is it committed to be always backwards-compatible like Ecmascript or does it follow semver).

support ES2015 import syntax

The examples and (all) callbags use module and require. While this works perfectly in Node.js it won't work in the browser without a bundling step.
So why not use ES2015 import and export syntax instead, since it is also going to be the standard for Node.js?

What if an exception is thrown during callbag execution?

The callbag spec seems silent about how to deal with exceptions thrown during callbag execution.

For example, if the invocation of someCallbag(1, dataPayload) (i.e., a data delivery) somewhere in the middle of a callbag chain throws, is there any expectation about what should happen with the thrown error, or the overall execution of the chain?

Actually, same question for any of the defined payload types (0, 1, and 2), since any of these types of processing may encounter an exception.

Missing index.js referenced from "main" in package.json breaks Vite

I'm unable to use callbag with Vite, due to the "main" field in package.json being set to index.js, which does not exist.

I get the following error:

  The plugin "vite:dep-scan" was triggered by this import

    index.ts:1:23:
      1 │ import { Source } from 'callbag';
        ╵                        ~~~~~~~~~

Build failed with 1 error:
node_modules/.pnpm/[email protected]/node_modules/vite/dist/node/chunks/dep-59dc6e00.js:40662:10: ERROR: [plugin: vite:dep-scan] Failed to resolve entry for package "callbag". The package may have incorrect main/module/exports specified in its package.json: Failed to resolve entry for package "callbag". The package may have incorrect main/module/exports specified in its package.json.

I set up a little repo to demo the problem, if needed: https://github.com/jesseskinner/callbag-vite-test

To solve this, I believe you could either replace "index.js" with "types.d.ts" in the "main" field, just remove the "main" field, or create a dummy placeholder index.js file.

Thanks!

How to handle additional websocket connection state

I made an initial implementation of the websocket callbag. The problem I encountered is how to tell the user that the WebSocket is connected. So far I have implemented a buffer that stores the data that should be sent to the WebSocket connection, but I believe that information about a successful connection is far too important to hide inside the callbag.

I checked WebSocketSubject from RxJS, and it allows passing an openObserver in the config, which is notified when the connection opens.

Is it possible to identify if a function is a Callbag?

This is an interop question, but symbol-observable has done a ton for observable libraries to be able to talk to each other. I'm just picturing a 'from' type scenario where the input type could be any number of things and you'd want to handle a callbag properly.

Errors and Termination

First, Callbags are absolutely marvelous! I'm so jealous you thought of this first because it's so simple and elegant it makes me cry a little 😂 but also so happy they are in the world now. Really great stuff; I can't wait to re-write everything using this model 🎉

A couple of questions (mostly around the same topic):

  • Why are errors tied to termination? This is a carry-over from Observables but perhaps Callbags should not be opinionated about such things. When a relationship is terminated the second argument doesn't necessarily have to indicate error but rather some undefined (type: any) payload that could be some completion data or an error. This is currently the case for data passing (type 1) as any puller or listener has to check 'data' if the producer wants to send any non-terminating error messages.
  • I'm sure you've considered this but perhaps the introduction of a 4th type could be errors 😄 -- 0) handshake 1) data 2) error 3) termination. I've advocated for this style before in other libs/proposals and TBH, I'm asking again because I haven't understood and/or been satisfied with the provided answers. Doing something like this would be in line with Railway Oriented Programming and make Callbag producers more flexible in terms of the message types they send down (not having to terminate to send an error) and operators could just pass the errors along without even having to be modified from their current implementations:
const map = f => source => (start, sink) => {
  if (start !== 0) return;
  source(0, (t, d) => {
    sink(t, t === 1 ? f(d) : d)
  });
};

I'm a firm believer in this idea because unless you're incorporating some kind of try/catch system in all operators/producers any "error" sent down the pipe is an operational error as there is no guidance on it being re-thrown when it reaches a consumer that doesn't handle it (remember unhandled Promise rejections are thrown errors -- even though they can be caught later). Additionally (and this argument is coming back to me as I type), if your producer wants to send down more structured payloads e.g. { data, error } -- your operators become a lot less pretty e.g.

const source = map(x => {
  if (x.data) {
    return { ...x, data: x.data * 0.1 };
  } else {
    return x;
  }
})(fromIter([{ data: 10 }, { error: 'ran out of numbers' }]));

I feel like if Callbags aren't going to be opinionated at all about errors, then at least they shouldn't specify that errors represent the payload argument in a terminating message.

Anyway, very curious to hear your thoughts on this and my apologies if you've heard this before from me and it just sounds like a broken record.

sink termination propagation

I've noticed a lot of the callbag libraries out there don't propagate termination backwards to sources, which can leave dangling resources... maybe we could clarify this part of the protocol?

e.g. I have a callbag source that reads from a database. If there is an error further down the callbag chain, or if the sink is terminated, the termination signal needs to be propagated back in order to free resources. This is commonly not the case with existing callbag elements.

 (start, sink) => {
    if (start !== 0) {
      return
    }

    let changes = null
    let paused = true
    let queue = []

    sink(0, (t, d) => {
      if (t === 1) {
        if (queue.length > 0) {
          sink(1, queue.shift())
          paused = true
        } else {
          paused = false
        }

        if (!changes && queue.length < 512) {
          changes = db
            .changes({
              since,
              live: true,
              retry: true
            })
            .on('change', change => {
              since = change.seq
              if (paused) {
                sink(1, change)
                paused = true
              } else {
                queue.push(change)
              }

              if (queue.length > 1024) {
                changes.cancel()
                changes = null
              }
            })
            .on('error', err => {
              sink(2, err)
            })
        }
      } else if (t === 2) {
        // NOTE: Early termination...
        if (changes) {
          changes.cancel()
          changes = null
        }
      }
    })
 }
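On the operator side, backward propagation could look roughly like the sketch below. It is only an illustration: `tryMap`, `ticker` and `resource` are hypothetical names. The operator relays the sink's type 2 upstream, and when it fails itself it terminates the source before telling the sink.

```javascript
const tryMap = f => source => (start, sink) => {
  if (start !== 0) return;
  let talkback;
  let ended = false;
  source(0, (t, d) => {
    if (ended) return;
    if (t === 0) {
      talkback = d;
      // Hand the sink a talkback that relays every call upstream,
      // including termination (type 2).
      sink(0, (tt, dd) => {
        if (ended) return;
        if (tt === 2) ended = true;
        talkback(tt, dd);
      });
    } else if (t === 1) {
      let mapped;
      try {
        mapped = f(d);
      } catch (err) {
        ended = true;
        talkback(2);   // let the source free its resources
        sink(2, err);  // tell downstream about the failure
        return;
      }
      sink(1, mapped);
    } else {
      ended = true;
      sink(t, d);
    }
  });
};

// Hypothetical source holding a resource that must be released on termination.
const resource = { open: false };
const ticker = (start, sink) => {
  if (start !== 0) return;
  resource.open = true;
  let ended = false;
  sink(0, t => { if (t === 2) { ended = true; resource.open = false; } });
  for (let i = 1; i <= 5 && !ended; i++) sink(1, i);
  if (!ended) { resource.open = false; sink(2); }
};

const seen = [];
tryMap(x => {
  if (x === 3) throw new Error('boom');
  return x * 10;
})(ticker)(0, (t, d) => {
  if (t === 1) seen.push(d);
  if (t === 2) seen.push(d ? 'error: ' + d.message : 'done');
});

console.log(seen);          // [ 10, 20, 'error: boom' ]
console.log(resource.open); // false
```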

A more concise way

Observable: (n,c)=>disposable

n:d=>void
c:()=>void
disposable:()=>void

Operator: source:Observable=>Observable

for example

exports.interval = period => n => {
    let i = 0;
    const id = setInterval(() => n(i++), period)
    return () => clearInterval(id)
}
exports.tap = f => source => (n, c) => source(d => (f(d), n(d)), c);
exports.map = f => source => (n, c) => source(d => n(f(d)), c);
exports.take = count => source => (n, c, _count = count) => {
    const defer = source(d => (n(d), --_count === 0 && (defer(), c())), c)
    return defer
}

exports.subscribe = (n, e = noop, c = noop) => source => source(n, once(err => err ? e(err) : c()))

it all works
sources

Express variance in types

Putting it in the best type theoretical way I can muster, afaict, Callbag<I, O> is covariant on I and contravariant on O. (eg: Source<"a" | "b"> should not be assignable to Source<"a">)

This isn't expressed in types (ie with current types you can assign Source<"a"> to Source<"a" | "b"> which shouldn't be allowed as it can cause runtime errors). This can be simply fixed by using an interface instead of a type alias.

Moreover, it would be nice to use interfaces for Source and Sink as well, as it would preserve names in error messages (right now error messages have Callbag<never, T> instead of Source<T>, as type aliases don't "create new names").

If the changes are agreed upon I would love to send a PR! :)

EDIT: Also this would be a breaking change but only for those who already have error-prone code so imo no need for a major bump

Confusion about spec

The callbag spec dictates that "termination should not be mutual". In the implementation of callbag-take, on line 15, when sink(t, d) is called and taken === max, if the sink terminates its source (callbag-take) immediately by calling talkback(2), mutual termination will occur after sink(2) is called on line 17. Does the callbag spec expect some degree of asynchronicity (setTimeout, process.nextTick)?
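One way an operator might avoid this without any asynchronicity is to track whether it has already been terminated and to check that flag after every synchronous call into the sink. The following is a hypothetical variant written for illustration, not the actual callbag-take source; `numbers` and the counters are test scaffolding.

```javascript
const take = max => source => (start, sink) => {
  if (start !== 0) return;
  let taken = 0;
  let ended = false;
  let sourceTalkback;
  source(0, (t, d) => {
    if (t === 0) {
      sourceTalkback = d;
      sink(0, (tt, dd) => {
        if (ended) return;
        if (tt === 2) ended = true;   // the sink terminated us
        sourceTalkback(tt, dd);
      });
    } else if (t === 1) {
      if (ended) return;
      taken++;
      sink(1, d);
      // The sink may have terminated us synchronously inside sink(1, d),
      // so re-check the flag before terminating in the other direction.
      if (taken === max && !ended) {
        ended = true;
        sourceTalkback(2);
        sink(2);
      }
    } else if (!ended) {
      ended = true;
      sink(t, d);
    }
  });
};

// Hypothetical synchronous source that counts how often it is terminated.
let sourceTerminations = 0;
const numbers = (start, sink) => {
  if (start !== 0) return;
  let ended = false;
  sink(0, t => { if (t === 2) { ended = true; sourceTerminations++; } });
  for (let i = 1; i <= 10 && !ended; i++) sink(1, i);
  if (!ended) sink(2);
};

// A sink that terminates its source as soon as it has two items.
let sinkTerminations = 0;
const got = [];
let tb;
take(2)(numbers)(0, (t, d) => {
  if (t === 0) tb = d;
  if (t === 1) { got.push(d); if (got.length === 2) tb(2); }
  if (t === 2) sinkTerminations++;
});

console.log(got, sourceTerminations, sinkTerminations); // [ 1, 2 ] 1 0
```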

Use named constants and non-zero power of two values for type parameter

type = 0 is the reason behind the infamous typeof null === 'object' bug and countless other loose equality bugs in the wild. It would be better to use non-zero power of two values instead, which would also allow for things like type === T_START | T_DATA should the spec ever allow it in the future
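A small sketch of what such constants might look like. The names and values are hypothetical (the current spec uses 0, 1 and 2). Note that in JavaScript `===` binds tighter than `|`, so a combined check needs a bitwise AND against a mask rather than `type === T_START | T_DATA`.

```javascript
// Hypothetical named constants, each a distinct non-zero power of two.
const T_START = 1; // 0b001
const T_DATA = 2;  // 0b010
const T_END = 4;   // 0b100

// True when `type` is one of the types included in `mask`.
const matches = (type, mask) => (type & mask) !== 0;

console.log(matches(T_DATA, T_START | T_DATA)); // true
console.log(matches(T_END, T_START | T_DATA));  // false
```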

Ability to detect callbags

In the spec, a callbag is just a simple function. This is great for keeping things lightweight, but it would be useful to be able to type check against them.

Have you thought at all about adding some sort of type information to callbags? Something which can be inspected at runtime to determine if you are working with a callbag, or just a normal function?

Calling a sourceTalkback with 1 and data

Are there uses for pulling from sources by sending the source data?

For example, you could build an HTTP client that, when it receives 1, data can make an HTTP request with that data, and send the responses down the chain of callbags.
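The idea can be sketched as follows, using a synchronous handler as a stand-in for the HTTP request. `responder` and `ask` are hypothetical names, and nothing here is taken from an existing callbag package.

```javascript
// Hypothetical source: every pull carries a request, every delivery is the reply.
const responder = handler => (start, sink) => {
  if (start !== 0) return;
  let ended = false;
  sink(0, (t, request) => {
    if (ended) return;                        // ignore calls after termination
    if (t === 2) { ended = true; return; }
    if (t === 1) sink(1, handler(request));   // reply to this particular pull
  });
};

const replies = [];
let ask;
responder(path => 'GET ' + path + ' -> 200')(0, (t, d) => {
  if (t === 0) ask = d;          // keep the talkback so we can send requests
  if (t === 1) replies.push(d);
});

ask(1, '/users');
ask(1, '/posts');
ask(2);                          // terminate the source
ask(1, '/ignored');              // no delivery after termination

console.log(replies); // [ 'GET /users -> 200', 'GET /posts -> 200' ]
```

This stays within the protocol as written, since the spec says a callbag (either sink or source) MAY be delivered data.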

Are there problems with using this pattern with callbags? Bad practice? Does it even make sense given the purpose of callbags?

Just trying to get my head around callbags and the thought came up as a potential way of avoiding the costs with long chains of callbags, as well as potentially open the door to preval-ing your callbag-related code so you can compile away the dependencies (premature optimization, I know, but I just wanted to see what has been done in this use case of callbags).

Backpressure

In order to fully support backpressure, don't we need to make sure that operators that don't know about, e.g., the pause and resume types propagate them upwards?

i.e. could we add to the protocol something like

if a callbag receives a type it doesn't understand it should propagate it.

and maybe a pause === 3 and a resume === 4 type?

More readable implementation of callback based duplex stream

What about this way of implementing callbags? It's more readable, although a few more lines of code, and I haven't analyzed the performance overhead. It doesn't seem to conform to the type specification for a callbag, but it should work as a duplex stream.

Works by creating a separate context object per pipe, one for the producer and one for the consumer and then binding the "receiver" function with the context object. The bound functions are given as callbacks to the other person on the end of the pipe.

So the producer gets the consumer's receiver function bound with the receiver's context and
the consumer gets the producer's receiver function bound with the producer's context.

function pipe(producer, consumer)
{
  const producerCtx = {};
  const consumerCtx = {};

  producerCtx.sender = consumer.receiver.bind(consumerCtx);
  consumerCtx.sender = producer.receiver.bind(producerCtx);

  consumerCtx.sender(0);
}

const intervalProducer =
{
  receiver(cmd, ...args)
  {
    if(cmd === 0)
    {
      const ctx = this;
      ctx.i = args[0] !== undefined ? args[0] : 0;
      ctx.sender(0);
      ctx.handle = setInterval(() => ctx.sender(1, ctx.i++), 1000);
    }
    else if(cmd === 1 && args[0] !== undefined)
    {
      this.i = args[0];
    }
    else if(cmd === 2)
    {
      clearInterval(this.handle);
    }
  }
};

const loggingConsumer = id => ({
  receiver(cmd, ...args)
  {
    if(cmd === 0)
    {
      const ctx = this;
      setTimeout(() => ctx.sender(2), 3500);
    }
    else if(cmd === 1)
    {
      console.log('consumer id:', id, ', got:',args[0]);
    }
    else if(cmd === 2)
    {
    }
  }
});

console.clear();

//multiple consumer objects on a single producer object, different contexts for each pipe:
for(let i = 0; i < 10; i++)
{
  pipe(intervalProducer, loggingConsumer(i));    
}

//single consumer object on a single producer object, different contexts for each pipe:
const cons420 = loggingConsumer(420);

for(let i = 0; i < 10; i++)
{
  pipe(intervalProducer, cons420);    
}

// single consumer object and single context:
pipe(intervalProducer, loggingConsumer(1000000));

Control flow assumptions

Although it is bad practice to synchronously call callbacks everywhere due to potential call stack overflows, the spec does not disallow this. It seems like there is a widespread assumption that control flow returns to the calling function immediately due to asynchronicity or trampolining. In callbag-take, this assumption results in multiple termination. I propose some possible solutions to the conflict between this common assumption and the spec:

  1. Write an "adapter" callbag to introduce asynchronicity by using setTimeout(cb, 0)
  2. Add restrictions on control flow to the spec

Are asynchronous handshakes permitted?

The callbag spec doesn't seem to indicate whether asynchronous handshakes are permitted. It uses only the language "greeting back", without indicating how this may occur (in terms of timing).

I can imagine source implementations that must perform some work before they are able to logically "commit" to greeting back their sinks. Is any delay in handshaking allowed, or is it expected that all greeting be strictly synchronous?

Relatedly, what if source initialization fails? Should the source greet the sink back anyway, and then perhaps push nothing but an error (2 with truthy payload) to its sink, ignoring all other interim requests (if any) from the sink?

Issues with type 2 occurring more than once

I created two custom callbags. The first reverses the order of the items. The second calls a done function when the stream has completed. However, my done function is called twice instead of once as I would expect. If I comment out the reverse call, done is called one time as I expect. What's the trick?

const {fromIter, iterate, interval, map, filter, take, pipe} = require('callbag-basics');
const subject = require('callbag-subject');
const forEach = require('callbag-for-each');

const reverse = function() {

	return function(source) {
		return function(start, sink) {
			if (start === 0) {
				let talkback;
				let array;
				source(0, function(type, data) {
					if (type === 0) {
						array = [];
						talkback = data;
						sink(type, data)
					} else if (type === 1) {
						array.push(data);
						talkback(1);
					} else if (type === 2) {
						while (array.length > 0) {
							sink(1, array.pop());
						}
						sink(2, data);
					}
				});
			}
		}
	}
};

const done = function(operation) {
	
	return function(source) {
		return function(start, sink) {
			let talkback;
			if (start === 0) {
				source(0, function(type, data) {
					if (type === 0) {
						talkback = data;
						sink(type, data);
					} else if (type === 1) {
						sink(type, data);
					} else if (type === 2) {
						operation();
					}
				});
			}
		}
	}
};

pipe(
	fromIter([0, 1, 2, 3, 4, 5, 6, 7, 8, 9]),
	reverse(),
	done(function() {
		console.log('done');
	}),
	iterate(function(each) {
		console.log(each)
	})
);

Create a callbag from a React event

The idea behind callbags is great, but it's not easy to integrate them seamlessly with React. The problem lies in the fact that a listenable factory should have just enough information in its arguments to subscribe to some event and emit sink(1, data). With React's way of passing a handler function, that is not so easy. Let's imagine a small React component

class MyComponent extends React.Component {
  handleClick = () => {}

  render() {
    return <button onClick={this.handleClick} />
  }
}

What we want to do is to somehow subscribe to the calls of handleClick with callbag factory and use the event, something like this

class MyComponent extends React.Component {
  constructor(props) {
    super(props)

    pipe(
      reactEvent(this.handleClick),
      observe(event => this.props.onClick(event))
    )
  }

  handleClick = () => {}

  render() {
    return <button onClick={this.handleClick} />
  }
}

But a reference to the function is not enough to subscribe to its calls. To do that, we need the handler to call some method (which is sink(1, event)), but we don't have the sink until someone does a handshake, so it's basically a chicken-and-egg problem.

I puzzled over it and came up with this solution: there will be a ref-like callback to set our handler

class MyComponent extends React.Component {
  handleClick

  constructor(props) {
    super(props)

    pipe(
      reactEvent(send => { this.handleClick = send }),
      observe(event => this.props.onClick(event))
    )
  }

  render() {
    return <button onClick={this.handleClick} />
  }
}

with the implementation of reactEvent

const reactEvent = cb => (start, sink) => {
  if (start !== 0) return
  const talkback = (t, d) => {
    if (t === 2) cb(null)
  }
  sink(0, talkback)
  cb((event) => sink(1, event))
}

Here's the current implementation
I found this solution acceptable, but it's still kinda ugly in my opinion, especially for a stateless component (this leaves handleClick floating around). Maybe I'm missing something and there's an easier way to do it.

I thought about adding a breaking change to the spec which would make callbags pass a value through returns across the pipe of callbags, like

const event = (start, sink) => {
  if (start !== 0) return
  sink(0)
  return (event) => sink(1, event)
}

and

const observe = operation => source => {
  return source(0, (t, d) => {
    if (t === 1) operation(d);
  });
}

In this case we can do something like this, and the best thing is that it can be applied not only to react!

class MyComponent extends React.Component {
  handleClick = pipe(
    event,
    observe(event => this.props.onClick(event))
  )

  render() {
    return <button onClick={this.handleClick} />
  }
}

But it still looks kinda hacky to me (and I didn't even think about edge cases) and would require changing a bunch of existing callbags (which is, given their simplicity, not a very cumbersome task, but still). Sorry for the long post. What do you think about the current solution and the spec proposal?

Ability to discover what IDs a callbag supports

Edit: Add a means of querying supported IDs from a callbag, remove redundant ID array.

Could there be a way to determine what IDs a callbag's talkback callee supports? It would work like this:

  • testFunc() - Get an array of all supported IDs for this callbag. Duplicates should be tolerated by the receiver, but the sender should attempt to remove them.
    • This call must be reentrant and must return an array carrying only the same set of elements ignoring duplicates and identity.
    • The receiver must not modify this array - the sender may choose to return an identical reference.
  • testFunc(id) - Return true if the callbag supports this ID, false otherwise.
    • This call must be both reentrant and onto for all possible ids, ignoring identity.
  • testFunc(null) and testFunc(undefined) must be treated the same as testFunc().
  • All arguments beyond the first must be ignored.

Here would be the other spec requirements:

  • This testFunc would be sent as a second data parameter to each call: callbag(0, talkback, testFunc).
    • A callbag should operate only on the intersection of the talkback's supported IDs and the IDs it's greeted with.
    • During a handshake, if the callbag payload is the talkback itself, the callbag must greet with the same testFunc it was greeted with.
    • During a handshake, if the callbag payload is not the talkback itself, but a call to it could result in a possibly asynchronous call to the talkback due to the callbag itself (read: operator sources and sinks):
      • It must greet with a testFunc returning the intersection of what it supports and what the talkback it was greeted with supports.
      • The resulting merged testFunc must forward all IDs it doesn't support to the greeted talkback's testFunc via a tail call without modification to the argument.
  • If the callbag is a talkback created by a source for the purpose of sending data to a sink:
    • The callbag must forward all calls with IDs it can't itself handle to the sink with their associated data via tail call, without modification to the data itself. This includes each unsupported ID id where testFunc(id) returns falsy for both the callbag's testFunc and the associated sink's testFunc.
    • The callbag may choose to forward calls with ID 4 and required ID data arguments to their sinks via tail call.
  • If the callbag is anything else:
    • The callbag's testFunc must return truthy for each ID required for its type.
    • Callbags must ignore and drop any ID id where testFunc(id) returns falsy.
  • For backwards compatibility, when the testFunc is missing, it is assumed to default to this:
    • When called with any non-nullish argument, it returns true.
    • When called with anything else, it returns a proxy to an observably infinite array whose values are the same as its indices' numeric values.
    • Note: A callbag might choose to not reify this, and just use null/undefined instead.

Here are a few things this could enable:

  • Make the required contract smaller by making most builtins optional 0-9 extensions instead:
    • Making termination an optional extension rather than builtin (think: hardware pin, map/filter)
    • Making fatal errors an optional extension rather than builtin (think: numeric sequence generators)
    • Making backpressure handling something operators could try to avoid (most intermediate sinks don't need this).
    • These two would reduce the required ID count to just 0.
  • Adding a new optional 0-9 extension:
    • Making non-fatal errors a common extension: #41
  • Allowing "return" values for callbags by not requiring termination reasons be errors
  • Allowing other domain-specific data to be sent as part of the callbag: #38

There are a few things that should be added to callbag-basics's utilities to sugar over certain inconsistencies:

  • A utility for merging two testFuncs, since it's non-trivial and somewhat perf-sensitive.
    • This would be useful for most transforms.
    • The returned function could be optimized for the IDs it needs.
  • If optional 0-9 extensions are allowed and the above changes made:
    • A utility to send a fatal error with a testFunc, falling back on sending it non-fatally and immediately terminating (if possible) or just terminating normally with the error.
    • A utility to send a non-fatal error with a testFunc, falling back on sending it fatally (if possible) or just terminating normally with the error.
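
As a rough illustration of the merging utility mentioned above, here is a sketch that covers only the intersection rule. The name mergeTestFuncs is hypothetical, and the sketch assumes both inputs return finite arrays when called without an ID; it does not handle the backwards-compatible "missing testFunc" default or the forwarding rule.

```javascript
// Hypothetical utility: intersects two testFuncs that follow the proposal.
// Assumption: own() and upstream() both return finite arrays.
function mergeTestFuncs(own, upstream) {
  let cached = null; // computed lazily, then reused on later calls
  return function merged(id) {
    if (id == null) {
      if (cached === null) {
        const upstreamIds = new Set(upstream());
        cached = Object.freeze(
          [...new Set(own())].filter((x) => upstreamIds.has(x))
        );
      }
      return cached;
    }
    // An ID is supported only if both sides support it.
    return Boolean(own(id)) && Boolean(upstream(id));
  };
}

// Two toy testFuncs used only for this example.
const a = (id) => (id == null ? [0, 1, 2] : [0, 1, 2].includes(id));
const b = (id) => (id == null ? [0, 1, 4] : [0, 1, 4].includes(id));
const both = mergeTestFuncs(a, b);
console.log(both());  // [ 0, 1 ]
console.log(both(1)); // true
console.log(both(2)); // false
```

Caching the intersection is one way to address the perf-sensitivity; a real utility might instead specialize on the IDs it needs.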

Consuming pullable sources needs careful handling to avoid stack overflow

A common practice to consume pullable sources is to pull an item on every push.
For an example, see staltz/callbag-iterate.

That works, and is conceptually very nice and simple.

But in practice, when the source has many items, this leads to a stack overflow:

  1. The sink pulls an item by calling source(1).
  2. Inside that call, the source pushes an item back by invoking sink(1, item).
  3. Inside that call, the sink pulls another item by calling source(1).
  4. ...

Thus, every pull and every push adds one frame to the stack, until either the source terminates, or the stack overflows.
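
To make the growth visible without actually crashing, here is a small sketch that counts how deeply the sink is nested. Both countTo and naiveDepth are hypothetical helpers written for this illustration; they are not taken from callbag-iterate.

```javascript
// Hypothetical pullable source: emits 0..count-1, one item per pull.
function countTo(count) {
  return (start, sink) => {
    if (start !== 0) return;
    let i = 0;
    let done = false;
    sink(0, (t) => {
      if (done) return;
      if (t === 1) {
        if (i < count) sink(1, i++);
        else { done = true; sink(2); }
      } else if (t === 2) {
        done = true;
      }
    });
  };
}

// Naive sink: pulls the next item from inside its own handler and
// records how many handler calls are active at the same time.
function naiveDepth(source) {
  let talkback;
  let depth = 0;
  let maxDepth = 0;
  source(0, (t, d) => {
    if (t === 0) talkback = d;
    if (t === 0 || t === 1) {
      depth++;
      maxDepth = Math.max(maxDepth, depth);
      talkback(1); // re-enters the source before this call returns
      depth--;
    }
  });
  return maxDepth;
}

console.log(naiveDepth(countTo(100))); // 101
```

The nesting depth grows linearly with the number of items, which is why a long enough source exhausts the stack.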

But with some care, the sink can avoid stack overflow.

In pull request zebulonj/callbag-pump#1, I fixed this for the pump operator by pulling in a loop while an item is pushed back, and restarting the loop on every push if needed.
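
The loop strategy can be sketched roughly as follows. This is not the code from that pull request; forEachLooped and countTo are hypothetical names, and the sketch only shows the idea of pulling in a loop instead of pulling recursively.

```javascript
// Hypothetical pullable source: emits 0..count-1, one item per pull.
function countTo(count) {
  return (start, sink) => {
    if (start !== 0) return;
    let i = 0;
    let done = false;
    sink(0, (t) => {
      if (done) return;
      if (t === 1) {
        if (i < count) sink(1, i++);
        else { done = true; sink(2); }
      } else if (t === 2) {
        done = true;
      }
    });
  };
}

// Sink that pulls in a loop, so the stack depth stays constant.
function forEachLooped(operation) {
  return (source) => {
    let talkback;
    let pulling = false; // true while the loop below is running
    let gotItem = false; // set when an item arrived during a pull
    let ended = false;
    const pullLoop = () => {
      if (pulling) return; // a loop is already active further up the stack
      pulling = true;
      do {
        gotItem = false;
        talkback(1); // may synchronously push an item back
      } while (gotItem && !ended);
      pulling = false;
    };
    source(0, (t, d) => {
      if (t === 0) {
        talkback = d;
        pullLoop();
      } else if (t === 1) {
        operation(d);
        gotItem = true;
        pullLoop(); // no-op for a synchronous push, restarts the loop otherwise
      } else if (t === 2) {
        ended = true;
      }
    });
  };
}

let seen = 0;
let sum = 0;
forEachLooped((x) => { seen++; sum += x; })(countTo(100000));
console.log(seen); // 100000
```

When the source pushes synchronously, the inner pullLoop call returns immediately and the outer loop issues the next pull after the stack has unwound. When the source pushes later, the loop has already exited, so the push starts a fresh loop.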

I chose pump so that the proposed solution can be reused for every pipe of operators, because pump effectively converts an arbitrary source to a push-based one, thereby eliminating the mentioned problem for all operators further down the pipe.

For example, the following pipe will lead to a stack overflow:

pipe(
  range(0, 10000),
  forEach(x => console.log(x))
);

But that can be fixed by inserting pump before forEach:

pipe(
  range(0, 10000),
  pump,
  forEach(x => console.log(x))
);

On the other hand, some operators do require input sources to be pullable, but they do not have this problem.
For example, toIterable does not directly consume the input source, but converts it to an Iterable, which is then consumed iteratively by the caller.

So to summarize:
Properly consuming pullable sources is harder than one might think at first, but not impossible: it can be done either by using a strategy similar to pump's, or by applying pump directly before consuming.

A more verbose way to declare callbags: useful?

Hi,

I just created callbag-from-verbose, a way of creating callbags that I found more verbose but also more explicit.

Instead of calling producer(0, consumer), you can call introduceAtoB(consumer, producer)
Instead of calling consumer(1, message), you can call send(consumer, message)
Instead of calling consumer(2), you can call stop(consumer)

And when declaring your callbag, instead of typing:

function producer(type, payload){
  if(type === 0) {...}
  if(type === 1) {...}
  if(type === 2) {...}
}

you can type:

const producer = fromVerboseCallbag({
  meet: other => { ... },
  receive: message => { ... },
  stop: () => { ... }
})
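
For readers wondering how such a wrapper could map named handlers onto the numeric types, here is a minimal sketch. It is an assumption-based illustration, not the actual callbag-from-verbose implementation; the name fromHandlers is hypothetical and the real API may differ.

```javascript
// Hypothetical stand-in; the real callbag-from-verbose API may differ.
function fromHandlers({ meet, receive, stop }) {
  return function callbag(type, payload) {
    if (type === 0 && meet) meet(payload);            // greeted with another callbag
    else if (type === 1 && receive) receive(payload); // delivered data
    else if (type === 2 && stop) stop(payload);       // terminated
  };
}

const log = [];
const consumer = fromHandlers({
  meet: (other) => { log.push('met'); },
  receive: (message) => { log.push('got ' + message); },
  stop: () => { log.push('stopped'); },
});

consumer(0, () => {});
consumer(1, 'hello');
consumer(2);
console.log(log); // [ 'met', 'got hello', 'stopped' ]
```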

I find it useful, if only to better understand callbags when you're a beginner like me.

And you, do you find this useful too?

PS: I haven't found a better place to chat about this subject. If there is one, please tell me 😉
