Scala.Rx 0.4.1

Scala.Rx is a change propagation library for Scala. Scala.Rx gives you Reactive variables (Rxs), which are smart variables that auto-update themselves when the values they depend on change. The underlying implementation is push-based FRP, based on the ideas in Deprecating the Observer Pattern.

A simple example which demonstrates the behavior is:

import rx._
val a = Var(1); val b = Var(2)
val c = Rx{ a() + b() }
println(c.now) // 3
a() = 4
println(c.now) // 6

The idea is that 99% of the time, when you re-calculate a variable, you re-calculate it the same way you initially calculated it. Furthermore, you only re-calculate it when one of the values it depends on changes. Scala.Rx does this for you automatically, and handles all the tedious update logic for you so you can focus on other, more interesting things!

Apart from basic change-propagation, Scala.Rx provides a host of other functionality, such as a set of combinators for easily constructing the dataflow graph, compile time checks for a high degree of correctness, and seamless interop with existing Scala code. This means it can be easily embedded in an existing Scala application.


Getting Started

Scala.Rx is available on Maven Central. In order to get started, simply add the following to your build.sbt:

libraryDependencies += "com.lihaoyi" %% "scalarx" % "0.4.1"

After that, opening up the sbt console and pasting the above example into the console should work! You can proceed through the examples in the Basic Usage page to get a feel for what Scala.Rx can do.

ScalaJS

In addition to running on the JVM, Scala.Rx also compiles to Scala.js! This artifact is currently on Maven Central and can be used via the following SBT snippet:

libraryDependencies += "com.lihaoyi" %%% "scalarx" % "0.4.1"

There are some minor differences between running Scala.Rx on the JVM and in JavaScript, particularly around asynchronous operations, the parallelism model and the memory model. In general, though, all the examples given in the documentation below will work perfectly when cross-compiled to JavaScript and run in the browser!

Scala.Rx 0.4.1 is only compatible with Scala.js 0.6.5+.

Using Scala.Rx

The primary operations only need an import rx._ before being used, with additional operations also needing an import rx.ops._. Some of the examples below also use various imports from scala.concurrent or ScalaTest as well.

Basic Usage

import rx._

val a = Var(1); val b = Var(2)
val c = Rx{ a() + b() }
println(c.now) // 3
a() = 4
println(c.now) // 6

The above example is an executable program. In general, import rx._ is enough to get you started with Scala.Rx, and it will be assumed in all further examples. These examples are all taken from the unit tests.

The basic entities you have to care about are Var, Rx and Obs:

  • Var: a smart variable which you can get using a() and set using a() = .... Whenever its value changes, it pings any downstream entity which needs to be recalculated.
  • Rx: a reactive definition which automatically captures any Vars or other Rxs which get called in its body, flagging them as dependencies and re-calculating whenever one of them changes. Like a Var, you can use the a() syntax to retrieve its value, and it also pings downstream entities when the value changes.
  • Obs: an observer on one or more Var s or Rx s, performing some side-effect when the observed node changes value and sends it a ping.

Using these components, you can easily construct a dataflow graph, and have the various values within the dataflow graph be kept up to date when the inputs to the graph change:

val a = Var(1) // 1

val b = Var(2) // 2

val c = Rx{ a() + b() } // 3
val d = Rx{ c() * 5 } // 15
val e = Rx{ c() + 4 } // 7
val f = Rx{ d() + e() + 4 } // 26

println(f.now) // 26
a() = 3
println(f.now) // 38

The dataflow graph for this program looks like this:

Dataflow Graph

Where the Vars are represented by squares, the Rxs by circles and the dependencies by arrows. Each Rx is labelled with its name, its body and its value.

Modifying the value of a causes the changes to propagate through the dataflow graph:

Dataflow Graph

As can be seen above, changing the value of a causes the change to propagate all the way through c, d and e to f. You can use a Var and Rx anywhere you can use a normal variable.

The changes propagate through the dataflow graph in waves. Each update to a Var touches off a propagation, which pushes the changes from that Var to any Rx which is (directly or indirectly) dependent on its value. In the process, it is possible for a Rx to be re-calculated more than once.

Observers

As mentioned, Obs s can be created from Rx s or Var s and used to perform side effects when they change:

val a = Var(1)
var count = 0
val o = a.trigger {
  count = a.now + 1
}
println(count) // 2
a() = 4
println(count) // 5

This creates a dataflow graph that looks like:

Dataflow Graph

When a is modified, the observer o will perform the side effect:

Dataflow Graph

The body of Rxs should be side effect free, as they may be run more than once per propagation. You should use Obs s to perform your side effects, as they are guaranteed to run only once per propagation after the values for all Rxs have stabilized.

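This contrast can be sketched with counters. This is only a sketch: the exact number of Rx evaluations per propagation depends on the shape of the graph, so no specific counts are asserted.

```scala
import rx._

val a = Var(1)
var rxRuns = 0
var obsRuns = 0
val b = Rx{ rxRuns += 1; a() * 2 } // may be evaluated more than once per propagation
val c = Rx{ b() + a() }            // diamond: c depends on both a and b
val o = c.trigger { obsRuns += 1 } // guaranteed to run once per propagation

a() = 2 // rxRuns may grow by more than one; obsRuns grows by exactly one
```
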
Scala.Rx provides a convenient .foreach() combinator, which provides an alternate way of creating an Obs from an Rx:

val a = Var(1)
var count = 0
val o = a.foreach{ x =>
  count = x + 1
}
println(count) // 2
a() = 4
println(count) // 5

This example does the same thing as the code above.

Note that the body of the Obs is run once initially when it is declared. This matches the way each Rx is calculated once when it is initially declared, but it is conceivable that you want an Obs which fires for the first time only when the Rx it is listening to changes. You can do this by using the alternate triggerLater syntax:

val a = Var(1)
var count = 0
val o = a.triggerLater {
  count = count + 1
}
println(count) // 0
a() = 2
println(count) // 1

An Obs acts to encapsulate the callback that it runs. They can be passed around, stored in variables, etc. When the Obs gets garbage collected, the callback will stop triggering. Thus, an Obs should be stored in the object it affects: if the callback only affects that object, it doesn't matter when the Obs itself gets garbage collected, as that will only happen after the object holding it becomes unreachable, in which case its effects cannot be observed anyway. An Obs can also be actively shut off, if a stronger guarantee is needed:

val a = Var(1)
val b = Rx{ 2 * a() }
var target = 0
val o = b.trigger {
  target = b.now
}
println(target) // 2
a() = 2
println(target) // 4
o.kill()
a() = 3
println(target) // 4

After manually calling .kill(), the Obs no longer triggers. Apart from .kill()ing Obss, you can also kill Rxs, which prevents further updates.

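Killing an Rx works the same way; here is a small sketch, assuming the same .kill() call shown above:

```scala
import rx._

val a = Var(1)
val b = Rx{ a() * 2 }
println(b.now) // 2
b.kill()       // b stops listening to a
a() = 10
println(b.now) // still 2: b no longer updates
```
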
In general, Scala.Rx revolves around constructing dataflow graphs which automatically keep things in sync, which you can easily interact with from external, imperative code. This involves using:

  • Vars as inputs to the dataflow graph from the imperative world
  • Rxs as the intermediate nodes in the dataflow graphs
  • Obss as the output from the dataflow graph back into the imperative world

Complex Reactives

Rxs are not limited to Ints. Strings, Seq[Int]s, Seq[String]s, anything can go inside an Rx:

val a = Var(Seq(1, 2, 3))
val b = Var(3)
val c = Rx{ b() +: a() }
val d = Rx{ c().map("omg" * _) }
val e = Var("wtf")
val f = Rx{ (d() :+ e()).mkString }

println(f.now) // "omgomgomgomgomgomgomgomgomgwtf"
a() = Nil
println(f.now) // "omgomgomgwtf"
e() = "wtfbbq"
println(f.now) // "omgomgomgwtfbbq"

As shown, you can use Scala.Rx's reactive variables to model problems of arbitrary complexity, not just trivial ones which involve primitive numbers.

Error Handling

Since the body of an Rx can be any arbitrary Scala code, it can throw exceptions. Propagating the exception up the call stack would not make much sense, as the code evaluating the Rx is probably not in control of the reason it failed. Instead, any exceptions are caught by the Rx itself and stored internally as a Try.

This can be seen in the following unit test:

val a = Var(1)
val b = Rx{ 1 / a() }
println(b.now) // 1
println(b.toTry) // Success(1)
a() = 0
intercept[ArithmeticException]{
  b()
}
assert(b.toTry.isInstanceOf[Failure])

Initially, the value of a is 1 and so the value of b also is 1. You can also extract the internal Try using b.toTry, which at first is Success(1).

However, when the value of a becomes 0, the body of b throws an ArithmeticException. This is caught by b and re-thrown if you try to extract the value from b using b(). You can extract the entire Try using toTry and pattern match on it to handle both the Success case as well as the Failure case.

When you have many Rxs chained together, exceptions propagate forward following the dependency graph, as you would expect. The following code:

val a = Var(1)
val b = Var(2)

val c = Rx{ a() / b() }
val d = Rx{ a() * 5 }
val e = Rx{ 5 / b() }
val f = Rx{ a() + b() + 2 }
val g = Rx{ f() + c() }

inside(c.toTry){case Success(0) => () }
inside(d.toTry){case Success(5) => () }
inside(e.toTry){case Success(2) => () }
inside(f.toTry){case Success(5) => () }
inside(g.toTry){case Success(5) => () }

b() = 0

inside(c.toTry){case Failure(_) => () }
inside(d.toTry){case Success(5) => () }
inside(e.toTry){case Failure(_) => () }
inside(f.toTry){case Success(3) => () }
inside(g.toTry){case Failure(_) => () }

Creates a dependency graph that looks like the following:

Dataflow Graph

In this example, initially all the values for a, b, c, d, e, f and g are well defined. However, when b is set to 0:

Dataflow Graph

c and e both result in exceptions, and the exception from c propagates to g. Attempting to extract the value from g using g.now, for example, will re-throw the ArithmeticException. Again, using toTry works too.

Nesting

Rxs can contain other Rxs, arbitrarily deeply. This example shows the Rxs nested two levels deep:

val a = Var(1)
val b = Rx{
    (Rx{ a() }, Rx{ math.random })
}
val r = b.now._2.now
a() = 2
println(b.now._2.now) // r

In this example, we can see that although we modified a, this only affects the left-inner Rx; neither the right-inner Rx (which takes on a different, random value each time it gets re-calculated) nor the outer Rx (which would cause the whole thing to re-calculate) is affected. A slightly less contrived example may be:

var fakeTime = 123
trait WebPage{
    def fTime = fakeTime
    val time = Var(fTime)
    def update(): Unit  = time() = fTime
    val html: Rx[String]
}
class HomePage(implicit ctx: Ctx.Owner) extends WebPage {
    val html = Rx{"Home Page! time: " + time()}
}
class AboutPage(implicit ctx: Ctx.Owner) extends WebPage {
    val html = Rx{"About Me, time: " + time()}
}

val url = Var("www.mysite.com/home")
val page = Rx{
    url() match{
        case "www.mysite.com/home" => new HomePage()
        case "www.mysite.com/about" => new AboutPage()
    }
}

println(page.now.html.now) // "Home Page! time: 123"

fakeTime = 234
page.now.update()
println(page.now.html.now) // "Home Page! time: 234"

fakeTime = 345
url() = "www.mysite.com/about"
println(page.now.html.now) // "About Me, time: 345"

fakeTime = 456
page.now.update()
println(page.now.html.now) // "About Me, time: 456"

In this case, we define a web page which has an html value (a Rx[String]). However, depending on the url, it could be either a HomePage or an AboutPage, and so our page object is a Rx[WebPage].

Having a Rx[WebPage], where the WebPage has an Rx[String] inside, seems natural and obvious, and Scala.Rx lets you do it simply and naturally. This kind of objects-within-objects situation arises very naturally when modelling a problem in an object-oriented way. The ability of Scala.Rx to gracefully handle the corresponding Rxs within Rxs allows it to gracefully fit into this paradigm, something I found lacking in most of the Related Work I surveyed.

Most of the examples here are taken from the unit tests, which provide more examples and guidance on how to use this library.

Ownership Context

In the last example above, we had to introduce the concept of ownership, where Ctx.Owner is used. In fact, if we leave out (implicit ctx: Ctx.Owner), we get the following compile time error:

error: This Rx might leak! Either explicitly mark it unsafe (Rx.unsafe) or ensure an implicit RxCtx is in scope!
           val html = Rx{"Home Page! time: " + time()}

To understand ownership it is important to understand the problem it fixes: leaks. As an example, consider this slight modification to the first example:

var count = 0
val a = Var(1); val b = Var(2)
def mkRx(i: Int) = Rx.unsafe { count += 1; i + b() }
val c = Rx{ 
  val newRx = mkRx(a()) 
  newRx() 
}
println(c.now, count) //(3,1)

In this version, the function mkRx was added, but otherwise the computed value of c remains unchanged. And modifying a appears to behave as expected:

a() = 4
println(c.now, count) //(6,2)

But if we modify b we might start to notice something not quite right:

b() = 3 
println(c.now, count) //(7,5) -- 5??

(0 to 100).foreach { i => a() = i }
println(c.now, count) //(103,106)

b() = 4
println(c.now, count) //(104,211) -- 211!!!

In this example, even though b is only updated a few times, the count value starts to soar as a is modified. This is mkRx leaking! That is, every time c is recomputed, it builds a whole new Rx that sticks around and keeps on evaluating, even after it is no longer reachable as a data dependency and forgotten. So after running that (0 to 100).foreach statement, there are over 100 Rxs that all fire every time b is changed. This is clearly not desirable.

However, by adding an explicit owner (and removing unsafe), we can fix the leak:

var count = 0
val a = Var(1); val b = Var(2)
def mkRx(i: Int)(implicit ctx: Ctx.Owner) = Rx { count += 1; i + b() }
val c = Rx{ 
  val newRx = mkRx(a()) 
  newRx() 
}
println(c.now,count) // (3,1)
a() = 4
println(c.now,count) // (6,2)
b() = 3
println(c.now,count) // (7,4)
(0 to 100).foreach { i => a() = i }
println(c.now,count) //(103,105)
b() = 4
println(c.now,count) //(104,107)

Ownership fixes leaks by allowing a parent Rx to track its "owned" nested Rxs. That is, whenever an Rx recalculates, it first kills all of its owned dependencies, ensuring they do not leak. In this example, c is the owner of all the Rxs which are created in mkRx and kills them automatically every time c recalculates.

Data Context

Calling () (aka apply) on either a Rx or a Var unwraps the current value and registers it as a dependency of whichever Rx is currently evaluating. Alternatively, .now can be used to simply unwrap the value, skipping over becoming a data dependency:

val a = Var(1); val b = Var(2)
val c = Rx{ a.now + b.now } //not a very useful `Rx`
println(c.now) // 3
a() = 4
println(c.now) // 3 
b() = 5
println(c.now) // 3 

To understand the need for a Data context and how Data contexts differ from Owner contexts, consider the following example:

def foo()(implicit ctx: Ctx.Owner) = {
  val a = rx.Var(1)
  a()
  a
}

val x = rx.Rx{val y = foo(); y() = y() + 1; println("done!") }

With the concept of ownership, if a() is allowed to create a data dependency on its owner, it would enter infinite recursion and blow up the stack! Instead, the above code gives this compile time error:

<console>:17: error: No implicit Ctx.Data is available here!
        a()

We can "fix" the error by explicitly allowing the data dependencies (and see that the stack blows up):

def foo()(implicit ctx: Ctx.Owner, data: Ctx.Data) = {
  val a = rx.Var(1)
  a()
  a
}
val x = rx.Rx{val y = foo(); y() = y() + 1; println("done!") }
...
at rx.Rx$Dynamic$Internal$$anonfun$calc$2.apply(Core.scala:180)
  at scala.util.Try$.apply(Try.scala:192)
  at rx.Rx$Dynamic$Internal$.calc(Core.scala:180)
  at rx.Rx$Dynamic$Internal$.update(Core.scala:184)
  at rx.Rx$.doRecalc(Core.scala:130)
  at rx.Var.update(Core.scala:280)
  at $anonfun$1.apply(<console>:15)
  at $anonfun$1.apply(<console>:15)
  at rx.Rx$Dynamic$Internal$$anonfun$calc$2.apply(Core.scala:180)
  at scala.util.Try$.apply(Try.scala:192)
...

The Data context is the mechanism that an Rx uses to decide when to recalculate. Ownership fixes the problem of leaking. Mixing the two can lead to infinite recursion: when something is both owned by and a data dependency of the same parent Rx.

Luckily, it is almost always the case that only one context or the other is needed. When dealing with dynamic graphs, it is usually the ownership context, i.e. functions most often have the form:

def f(...)(implicit ctx: Ctx.Owner) = Rx { ... }

The Data context is needed less often, and is useful, for example, where it is desirable to DRY up some repeated Rx code. Such a function would have this form:

def f(...)(implicit data: Ctx.Data) = ...

This would allow some shared data dependency to be pulled out of the body of each Rx and into the shared function.

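As a hypothetical sketch of that pattern (the helper name sharedSum is made up), a shared read of several reactives can be factored out, with each calling Rx still picking up the data dependencies:

```scala
import rx._

val a = Var(1)
val b = Var(2)

// The Vars read here become data dependencies of whichever Rx calls this helper
def sharedSum()(implicit data: Ctx.Data): Int = a() + b()

val c = Rx{ sharedSum() * 10 }
val d = Rx{ sharedSum() + 1 }
```
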
By splitting up the orthogonal concepts of ownership and data dependencies, the problem of infinite recursion outlined above is greatly limited. Explicit data dependencies also make it clearer when the use of a Var or Rx is meant to be a data dependency, and not just a simple read of the current value (i.e. .now). Without this distinction, it is easier to introduce "accidental" data dependencies that are unexpected and unintended.

Additional Operations

Apart from the basic building blocks of Var/Rx/Obs, Scala.Rx also provides a set of combinators which allow you to easily transform your Rxs; this allows the programmer to avoid constantly re-writing logic for the common ways of constructing the dataflow graph. The five basic combinators, map(), flatMap(), filter(), reduce() and fold(), are all modelled after the Scala collections library, and provide an easy way of transforming the values coming out of an Rx.

Map

val a = Var(10)
val b = Rx{ a() + 2 }
val c = a.map(_*2)
val d = b.map(_+3)
println(c.now) // 20
println(d.now) // 15
a() = 1
println(c.now) // 2
println(d.now) // 6

map does what you would expect, creating a new Rx with the value of the old Rx transformed by some function. For example, a.map(_*2) is essentially equivalent to Rx{ a() * 2 }, but somewhat more convenient to write.

FlatMap

val a = Var(10)
val b = Var(1)
val c = a.flatMap(a => Rx { a*b() })
println(c.now) // 10
b() = 2
println(c.now) // 20

flatMap is analogous to flatMap from the collections library in that it allows for merging nested Rx s of type Rx[Rx[_]] into a single Rx[_].

This, in conjunction with the map combinator, allows Scala's for-comprehension syntax to work with Rx s and Var s:

val a = Var(10)
val b = for {
  aa <- a
  bb <- Rx { a() + 5}
  cc <- Var(1).map(_*2)
} yield {
  aa + bb + cc
}

Filter

val a = Var(10)
val b = a.filter(_ > 5)
a() = 1
println(b.now) // 10
a() = 6
println(b.now) // 6
a() = 2
println(b.now) // 6
a() = 19
println(b.now) // 19

filter ignores changes to the value of the Rx that fail the predicate.

Note that none of the filter methods is able to filter out the first, initial value of a Rx, as there is no "older" value to fall back to. Hence this:

val a = Var(2)
val b = a.filter(_ > 5)
println(b.now)

will print out "2".

Reduce

val a = Var(1)
val b = a.reduce(_ * _)
a() = 2
println(b.now) // 2
a() = 3
println(b.now) // 6
a() = 4
println(b.now) // 24

The reduce operator combines subsequent values of an Rx together, starting from the initial value. Every change to the original Rx is combined with the previously-stored value and becomes the new value of the reduced Rx.

Fold

val a = Var(1)
val b = a.fold(List.empty[Int])((acc,elem) => elem :: acc)
a() = 2
println(b.now) // List(2,1)
a() = 3
println(b.now) // List(3,2,1)
a() = 4
println(b.now) // List(4,3,2,1)

Fold enables accumulation in a similar way to reduce, but can accumulate to some other type than that of the source Rx.

Each of these five combinators has a counterpart in the .all namespace which operates on Try[T]s rather than Ts, in the case where you need the added flexibility to handle Failures in some special way.

Asynchronous Combinators

These are combinators which do more than simply transforming a value from one to another. These have asynchronous effects, and can spontaneously modify the dataflow graph and begin propagation cycles without any external trigger. Although this may sound somewhat unsettling, the functionality provided by these combinators is often necessary, and manually writing the logic around something like Debouncing, for example, is far more error prone than simply using the combinators provided.

Note that none of these combinators are doing anything that cannot be done via a combination of Obss and Vars; they simply encapsulate the common patterns, saving you manually writing them over and over, and reducing the potential for bugs.

Future

import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global
import rx.async._

val p = Promise[Int]()
val a = p.future.toRx(10)
println(a.now) //10
p.success(5)
println(a.now) //5

The toRx combinator only applies to Future[_]s. It takes an initial value, which will be the value of the Rx until the Future completes, at which point the value will become the value of the Future.

An Rx can create Futures as many times as necessary. This example shows it creating two distinct Futures:

import scala.concurrent.Promise
import scala.concurrent.ExecutionContext.Implicits.global
import rx.async._

var p = Promise[Int]()
val a = Var(1)

val b: Rx[Int] = Rx {
  val f =  p.future.toRx(10)
  f() + a()
}
println(b.now) //11
p.success(5)
println(b.now) //6

p = Promise[Int]()
a() = 2
println(b.now) //12

p.success(7)
println(b.now) //9

The value of b() updates as you would expect as the series of Futures complete (in this case, manually using Promises).

This is handy if your dependency graph contains some asynchronous elements. For example, you could have a Rx which depends on another Rx, but requires an asynchronous web request to calculate its final value. With async, the results from the asynchronous web request will be pushed back into the dataflow graph automatically when the Future completes, starting off another propagation run and conveniently updating the rest of the graph which depends on the new result.

Timer

import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

val t = Timer(100 millis)
var count = 0
val o = t.trigger {
    count = count + 1
}

println(count) // 3
println(count) // 8
println(count) // 13

A Timer is a Rx that generates events on a regular basis. In the example above, printing count in the console shows that it has increased over time as the Timer fires.

The scheduled task is cancelled automatically when the Timer object becomes unreachable, so it can be garbage collected. This means you do not have to worry about managing the life-cycle of the Timer. On the other hand, this means the programmer should ensure that the reference to the Timer is held by the same object as that holding any Rx listening to it. This will ensure that the exact moment at which the Timer is garbage collected will not matter, since by then the object holding it (and any Rx it could possibly affect) are both unreachable.

Delay

import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

val a = Var(10)
val b = a.delay(250 millis)

a() = 5
println(b.now) // 10
eventually{
  println(b.now) // 5
}

a() = 4
println(b.now) // 5
eventually{
  println(b.now) // 4
}

The delay(t) combinator creates a delayed version of an Rx whose value lags the original by a duration t. When the Rx changes, the delayed version will not change until the delay t has passed.

This example shows the delay being applied to a Var, but it could easily be applied to an Rx as well.

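A minimal sketch of applying delay to a derived Rx rather than a Var (the names doubled and lagged are illustrative):

```scala
import rx._
import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

val a = Var(10)
val doubled = Rx{ a() * 2 }             // a derived Rx
val lagged  = doubled.delay(250.millis) // lags doubled by 250ms
```
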
Debounce

import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

val a = Var(10)
val b = a.debounce(200 millis)
a() = 5
println(b.now) // 5

a() = 2
println(b.now) // 5

eventually{
  println(b.now) // 2
}

a() = 1
println(b.now) // 2

eventually{
  println(b.now) // 1
}

The debounce(t) combinator creates a version of an Rx which will not update more than once every time period t.

If multiple updates happen within a short span of time (less than t apart), the first update will take place immediately, and a second update will take place only after the time t has passed. For example, this may be used to limit the rate at which an expensive result is re-calculated: you may be willing to let the calculated value be a few seconds stale if it lets you save on performing the expensive calculation more than once every few seconds.

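That rate-limiting use case might be sketched like this (costOfShipping is a hypothetical stand-in for an expensive computation):

```scala
import rx._
import rx.async._
import rx.async.Platform._
import scala.concurrent.duration._

// Hypothetical stand-in for an expensive computation
def costOfShipping(weight: Int): Int = weight * 3

// Debounce the input so the expensive computation runs at most once per period
val weight = Var(100)
val steady = weight.debounce(5.seconds)
val cost   = Rx{ costOfShipping(steady()) } // recalculated at most once per 5s
```
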
Design Considerations

Simple to Use

This meant that the syntax to write programs in a dependency-tracking way had to be as lightweight as possible, and that programs written using FRP had to look like their normal, old-fashioned, imperative counterparts. This meant using DynamicVariable instead of implicits to automatically pass arguments, sacrificing proper lexical scoping for nice syntax.

I ruled out using a purely monadic style (like reactive-web), as although it would be far easier to implement the library in that way, it would be a far greater pain to actually use it. I also didn't want to have to manually declare dependencies, as this violates DRY when you are declaring your dependencies twice: once in the header of the Rx, and once more when you use it in the body.

The goal was to be able to write code, sprinkle a few Rx{}s around and have the dependency tracking and change propagation just work. Overall, I believe it has been quite successful at that!

Simple to Reason About

This means many things, but most of all it means having no globals. This greatly simplifies many things for someone using the library, as you no longer need to reason about different parts of your program interacting through the library. Using Scala.Rx in different parts of a large program is completely fine; they are completely independent.

Another design decision in this area was to leave the parallelism and propagation-scheduling mainly to an implicit ExecutionContext, with the default being to simply run the propagation wave on whatever thread made the update to the dataflow graph.

  • The former means that anyone who is used to writing parallel programs in Scala/Akka is already familiar with how to deal with parallelizing Scala.Rx
  • The latter makes it far easier to reason about when propagations happen, at least in the default case: it simply happens right away, and by the time the Var.update() function has returned, the propagation has completed.

Overall, limiting the range of side effects and removing global state makes Scala.Rx easy to reason about, and means a developer can focus on using Scala.Rx to construct dataflow graphs rather than worry about un-predictable far-reaching interactions or performance bottlenecks.

Simple to Interop

This meant that it had to be easy for a programmer to drop in and out of the FRP world. Many of the papers I read in preparing for Scala.Rx described systems that worked brilliantly on their own, and had some amazing properties, but required that the entire program be written in an obscure variant of an obscure language. No thought at all was given to inter-operability with existing languages or paradigms, which makes it impossible to incrementally introduce FRP into an existing codebase.

With Scala.Rx, I resolved to do things differently. Hence, Scala.Rx:

  • Is written in Scala: an uncommon, but probably less-obscure language than Haskell or Scheme
  • Is a library: it is plain-old-scala. There is no source-to-source transformation, no special runtime necessary to use Scala.Rx. You download the source code into your Scala project, and start using it
  • Allows you to use any programming language construct or library functionality within your Rxs: Scala.Rx will figure out the dependencies without the programmer having to worry about it, without limiting yourself to some inconvenient subset of the language
  • Allows you to use Scala.Rx within a larger project without much pain. You can easily embed dataflow graphs within a larger object-oriented universe and interact with them via setting Vars and listening to Obss

Many of the papers reviewed show a beautiful new FRP universe that we could be programming in, if only you ported all your code to FRP-Haskell and limited yourself to the small set of combinators used to create dataflow graphs. On the other hand, by letting you embed FRP snippets anywhere within existing code, using FRP ideas in existing projects without full commitment, and allowing you easy interop between your FRP and non-FRP code, Scala.Rx aims to bring the benefits of FRP into the dirty, messy universe which we are programming in today.

Limitations

Scala.Rx has a number of significant limitations, some of which arise from trade-offs in the design, others from the limitations of the underlying platform.

No "Empty" Reactives

The API of Rxs in Scala.Rx tries to follow the collections API as far as possible: you can map, filter and reduce over the Rxs, just as you can over collections. However, it is currently impossible to have a Rx which is empty in the way a collection can be empty: filtering out all values in a Rx will still leave at least the initial value (even if it fails the predicate) and Async Rxs need to be given an initial value to start.

This limitation arises from the difficulty in joining together possibly empty Rxs with good user experience. For example, if I have a dataflow graph:

val a = Var()
val b = Var()
var c = Rx{
    .... a() ...
    ... some computation ...
    ... b() ...
    result
}

Where a and b are initially empty, I have basically four options:

  • Block the current thread which is computing c, waiting for a and then b to become available.
  • Throw an exception when a() and b() are requested, aborting the computation of c but registering it to be restarted when a() or b() become available.
  • Re-write this in a monadic style using for-comprehensions.
  • Use the delimited continuations plugin to transform the above code to monadic code automatically.

The first option is a performance problem: threads are generally extremely heavyweight on most operating systems. You cannot reasonably make more than a few thousand threads, which is a tiny number compared to the number of objects you can create. Hence, although blocking would be the easiest, it is frowned upon in many systems (e.g. in Akka, which Scala.Rx is built upon) and does not seem like a good solution.

The second option is a performance problem in a different way: with n different dependencies, all of which may start off empty, the computation of c may need to be started and aborted n times even before completing even once. Although this does not block any threads, it does seem extremely expensive.

The third option is a no-go from a user experience perspective: it would require far reaching changes in the code base and coding style in order to benefit from the change propagation, which I'm not willing to require.

The last option is problematic due to the bugginess of the delimited continuations plugin. Although in theory it should be able to solve everything, a large number of small bugs (messing up type inferencing, interfering with implicit resolution) combined with a few fundamental problems meant that even on a small scale project (less than 1000 lines of reactive code) it was getting painful to use.

No Automatic Parallelization at the Start

As mentioned earlier, Scala.Rx can perform automatic parallelization of updates occurring in the dataflow graph: simply provide an appropriate ExecutionContext, and independent Rxs will have their updates spread out over multiple cores.

However, this only works for updates, and not when the dataflow graph is being initially defined: in that case, every Rx evaluates its body once in order to get its default value, and it all happens serially on the same thread. This limitation arises from the fact that we do not have a good way to work with "empty" Rxs, and we do not know what an Rx's dependencies are before the first time we evaluate it.

Hence, we cannot start evaluating all our Rxs in parallel, as some may finish before the Rxs they depend on and read them while still empty, their initial values still being computed. Nor can we choose to parallelize only those which do not depend on each other, as before execution we do not know what the dependencies are!

Thus, we have no choice but to have the initial definitions of Rxs happen serially. If necessary, a programmer can manually create independent Rxs in parallel using Futures.
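This workaround can be sketched with plain Futures: kick off the expensive initial computations in parallel, collect the results, and then construct the Vars serially once everything is available. The sketch below is illustrative only; expensiveA and expensiveB are hypothetical stand-ins for costly initial computations, and the final Var construction (not shown) would use the scala.rx API as usual.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical expensive initial computations
def expensiveA(): Int = (1 to 1000).sum
def expensiveB(): Int = (1 to 1000).map(_ * 2).sum

// Run the independent computations in parallel...
val fa = Future(expensiveA())
val fb = Future(expensiveB())

// ...then gather the results and build the Vars serially on this
// thread, e.g. val a = Var(ia); val b = Var(ib)
val ia = Await.result(fa, 10.seconds)
val ib = Await.result(fb, 10.seconds)
```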

Glitchiness and Redundant Computation

In the context of FRP, a glitch is a temporary inconsistency in the dataflow graph. Due to the fact that updates do not happen instantaneously, but instead take time to compute, the values within an FRP system may be transiently out of sync during the update process. Furthermore, depending on the nature of the FRP system, it is possible to have nodes be updated more than once in a propagation.

This may or may not be a problem, depending on how tolerant the application is of occasionally stale, inconsistent data. In a single-threaded system, it can be avoided in a number of ways:

  • Make the dataflow graph static, and perform a topological sort to rank nodes in the order they are to be updated. This means a node is always updated after its dependencies, so it will never see any stale data
  • Pause the updating of a node when it tries to call upon a dependency which has not been updated. This could be done by blocking the thread, for example, and only resuming after the dependency has been updated.
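The first approach can be made concrete with a generic sketch: Kahn's algorithm over an explicit dependency map. This illustrates static topological ordering in general, not Scala.Rx's actual internals (node names and the dependency map are made up for the example).

```scala
import scala.collection.mutable

// deps(n) = the nodes that n reads from. Returns an update order in which
// every node appears after all of its dependencies (Kahn's algorithm).
def updateOrder(deps: Map[String, Set[String]]): List[String] = {
  val nodes = deps.keySet ++ deps.values.flatten
  // How many unmet dependencies each node still has
  val remaining = mutable.Map.from(nodes.iterator.map(n => n -> deps.getOrElse(n, Set.empty[String]).size))
  val queue = mutable.Queue.from(nodes.filter(remaining(_) == 0))
  val order = mutable.ListBuffer.empty[String]
  while (queue.nonEmpty) {
    val n = queue.dequeue()
    order += n
    // Every node that reads n now has one fewer unmet dependency
    for ((m, ds) <- deps if ds.contains(n)) {
      remaining(m) -= 1
      if (remaining(m) == 0) queue.enqueue(m)
    }
  }
  order.toList
}

// Diamond: b and c read a; d reads b and c
val order = updateOrder(Map("b" -> Set("a"), "c" -> Set("a"), "d" -> Set("b", "c")))
```

Any order this produces updates a before b and c, and both before d, so no node ever reads a stale dependency.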

However, both of these approaches have problems. The first approach is extremely restrictive: a static dataflow graph means that a large amount of useful behavior, e.g. creating and destroying sections of the graph dynamically at run-time, is prohibited. This goes against Scala.Rx's goal of allowing the programmer to write code "normally" without limits, and letting the FRP system figure it out.

The second case is a problem for languages which do not easily allow computations to be paused. In Java, and by extension Scala, the threads used are operating system (OS) threads which are extremely expensive. Hence, blocking an OS thread is frowned upon. Coroutines and continuations could also be used for this, but Scala lacks both of these facilities.

The last problem is that both of these models only make sense for single-threaded, sequential code. As mentioned in the section on Concurrency and Parallelism, Scala.Rx allows you to use multiple threads to parallelize the propagation, and allows propagations to be started by multiple threads simultaneously. That means a strict prohibition of glitches is impossible.

Scala.Rx maintains a somewhat looser model: the body of each Rx may be evaluated more than once per propagation, and Scala.Rx only promises to make a "best-effort" attempt to reduce the number of redundant updates. Assuming the body of each Rx is pure, this means that the redundant updates should only affect the time taken and computation required for the propagation to complete, but not the value of each node once the propagation has finished.

In addition, Scala.Rx provides Obss: special terminal nodes, intended to produce side effects, which are guaranteed to update only once per propagation. This means that although a propagation may cause the values of the Rxs within the dataflow graph to be transiently out of sync, the final side effects of the propagation will only happen once the entire propagation is complete and the Obss all fire their side effects.

If multiple propagations are happening in parallel, Scala.Rx guarantees that each Obs will fire at most once per propagation, and at least once overall. Furthermore, each Obs will fire at least once after the entire dataflow graph has stabilized and the propagations are complete. This means that if you are relying on Obs to, for example, send updates over the network to a remote client, you can be sure that you don't have any unnecessary chatter being transmitted over the network, and when the system is quiescent the remote client will have the updates representing the most up-to-date version of the dataflow graph.

Related Work

Scala.Rx was not created in a vacuum, and borrows ideas and inspiration from a range of existing projects.

Scala.React

Scala.React, as described in Deprecating the Observer Pattern, contains a reactive change propagation portion (there called Signals) which is similar to what Scala.Rx does. However, it does much more than that: It contains implementations for using event-streams, and multiple DSLs using delimited continuations in order to make it easy to write asynchronous workflows.

I have used this library, and my experience is that it is extremely difficult to set up and get started. It requires a fair amount of global configuration, with a global engine doing the scheduling and propagation, even running its own thread pools. This made it extremely difficult to reason about interactions between parts of the program: would completely-separate dataflow graphs be able to affect each other through this global engine? Would the performance of multithreaded code slow down as the number of threads rises, as the engine becomes a bottleneck? I never found answers to many of these questions, and did not manage to contact the author.

The global propagation engine also makes it difficult to get started. It took several days to get a basic dataflow graph (similar to the example at the top of this document) working, and that was after a great deal of struggling, reading the relevant papers dozens of times, and hacking the source in ways I didn't understand. Needless to say, these were not foundations that I would feel confident building upon.

reactive-web

reactive-web was another inspiration. It is somewhat orthogonal to Scala.Rx, focusing more on event streams and integration with the Lift web framework, while Scala.Rx focuses purely on time-varying values.

Nevertheless, reactive-web comes with its own time-varying values (called Signals), which are manipulated using combinators similar to those in Scala.Rx (map, filter, flatMap, etc.). However, reactive-web does not provide an easy way to compose these Signals: the programmer has to rely entirely on map and flatMap, possibly using Scala's for-comprehensions.

I did not like the fact that you had to program in a monadic style (i.e. living in .map() and .flatMap() and for{} comprehensions all the time) in order to take advantage of the change propagation. This is particularly cumbersome in the case of [nested Rxs](Basic-Usage#nesting), where Scala.Rx's

// a, b and c are Rxs
val x = Rx{ a() + b().c() }

becomes

val x = for {
  va <- a
  vb <- b
  vc <- vb.c
} yield (va + vc)

As you can see, using for-comprehensions as in reactive-web results in the code being significantly longer and much more obfuscated.

Knockout.js

Knockout.js does something similar for Javascript, along with some other extra goodies like DOM-binding. In fact, the design and implementation and developer experience of the automatic-dependency-tracking is virtually identical. This:

this.firstName = ko.observable('Bob');
this.lastName = ko.observable('Smith');
fullName = ko.computed(function() {
    return this.firstName() + " " + this.lastName();
}, this);

is semantically equivalent to the following Scala.Rx code:

val firstName = Var("Bob")
val lastName = Var("Smith")
fullName = Rx{ firstName() + " " + lastName() }

A ko.observable maps directly onto a Var, and a ko.computed maps directly onto an Rx. Apart from the longer variable names and the added verbosity of Javascript, the semantics are almost identical.

Apart from providing an equivalent of Var and Rx, Knockout.js focuses its efforts in a different direction. It lacks the majority of the useful combinators that Scala.Rx provides, but provides a great deal of other functionality, for example integration with the browser's DOM, that Scala.Rx lacks.

Others

This idea of change propagation, with time-varying values which notify any value that depends on them when something changes, is part of the field of Functional Reactive Programming. This is a well-studied field with a lot of research already done. Scala.Rx builds upon this research, and incorporates ideas from the following projects:

All of these projects are filled with good ideas. However, they are generally very much research projects: in exchange for the benefits of FRP, they require you to write your entire program in an obscure variant of an obscure language, with little hope of interoperating with existing, non-FRP code.

Writing production software in an unfamiliar paradigm such as FRP is already a significant risk. On top of that, writing production software in an unfamiliar language is an additional variable, and writing production software in an unfamiliar paradigm in an unfamiliar language with no interoperability with existing code is downright reckless. Hence it is not surprising that these libraries have not seen significant usage. Scala.Rx aims to solve these problems by providing the benefits of FRP in a familiar language, with seamless interop between FRP and more traditional imperative or object-oriented code.

Version History

0.4.0

  • Dropped Scala 2.10 support
  • Added Bidirectional Vars and friends
  • Fixed multiple rx "glitches"

0.3.2

  • Bumped to Scala 2.12.0.

0.3.1

  • Fixed leak with observers (they also require an owning context).

  • Fixed type issue with flatMap

0.3.0

  • Introduced Owner and Data contexts. This is a completely different implementation of dependency and lifetime management that allows for safe construction of runtime-dynamic graphs.

  • More default combinators: fold and flatMap are now implemented by default.

Credits

Copyright (c) 2013, Li Haoyi (haoyi.sg at gmail.com)

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

scala.rx's People

Contributors

3tty0n, cornerman, fdietze, j-keck, jodersky, keyone, lihaoyi, saisse, scala-steward, stewsquared, sujeet, torgeadelin, vasily-kirichenko, vendethiel, voltir


scala.rx's Issues

How to handle `Ctx.Data`

Hi,
I upgraded to 0.3.1, and I am now asked to provide a Ctx.Data. I did this in the right functions, but at the top of my call chain (typically in main()) I would need to pass an implementation of this Ctx.Data, and there does not seem to be one (whereas there is Ctx.Owner.safe() for Owner).

Thanks

move to scalajs 0.6-RC1

Please move the library to Scala.js 0.6-RC1.

P.S. If you do not have time and are OK with substituting CrossProject for utest.jsrunner.JsCrossBuild, I can make a PR for that.

Confusion about the behavior of for-loop / flatMap

When first using for-syntax on Rx, it seemed to work the same as combining them in Rx{}. But they behave differently when using a diamond-shaped flow graph:

          b
        /   \
source a     o trigger
        \   /
          c

Here is an example code:

println("initializing...")
val rxa = Var(2)
val rxb = rxa.map(_ + 1)
val rxc = rxa.map(_ + 1)
for (
  b <- rxb;
  c <- rxc
) {
  println(s"  for loop triggered: $b, $c")
}
Rx {
  val b = rxb()
  val c = rxc()
  println(s"  rx triggered: $b, $c")
}
println("initialization done")
println("set rxa to 12...")
rxa() = 12
println("set rxa to 22...")
rxa() = 22
println("set rxa to 32...")
rxa() = 32

And here is the printed result:

initializing...
  for loop triggered: 3, 3
  rx triggered: 3, 3
initialization done
set rxa to 12...
  rx triggered: 13, 13
  for loop triggered: 13, 13
  for loop triggered: 3, 13
set rxa to 22...
  rx triggered: 23, 23
  for loop triggered: 23, 23
  for loop triggered: 13, 23
  for loop triggered: 3, 23
set rxa to 32...
  rx triggered: 33, 33
  for loop triggered: 33, 33
  for loop triggered: 13, 33
  for loop triggered: 3, 33
  for loop triggered: 23, 33

The for-loop prints for every individual change of a, b and itself, while the Rx-version only prints when the calculation wave has finished. What's more, the for-loop leaks Observers for b. This looks like a bug to me. Shouldn't the for-loop ideally behave exactly like the Rx-version?

Introductory example misses the point

The first example in the readme.md does not deliver the point of what is so great about Scala.Rx. We had the following discussion on Stack Overflow where the too-simple example confused a user into thinking Scala.Rx is about memoization.

Please consider updating the example so it shows asynchronous notifications. Of course you can use the example I have provided on SO:

import rx._
import rx.ops._

val a = Var(1); val b = Var(2)
val c: Rx[Int] = Rx{ a() + b() }

val o = c.foreach{value =>
  println(s"c has a new value: ${value}")
}

a()=4
b()=12
a()=35

which prints

c has a new value: 3 
c has a new value: 6 
c has a new value: 16 
c has a new value: 47

How to do reactive loops

Hi Li,

I'm considering to replace scala.react with scala.rx in my project WaveTuner.
Does scala.rx have an equivalent to Reactor.loop (as described on RHS of page 4 here) which I'm using here?

Thanks!
Martin

Scala.rx in Ammonite

When I used Scala.Rx in Ammonite, I only did the import, then Var(1), then Var(2):

yury@yury-HP-ProBook-450-G3:~$ amm
Loading...
Welcome to the Ammonite Repl 0.8.1
(Scala 2.12.1 Java 1.8.0_111)
yury-yury@ import $ivy.`com.lihaoyi::scalarx:0.3.2`, rx._
import $ivy.$ , rx._
yury-yury@ Var(1)
res1: Var[Int] = Var@51(1)
yury-yury@ Var(2)
cmd2.sc:8: exception during macro expansion:
scala.ScalaReflectionException: object Internal encapsulates multiple overloaded alternatives and cannot be treated as a method. Consider invoking <offending symbol>.asTerm.alternatives and manually picking the required method
at scala.reflect.api.Symbols$SymbolApi.asMethod(Symbols.scala:228)
at scala.reflect.api.Symbols$SymbolApi.asMethod$(Symbols.scala:222)
at scala.reflect.internal.Symbols$SymbolContextApiImpl.asMethod(Symbols.scala:88)
at derive.Derive.$anonfun$getArgSyms$3(Derive.scala:391)
at derive.Derive.$anonfun$getArgSyms$3$adapted(Derive.scala:391)
at scala.collection.Iterator.find(Iterator.scala:981)
at scala.collection.Iterator.find$(Iterator.scala:978)
at scala.collection.AbstractIterator.find(Iterator.scala:1406)
at scala.collection.IterableLike.find(IterableLike.scala:78)
at scala.collection.IterableLike.find$(IterableLike.scala:77)
at scala.reflect.internal.Scopes$Scope.find(Scopes.scala:51)
at derive.Derive.$anonfun$getArgSyms$1(Derive.scala:391)
at scala.util.Either$RightProjection.flatMap(Either.scala:719)
at derive.Derive.getArgSyms(Derive.scala:387)
at derive.Derive.onFail$1(Derive.scala:155)
at derive.Derive.$anonfun$deriveType$5(Derive.scala:212)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:79)
at derive.Derive.rec$1(Derive.scala:185)
at derive.Derive.deriveType(Derive.scala:230)
at derive.Derive.derive(Derive.scala:60)
at pprint.Internals$LowerPriPPrint$.liftedTree1$1(PPrint.scala:408)
at pprint.Internals$LowerPriPPrint$.FinalRepr(PPrint.scala:408)

        .print(res2, res2, "res2", _root_.scala.None)
              ^

Compilation Failed

README corrections

Sorry - normally I'd submit a pull request for this kind of thing but unfortunately I don't have time.

Many people new to the project will start by working through the examples in the README so it's relatively important that it be mistake free. Here are some issues I found:

1. I had to change the following code:

intercept[ArithmeticException]{
  b()
}
assert(b.toTry.isInstanceOf[Failure])

To this:

intercept[ArithmeticException]{
  b.now // Otherwise it complains "No implicit Ctx.Data is available here!"
}
assert(b.toTry.isInstanceOf[Failure[_]])  // Otherwise it complains "class Failure takes type parameters"

2. An unknown function inside is referenced a number of times, e.g. like so:

inside(c.toTry){case Success(0) => () }

I think occurrences of inside should be replaced with assertMatch

assertMatch(c.toTry){case Success(0) => ()}

But then it's probably necessary to go on and explain that to use assertMatch you have to extend TestSuite from https://github.com/lihaoyi/utest

import utest._

object XyzTests extends TestSuite {
  val tests = this {
    "pqrTest" - {
      assertMatch(b.toTry){case Success(0)=>}
    }
  }
}

3. In the following code, why is a wrapped in another Rx when creating b? Isn't a already a perfectly acceptable Rx?

val a = Var(1)
val b = Rx{
    (Rx{ a() }, Rx{ math.random })
}

some helpers for reactive collections

I am using scala.rx for data binding in my scalajs app. It is easy to use in the case of binding to particular values, but it is not clear to me how to make collections reactive.

Consider I have something like:
val myCollection: Var[Map[String, MyCaseClass]]

and I have to update the DOM when there are changes there. In that case I have to make something like myCollection.reduce(updatehandler _) that would compare the old and new versions, to be used by an Observer that reacts to changes (move elements in the DOM tree, delete from the DOM, add to the DOM).

I think this use case is common enough (not only for scalajs, but anywhere we want to see what changed in a collection and react) to justify some helpers for it.

At the same time I am not sure how to implement it right. The most straightforward approach is to iterate the whole collection and produce Inserted/Deleted/Moved events that would include the elements that changed. But maybe there is a better way to do this?
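For the Map-backed case above, a minimal library-agnostic diff helper might look like the following (a hypothetical sketch, not part of Scala.Rx); an observer could then translate the three key sets into DOM insertions, removals and updates:

```scala
// Hypothetical diff helper: compare two snapshots of a Map-backed
// collection and report which keys were added, removed or changed.
def diffMaps[K, V](before: Map[K, V], after: Map[K, V]): (Set[K], Set[K], Set[K]) = {
  val added   = after.keySet -- before.keySet
  val removed = before.keySet -- after.keySet
  val changed = (before.keySet & after.keySet).filter(k => before(k) != after(k))
  (added, removed, changed)
}

val (added, removed, changed) =
  diffMaps(Map("a" -> 1, "b" -> 2), Map("b" -> 3, "c" -> 4))
// added == Set("c"), removed == Set("a"), changed == Set("b")
```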

Wrong value propagated to `Rx.now` in `Obs`

I'm executing the following ammonite script:

import $ivy.`com.lihaoyi::scalarx:0.3.2`, rx._
object Example {
val v1 = Var(0)
val v2 = v1.map(identity)
val v3 = v1.map(identity).map(identity).map(identity)
def q(implicit trackDependency: Ctx.Data) = {
    if (v1() == 0) v2()
    else {
      val b = v3() != v2()
      if (b)
        103
      else
        17
    }
  }
val v = Rx { q }
val list = {
    var result = List.empty[Int]
    val obs = v.trigger { result = result :+ v.now }
    v1() = 1
    v1() = 2
    result
  }
}
println(Example.list)

The output of this code is nondeterministically sometimes List(0, 103, 17) and sometimes List(0, 17). (actually, it seems to be List(0, 17) after the first run and List(0, 103, 17) for already compiled runs).
Only List(0, 17) is the correct result AFAICT, because 103 should never happen. trigger is supposed to be called only after all Rx's have stabilized as presented in the README.
It should only be called when v1, v2, v3 have the same value, thus b should be always false at the time the Obs is reevaluated. Hence, we should be in the 17 case and v.now should be 17.

some helpers (or docs) to tweak propogation

I wonder, is there a way to tweak how propagation works?
In my case I want to avoid propagation if the same value is pushed to a reactive (for instance, fooVar equals 1 and I want to avoid propagation if I do fooVar() = 1) globally, without needing to write something like val bar = fooVar.filter(v => v != fooVar.now) for each reactive variable.

documentations for macro Operators

It is hard to understand how all those macros in Operators work. It would be nice to have an example in the documentation explaining one of them and giving recommendations on how to write your own operators.

0.3 Roadmap?

I noticed that you had said on other issues that you wanted to rewrite, and then saw that you made what seems to be a lot of progress on the 0.3.0 branch. For the sake of others like me who may wonder about status and what to expect, could you comment on the goals of 0.3 and the scope of changes you anticipate? Are there any problems that stalled you, other than time?

Thanks!

Observe multiple Rx's

Let's say we have three Rxs and we want to perform side-effects when any of them changes:

val va = Var(0)
val vb = Var(0)
val vc = Var(0)
for { a <- va ; b <- vb ; c <- vc }
  effectful(a, b, c)

This obviously doesn't work since this creates a new Obs every time va or vb changes.

Currently I'm resorting to creating an Rx that bundles all the Rxs into a tuple, and then create an Obs from it:

for { (a, b, c) <- Rx { (va(), vb(), vc()) } }
  effectful(a, b, c)

While this works, it seems to me that it is a pretty inelegant solution. Is there any possibility to provide a combinator or a better abstraction for this pattern? I believe this is a very common pattern and the library should provide an easy way to solve that.

Thanks.

How to handle `Ctx.Owner` in classes

I try to avoid adding (implicit ctx: Ctx.Owner) to each class with the following approach:

class Test {
    implicit val ctx: Ctx.Owner = Ctx.Owner.safe()

    var count = 0
    val a = Var(1); val b = Var(2)
    def mkRx(i: Int)(implicit ctx: Ctx.Owner) = Rx { count += 1; i + b() }
    val c = Rx {
      val newRx = mkRx(a())
      newRx()
    }
    println(c.now, count)
    a() = 4
    println(c.now, count)
    b() = 3
    println(c.now, count) //(7,5) -- 5??

    (0 to 100).foreach { i => a() = i }
    println(c.now, count)
    b() = 4
    println(c.now, count) //(104,211) -- 211!!!
  }

This seems to work, but I don't know if it is really correct or if there is a better way to do this.

Surprising behavior when `Var` value does not change

I would expect that updating a Var to a value equal to its previous value won't trigger Obs depending on this Var. However:

val v = Var(10)
val o = v.foreach(println)  // prints 10
v() = 11  // prints 11
v() = 11  // again prints 11!
v() = 11  // and again

When Var is pulled through an Rx it works correctly:

val v = Var(10)
val w = Rx { v() }
val o = w.foreach(println)  // prints 10
v() = 11  // prints 11
v() = 11  // nothing, just as expected
v() = 11  // again, nothing

Change Var[T] covariant

It seems natural for the current implementation of Var[T] to be covariant, and this is needed for my use case.

  • Is there any restriction to prevent this change?

Var Groups / Array Formulas

Just an idea.

In Excel there is a concept of array formulas. This would be an interesting concept to have in Scala.Rx. These could be treated as vectors / matrices / arrays / groupings, where an operation on the grouping is applied to all its members. Vars could also be put into multiple groupings, which could have a notion of dimensionality, with a view to implementing linear-algebraic operations on them. Excel has some basic matrix operations which might give an idea.

Make filter return option[A]

Instead of making b return 5 in this sample code, why not make it return None?

val a = Var(5)
val b = a filter (_ > 5)

Obs are not linked by strong references to Var and are therefore GCed

Obss set on Vars are being garbage collected while the observed Var isn't. Is this normal? I would think that an observer set on a Var should have the same lifecycle as the observed Var.

For example, this code behaves differently depending on whether we call System.gc or not:

object Test extends App {

  val l: Seq[Var[Int]] =
    (0 until 10).map { v => Var(v) }


  l.foreach {
     v => Obs(v) { println("observed " + v()) }
  }

  System.gc() // changes the code behaviour

  l.head() = 9

}

Is the user supposed to keep a strong reference to the Obss he defines?

Missing code in Readme

In the section where the Timer is explained, the text below the example references a for-loop that does not exist (line 666 I believe)

Proposal: Safe Consistency Mappings

I'd like to have some feedback for an extension to scala.rx:
PR: #84

The motivation goes like this:

I'm using reactive programming for data consistency to avoid manual bookkeeping of changes.

Example:

val list = Var(List(17,4,23))
val selected = Var[Option[Int]](Some(17)) // should only contain an existing element from list

In this case the data can become inconsistent if the selected element is removed from list. It is also possible to set selected to an element which never existed.

As an initial solution, I add a reactive consistency layer, which makes sure that selected only contains elements from list:

val list = Var(List(17,4,23))
val rawSelected = Var[Option[Int]](Some(17))
val selected = rawSelected.filter(list contains)

Now I have an Rx where I only can read consistent data from.

Thinking further there are some issues with this approach:

  • I could still read inconsistent data from rawSelected
  • I need to read from selected and write to rawSelected

To overcome these issues, the idea is to wrap both Var and Rx in a data structure RxVar which behaves like a Var. The difference is that reading is mapped to the consistent Rx, while writing is mapped to the inner Var. Transformations like map and filter only operate on the Rx part and result in another RxVar which I can still write to.

The Example above would be implemented like this:

val list = Var(List(17,4,23))
val selected = RxVar(None:Option[Int]).filter(list contains)

This gives some safety guarantees:

  • It is not possible to read inconsistent data
  • I have one RxVar which I can use to read and write

It was not possible to implement this extension without forking, since

  • Rx is a sealed trait
  • There is no trait for Var which I could use to let RxVar behave like a Var.

I'd like to hear what you think of this approach and if such a structure should become part of scala.rx.

Var to readOnly Rx method

Sometimes one part of the code wants read/write access, but only wants other parts of the code to get read access with the ability to observe. It would be useful if one could transform a Var into an Rx in such a way that it cannot be downcast back to the Var.
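The idea can be illustrated with plain-Scala stand-ins (Cell and ReadOnly here are hypothetical types, not scala.rx's actual Var/Rx): expose only a read-only interface backed by the same cell, so holders of the view can read and observe but never write.

```scala
// Cell and ReadOnly are hypothetical stand-ins, not scala.rx's types
trait ReadOnly[T] { def now: T }

final class Cell[T](private var value: T) extends ReadOnly[T] {
  def now: T = value
  def update(v: T): Unit = value = v // enables `cell() = v` syntax
  // A fresh wrapper: the result cannot be downcast back to Cell
  def readOnly: ReadOnly[T] = new ReadOnly[T] { def now: T = Cell.this.now }
}

val cell = new Cell(1)
val view: ReadOnly[Int] = cell.readOnly
cell() = 2 // writers keep full access through the Cell
// view.now == 2, but view exposes no way to write
```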

Compile Error with custom extractors

Let's say I have a custom Extractor and a Var:

object Extractor {
  def unapply(x: Int): Option[Int] = Some(x + 1)
}

val a = Var(1)

Then the following code raises a compile error:

Rx {
  a() match {
    case Extractor(x) => x
  }
}

Error:

[error] unexpected UnApply Extractor.unapply(<unapply-selector>) <unapply> ((x @ _))

Reference to recommended conversion implicits for scalajs

Not having standard safe implicits is a barrier to using scalarx with scalajs. Below are links to several implementations people have come up with. It would be nice to have some standard implicits that we could refer to and that the community could improve over time.

Observations:

  • The Hands on Scalajs book has a recommended implicit that needs to be updated asap.
  • Some people's implicits use .toTry and get values safely, some do not
  • Some people's implicits wrap the rx value in a container node, making it easy to remove all child nodes. However, adding these intermediate nodes doesn't work well when using libraries like Bootstrap. (example: Voltir/framework)
  • I am not sure how to combine safety, being sure to remove/replace all children, and not creating a container node that messes up bootstrap css/js.

Implementations:

Uncaught TypeError: undefined is not a function Flow.scala:37

I updated some ScalaJS code to Scala.Rx 0.4.1

@JSExport
object HelloWorldExample {

  val model = Var("world")

and I got the following error:

Uncaught TypeError: undefined is not a function Flow.scala:37
ScalaJS.impls.rx_core_Emitter$class__$init$__Lrx_core_Emitter__V Flow.scala:37
ScalaJS.c.rx_core_Var.init___Lscala_Function0__T Core.scala:128
ScalaJS.c.rx_core_Var$.apply__Lscala_Function0__T__Lrx_core_Var Core.scala:109
ScalaJS.c.example_helloworld_HelloWorldExample$.init___ HelloWorldExample.scala:13
ScalaJS.modules.example_helloworld_HelloWorldExample HelloWorldExample.scala:10
(anonymous function)

turn off console debug messages

I wonder, is there a way to turn off console messages from Observables in ReactiveJS? I am implementing rx-based scalajs data binding right now and getting a lot of console messages is very irritating.
Maybe it would be better to provide a logger for the reactive variable system instead?

Issue with reduce

Maybe this doesn't work the way I'm expecting it to. It appears to me that before is always an empty list. Am I doing something wrong?

Code:

val l = Var(List[String]())
val before = l.reduce{(b,a) => println("before: "+b); b}
val after = l.reduce{(b,a) => println("after: "+a); a}

l() = l() :+ "9"
l() = l() :+ "2"

Results:

after: List(9)
before: List()

after: List(9,2)
before: List()

Leaking without compile error

Moving the example from the Readme into a class without adding an explicit owner in mkRx leads to a leak, and no compiler error helps you avoid this.

The class:

class Test(implicit ctx: Ctx.Owner) {
    var count = 0
    val a = Var(1); val b = Var(2)
    def mkRx(i: Int) = Rx { count += 1; i + b() }
    val c = Rx {
      val newRx = mkRx(a())
      newRx()
    }
    println(c.now, count)
    a() = 4
    println(c.now, count)
    b() = 3
    println(c.now, count) //(7,5) -- 5??

    (0 to 100).foreach { i => a() = i }
    println(c.now, count)
    b() = 4
    println(c.now, count) //(104,211) -- 211!!!
  }

I don't know if there is a way to handle this or if it's a known issue, but I thought it should be mentioned.

add more combinators

I think the lack of combinators (in comparison to Reactive Extensions) is one of the weakest points of ScalaRx, and it is hard to extend them as most of the classes are marked private or protected (in comparison to Reactive Extensions, where there are a lot of methods I can use). For instance, I want to make a reactive that compares the current and previous results and maps them to a new type. The current Reducer does not allow mapping, so I have to write my own, but most of the classes the current Reducer uses are private, so I cannot use them outside of Scala.Rx =(

Implementation in Python

Hi,

Nice to see a complex concept condensed into an elegant form. I've been looking around for a similar thing in Python. By any chance, have you come across a similar implementation in Python?

add Var.update { oldValue => newValue }

It would be great if we could update a Var with a single function, in an atomic style.
The current update does not deliver the current value, leading to code like this:

val model = Var("world")
model.update {
  val v = model()
  v.toUpperCase
}

It would be easier to write:

val model = Var("world")
model.update { v =>
  v.toUpperCase
}

Or

val model = Var("world")
model.update(_.toUpperCase)
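
The proposed signature is easy to sketch with a stand-in class (SimpleVar here is hypothetical, not Scala.Rx's actual Var): update takes a function from the old value to the new one, so the read-modify-write happens in one call.

```scala
// Minimal stand-in for the proposed Var.update { oldValue => newValue }.
final class SimpleVar[T](private var value: T) {
  def apply(): T = value
  // Read-modify-write in a single call, as the issue proposes.
  def update(f: T => T): Unit = value = f(value)
}

val model = new SimpleVar("world")
model.update(_.toUpperCase)
println(model()) // WORLD
```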

Some tests fail sporadically

Some timing tests fail sporadically. The one that fails roughly 50% of the time is

[info] 36/67   rx.GcTests.timersGetGced     Failure(utest.AssertionError: (eventually count == 3
[info] count: Int = 2,null))

I stumbled upon some other failures when running the tests repeatedly for the issue above. These occurred less frequently:

[info] 34/67   rx.EventedTests.timerShouldEmitEvents        Failure(utest.AssertionError: (eventually eventually(t() == i)
[info] t: rx.ops.Timer[Unit] = rx.ops.Timer@11ee4e44
[info] i: Int = 3,null))

[info] 34/67   rx.EventedTests.timerShouldEmitEvents        Failure(utest.AssertionError: (eventually eventually(t() == i)
[info] t: rx.ops.Timer[Unit] = rx.ops.Timer@5000ebba
[info] i: Int = 4,null))

[info] 35/67   rx.EventedTests.debounce.simple      Failure(utest.AssertionError: (assert(b() == 2)
[info] b: rx.Rx[Int] = rx.ops.Debounce@20772569,null))

[info] 38/67   rx.EventedTests.debounce.longer      Failure(utest.AssertionError: (assert(count == 4)
[info] count: Int = 7,null))

[info] 29/67   rx.GcTests.obsGetGCed        Failure(utest.AssertionError: (eventually eventually{
[info] count: Int = 1
[info] a: rx.core.Var[Int] = rx.core.Var@64fc8c3d
[info] a: rx.core.Var[Int] = rx.core.Var@64fc8c3d
[info] oldCount: Int = 1
[info] count: Int = 2,null))

Changes not always propagated in a simple example

The Rx#apply() scaladoc says: If this is called within the body of another [[Rx]], this will create a dependency between the two [[Rx]]s

And it works here:

      val a = Var(1)
      val rx = Rx { a() }
      rx.foreach(
        value => {
          println("change")
        }
        , skipInitial = true
      )
      a() = a() + 1
      // "change" printed 

but the same example doesn't work for ArrayBuffer:

      val arr = Var(new ArrayBuffer[String](20))
      val rx = Rx { arr() }
      rx.foreach(
        value => {
          println("change")
        }
        , skipInitial = true
      )

      arr() = arr() += "whatever"
     // doesn't print anything

Tried both JS and JVM ... Maybe I'm missing some fundamental fact, but I can't figure it out...
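
One likely explanation (an assumption, not confirmed against Scala.Rx's internals): += mutates the buffer in place and returns the same instance, so by the time the Var compares old and new values they are one and the same object, and if propagation is skipped for values that compare equal, nothing fires. A library-independent demonstration:

```scala
import scala.collection.mutable.ArrayBuffer

val arr = new ArrayBuffer[String](20)
val old = arr
val updated = arr += "whatever" // += mutates in place and returns the same instance

println(updated eq old) // true: "old" and "new" are the same object
println(updated == old) // true: an equality check sees no change at all

// An immutable collection produces a genuinely new, unequal value instead:
val v0 = Vector.empty[String]
val v1 = v0 :+ "whatever"
println(v1 == v0) // false: this change is visible to an equality check
```

Under that assumption, wrapping an immutable collection (e.g. Var(Vector[String]())) would make the foreach fire as expected.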

rx subscriptions for collections

In my libraries (like scala-js-binding) I often need to subscribe to Rxs of collections of items and also react to changes inside the items themselves.
So I really, really need a nice memory-leak-safe way to subscribe both to a collection of Rxs and to each Var[Item] separately. I have something like:

val items: Rx[List[Var[Item]]] = Rx{ /*some code*/ }
val views = Rx{ items() /*some other code*/ }

So, when new items are added to the collection, HTML views are created and bound to the items. Whenever an item is deleted, both the view and its subscription to the item should also be deleted.

Documentation and examples for "ownership context" and "data context"

Version 0.3.x introduces the concepts of "ownership context" and "data context". I have not found any substantial (real-world) examples for these concepts, like the ones demonstrated here in a presentation for version 0.2.5.

Any help on this will be much appreciated.

Also, in the code below, why does c require an owning context?

import rx._
val a = Var(1)
val b = Var(2)
val c = Rx { a() + b() }

Rx parents are not updated if value has not changed

The failing test case:

  val a = Var[Option[Int]](None)
  val b = Var[Option[Int]](None)
  val c = Rx { for (x <- a(); y <- b()) yield x + y }

  a() = Some(1)
  b() = Some(2)

  assert (c() == Some(3))

If my understanding is correct, the problem is that after the first assignment c doesn't change its value, so Spinlock.ping ignores the update, thus losing the new parent b.
