reaktive's Introduction

Kotlin multiplatform implementation of Reactive Extensions.

If you have any questions or feedback, you are welcome to join the #reaktive channel in the Kotlin Slack.

Setup

There are a number of modules published to Maven Central:

  • reaktive - the main Reaktive library (multiplatform)
  • reaktive-annotations - collection of annotations (multiplatform)
  • reaktive-testing - testing utilities (multiplatform)
  • utils - some utilities like Clock, AtomicReference, Lock, etc. (multiplatform)
  • coroutines-interop - Kotlin coroutines interoperability helpers (multiplatform)
  • rxjava2-interop - RxJava v2 interoperability helpers (JVM and Android)
  • rxjava3-interop - RxJava v3 interoperability helpers (JVM and Android)

Configuring dependencies

kotlin {
    sourceSets {
        commonMain {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive:<version>'
                implementation 'com.badoo.reaktive:reaktive-annotations:<version>'
                implementation 'com.badoo.reaktive:coroutines-interop:<version>' // For interop with coroutines
                implementation 'com.badoo.reaktive:rxjava2-interop:<version>' // For interop with RxJava v2
                implementation 'com.badoo.reaktive:rxjava3-interop:<version>' // For interop with RxJava v3
            }
        }

        commonTest {
            dependencies {
                implementation 'com.badoo.reaktive:reaktive-testing:<version>'
            }
        }
    }
}

Features:

  • Multiplatform: JVM, Android, iOS, macOS, watchOS, tvOS, JavaScript, Linux X64
  • Schedulers support:
    • computationScheduler - fixed thread pool whose size equals the number of cores
    • ioScheduler - unbounded thread pool with caching policy
    • newThreadScheduler - creates a new thread for each unit of work
    • singleScheduler - executes tasks on a single shared background thread
    • trampolineScheduler - queues tasks and executes them on one of the participating threads
    • mainScheduler - executes tasks on main thread
  • True multithreading for Kotlin/Native (since v2.0 only the new memory model is supported)
  • Supported sources: Observable, Maybe, Single, Completable
  • Subjects: PublishSubject, BehaviorSubject, ReplaySubject, UnicastSubject
  • Interoperability with Kotlin Coroutines
    • Convert suspend functions to/from Single, Maybe and Completable
    • Convert Flow to/from Observable
    • Convert CoroutineContext to Scheduler
    • Convert Scheduler to CoroutineDispatcher
  • Interoperability with RxJava2 and RxJava3
    • Conversion of sources and schedulers between Reaktive and RxJava

Reaktive and Kotlin/Native

Since version 2.x, Reaktive only works with the new memory model.

Reaktive 1.x and the old (strict) memory model

The old (strict) Kotlin/Native memory model and its concurrency rules are unusual: in general, shared mutable state between threads is not allowed. Since Reaktive supports multithreading in Kotlin/Native, please read the Kotlin/Native documentation on concurrency and immutability before using it.

Object detachment is relatively difficult to achieve and is very error-prone when the objects are created from outside and are not fully managed by the library. This is why Reaktive prefers frozen state. Here are some hints:

  • Any callback (and any captured objects) submitted to a Scheduler will be frozen
  • subscribeOn freezes both its upstream source and its downstream observer, as well as all the Disposables (upstream's and downstream's); values (including errors) are not frozen by the operator
  • observeOn freezes only its downstream observer, all the values (including errors) passed through it, and all the Disposables; the upstream source is not frozen by the operator
  • Other operators that use a scheduler (like debounce, timer, delay, etc.) behave the same as observeOn in most cases

Thread local tricks to avoid freezing

Sometimes freezing is not acceptable, e.g. we might want to load some data in the background and then update the UI. The UI cannot be frozen. With Reaktive, this behaviour can be achieved in two ways:

Use threadLocal operator:

val values = mutableListOf<Any>()
var isFinished = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .threadLocal()
    .doOnBeforeNext { values += it } // Callback is not frozen, we can update the mutable list
    .doOnBeforeFinally { isFinished = true } // Callback is not frozen, we can change the flag
    .subscribe()

Set isThreadLocal flag to true in subscribe operator:

val values = mutableListOf<Any>()
var isComplete = false

observable<Any> { emitter ->
    // Background job
}
    .subscribeOn(ioScheduler)
    .observeOn(mainScheduler)
    .subscribe(
        isThreadLocal = true,
        onNext = { values += it }, // Callback is not frozen, we can update the mutable list
        onComplete = { isComplete = true } // Callback is not frozen, we can change the flag
    )

In both cases subscription (subscribe call) must be performed on the Main thread.

Coroutines interop

This functionality is provided by the coroutines-interop module. Please mind some known problems with multi-threaded coroutines on Kotlin/Native.

Examples

val flow: Flow<Int> = observableOf(1, 2, 3).asFlow()
val observable: Observable<Int> = flowOf(1, 2, 3).asObservable()

fun doSomething() {
    singleFromCoroutine { getSomething() }
        .subscribe { println(it) }
}

suspend fun getSomething(): String {
    delay(1.seconds)
    return "something"
}

val defaultScheduler = Dispatchers.Default.asScheduler()
val computationDispatcher = computationScheduler.asCoroutineDispatcher()

Subscription management with DisposableScope

Reaktive provides an easy way to manage subscriptions: DisposableScope.

Take a look at the following examples:

val scope =
    disposableScope {
        observable.subscribeScoped(...) // Subscription will be disposed when the scope is disposed

        doOnDispose {
            // Will be called when the scope is disposed
        }

        someDisposable.scope() // `someDisposable` will be disposed when the scope is disposed
    }

// At some point later
scope.dispose()

class MyPresenter(
    private val view: MyView,
    private val longRunningAction: Completable
) : DisposableScope by DisposableScope() {

    init {
        doOnDispose {
            // Will be called when the presenter is disposed
        }
    }

    fun load() {
        view.showProgressBar()

        // Subscription will be disposed when the presenter is disposed
        longRunningAction.subscribeScoped(onComplete = view::hideProgressBar)
    }
}

class MyActivity : AppCompatActivity(), DisposableScope by DisposableScope() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        MyPresenter(...).scope()
    }

    override fun onDestroy() {
        dispose()

        super.onDestroy()
    }
}

Reaktive and Swift interoperability

Please see the corresponding documentation page: Reaktive and Swift interoperability.

Plugins

Reaktive provides a Plugin API, similar to RxJava plugins, which offers a way to decorate Reaktive sources. A plugin should implement the ReaktivePlugin interface. It can be registered using the registerReaktivePlugin function and unregistered using the unregisterReaktivePlugin function.

object MyPlugin : ReaktivePlugin {
    override fun <T> onAssembleObservable(observable: Observable<T>): Observable<T> =
        object : Observable<T> {
            private val traceException = TraceException()

            override fun subscribe(observer: ObservableObserver<T>) {
                observable.subscribe(
                    object : ObservableObserver<T> by observer {
                        override fun onError(error: Throwable) {
                            observer.onError(error, traceException)
                        }
                    }
                )
            }
        }

    override fun <T> onAssembleSingle(single: Single<T>): Single<T> =
        TODO("Similar to onAssembleObservable")

    override fun <T> onAssembleMaybe(maybe: Maybe<T>): Maybe<T> =
        TODO("Similar to onAssembleObservable")

    override fun onAssembleCompletable(completable: Completable): Completable =
        TODO("Similar to onAssembleObservable")

    private fun ErrorCallback.onError(error: Throwable, traceException: TraceException) {
        if (error.suppressedExceptions.lastOrNull() !is TraceException) {
            error.addSuppressed(traceException)
        }
        onError(error)
    }

    private class TraceException : Exception()
}

Samples:

reaktive's People

Contributors

alexeykorshun, amihusb, arkivanov, bharathmg, bjsvedin, cherryperry, colriot, dmitryustimov, emartynov, euv, ilyagulya, lamartio, lukaville, mayokunadeniyi, mike-n-jordan, minaeweida, pfaffenrodt, pietrocaselani, saket, strangepan, ygnys

reaktive's Issues

Exceptions in subscribe callbacks are not handled

Currently, exceptions thrown by subscribe callbacks (onSubscribe, onNext, onComplete, etc.) are handled by neither onError nor uncaughtExceptionHandler. For non-terminal events such as onSubscribe or onNext, the exception should be passed to the onError callback, falling back to uncaughtExceptionHandler. For terminal events such as onComplete, onSuccess or onError, uncaughtExceptionHandler should be used directly.

Gradle error: Unable to resolve dependency

Hello 👋,

I am trying to import the reaktive dependency using Gradle for a Kotlin multiplatform project with Android and iOS targets, but I get an error on Gradle sync:

ERROR: Unable to resolve dependency for ':common@debug/compileClasspath': Could not resolve com.badoo.reaktive:reaktive-android:1.0.0-beta1.

The project I'm adding this library to is here:
https://github.com/OlexandrStepanov/Kotlin-Native-MVP-Demo/tree/reaktive-experiments

The files with this dependency added:

All changes in the project, which lead to this error in a single commit:
OlexandrStepanov/Kotlin-Native-MVP-Demo@a43e751

Please suggest what I'm doing wrong.
With regards.

ktor suspend fun

Hi guys, this is more of a question than an issue. I'm trying to use this library to connect to an API using ktor, which depends on coroutines.

Let's say I have an HTTP request:

suspend fun getUser(): User {
    return user
}

Any suggestions on how to convert it into an observable?

subject
    .subscribeOn(ioScheduler)
    .flatMapSingle { service.getUser() } // here we want to get the result from the suspend fun
    .observeOn(mainScheduler)

Thanks guys
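
A possible approach, sketched with the singleFromCoroutine helper from the coroutines-interop module shown in the README examples above: wrap the suspend call in a Single and flatten it with flatMapSingle. User, UserService and loadUsers are hypothetical stand-ins for the types in the question, not part of Reaktive.

```kotlin
import com.badoo.reaktive.coroutinesinterop.singleFromCoroutine
import com.badoo.reaktive.observable.Observable
import com.badoo.reaktive.observable.flatMapSingle
import com.badoo.reaktive.observable.observeOn
import com.badoo.reaktive.scheduler.mainScheduler

// Hypothetical types standing in for the ones from the question
data class User(val name: String)

interface UserService {
    suspend fun getUser(): User
}

// For every upstream event, run the suspend function as a Single
// and deliver its result on the main scheduler.
fun Observable<Unit>.loadUsers(service: UserService): Observable<User> =
    flatMapSingle { singleFromCoroutine { service.getUser() } }
        .observeOn(mainScheduler)
```

This sketch requires the coroutines-interop dependency listed in the Setup section.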

Optimise PriorityQueue

override fun offer(item: T) {
    list.add(item)
    list.sortWith(comparator) // TODO: Optimise later
}
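
A minimal sketch of one possible optimisation, assuming the list is only ever modified through offer and is therefore already sorted: find the insertion point with a binary search instead of re-sorting the whole list. The class name SortedListQueue and its poll method are hypothetical stand-ins, not Reaktive's actual PriorityQueue.

```kotlin
// Hypothetical stand-in for the queue from the issue; only `offer` mirrors the source.
class SortedListQueue<T>(private val comparator: Comparator<in T>) {
    private val list = ArrayList<T>()

    val size: Int get() = list.size

    fun offer(item: T) {
        // Binary search for the first index whose element is strictly greater
        // than `item`, so equal items keep their insertion order.
        var low = 0
        var high = list.size
        while (low < high) {
            val mid = (low + high) ushr 1
            if (comparator.compare(list[mid], item) <= 0) low = mid + 1 else high = mid
        }
        list.add(low, item)
    }

    // Removes and returns the smallest item, or null if the queue is empty.
    fun poll(): T? = if (list.isEmpty()) null else list.removeAt(0)
}
```

This replaces a full sort on every insert with an O(log n) search plus an O(n) element shift. A binary heap would bring offer down to O(log n) overall, at the cost of losing stable ordering for equal items.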

FirstOrError doesn't work with single source

Sample test:

@Test
fun firstOrErrorTest() {
    val observer = singleOf(2)
        .asObservable()
        .firstOrError()
        .test()
    assertEquals(2, observer.value)
}

Works fine with observable.
The source of error is probably firstOrAction which emits error when onComplete is called.

Also, do you welcome PRs?

Retrofit adapter

Where do you think it would be better to put the adapter for retrofit, in this repository or in theirs?

Why is this library an Android project?

I think this library should be a Kotlin/Native project, so that all Kotlin projects can use it. But right now it's an Android library, so why wouldn't I just use RxKotlin?

Add Observable.takeUntil operator

Hey folks, would it be possible to add an Observable#takeUntil() operator? I took a stab at implementing the operator myself by copying takeUntil from RxJava, but it doesn't seem to work. I am not very good at understanding the internals, so I gave up:

fun <T, U> Observable<T>.takeUntil(other: Observable<U>): Observable<T> {
  return observableUnsafe { child ->
    val parent = TakeUntilObserver<T, U>(child)
    child.onSubscribe(parent)

    other.subscribe(parent.otherObserver)
    subscribeSafe(parent)
  }
}

private class TakeUntilObserver<T, U>(
  private val downstream: ObservableObserver<T>
) : ObservableObserver<T>, ValueCallback<T>, Disposable {

  internal val upstream = DisposableWrapper()

  internal val otherObserver = OtherObserver<U>(
      otherError = this::otherError,
      otherComplete = this::otherComplete
  )

  override val isDisposed: Boolean
    get() = upstream.isDisposed

  override fun dispose() {
    upstream.dispose()
    otherObserver.dispose()
  }

  override fun onSubscribe(disposable: Disposable) {
    upstream.set(disposable)
  }

  override fun onNext(value: T) {
    downstream.onNext(value)
  }

  override fun onError(error: Throwable) {
    otherObserver.dispose()
    downstream.onError(error)
  }

  override fun onComplete() {
    otherObserver.dispose()
    downstream.onComplete()
  }

  fun otherError(e: Throwable) {
    upstream.dispose()
    downstream.onError(e)
  }

  fun otherComplete() {
    upstream.dispose()
    downstream.onComplete()
  }

  private class OtherObserver<U>(
    private val otherError: (Throwable) -> Unit,
    private val otherComplete: () -> Unit
  ) : ObservableObserver<U>, Disposable {

    private val disposableWrapper = DisposableWrapper()

    override val isDisposed: Boolean
      get() = disposableWrapper.isDisposed

    override fun dispose() {
      disposableWrapper.dispose()
    }

    override fun onComplete() {
      otherComplete()
    }

    override fun onError(error: Throwable) {
      otherError(error)
    }

    override fun onNext(value: U) {
      disposableWrapper.dispose()
      otherComplete()
    }

    override fun onSubscribe(disposable: Disposable) {
      disposableWrapper.set(disposable)
    }
  }
}

The test:

@Test fun `disposes upstream when other stream produces a value`() {
  val upstream = TestObservable<Int>()
  val other = TestObservable<Unit>()

  upstream.takeUntil(other).test()
  assertTrue(upstream.hasSubscribers)

  other.onNext(Unit)  // Fails here!
  assertFalse(upstream.hasSubscribers)
}

iOS Sample app broken/missing imports

The ios-simple-app doesn't compile in Xcode: ReaktiveSingleObserver, as well as the observers for Observable and Maybe, are not available in Swift/Xcode. Looking at the generated framework, only ReaktiveObserver is available, but it does not have onSuccess/onError methods. This causes the example to either fail or sometimes crash with TypeInfo.cpp:41: runtime assert: Unknown open method.

I created multiple sample projects with the released 1.0.0-beta1 artifacts and with the generated framework directly built from the repository, both have this problem.

The required interfaces exist in the com.badoo.reaktive:reaktive-iossim:klib:1.0.0-beta1 artifact, they are just not exported to the framework.

The required KotlinThrowable for the onError callback is also not available.

Add IDEA files to repo

Check video before.

Main points:

  • Continuation indent: 4 as all others
  • No wildcard imports
  • Line length same as in detekt

InvalidMutabilityException in subscribe closure

Hello there 👋,

I have run into an issue: when this is used in the onNext closure of the subscribe method, it gets frozen along with its whole property subgraph, which raises InvalidMutabilityException on any mutation of those properties.
Looking at issue #72, I see that this is expected behaviour on K/N.
But at the same time, a restriction to non-mutating logic in the onNext closure is too strict for me and makes using this framework pointless.
I guess there must be some workaround for this problem; I just can't figure it out.

The sample project I'm playing with is here:
https://github.com/OlexandrStepanov/Kotlin-Native-MVP-Demo/tree/reaktive-experiments

The file with the logic I'm talking about is SearchViewModel.kt, lines 45:48. Uncommenting the reload(it) call causes the following crash in the iOS app at runtime:

Uncaught Kotlin exception: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen com.akqa.kn.lib.LocationServiceImpl@24ed568

According to the logic in that class, I need to mutate this in the onNext callback of subscribe.
Could you please suggest how I could achieve this?

With regards.

Handle exceptions in doOn*** methods

Currently exceptions thrown in doOn*** methods are not handled at all.
Sample test:

    @Test
    fun testErrorIsPropagated() {
        val observer = singleOf(1)
            .doOnBeforeSuccess { throw Exception("Simulated error") }
            .test()
        assertTrue(observer.isError)
    }

When an exception is thrown in such methods, RxJava proceeds with onError, so it would be great if Reaktive could support that behaviour.

Simple Test example ios/android

I got some stuff working on iOS/android but would like to test my multiplatform code somehow.
For example how would one go about testing the ios-app example:

fun calculate(): Single<List<Int>> =
    observableOf(0, 1, 2, 3, 4)
        .subscribeOn(computationScheduler)
        .doOnBeforeNext { println("Should be background thread: ${!isMainThread()}") }
        .map {
            // some off-thread computations
            sleep(1)
            it
        }
        .observeOn(mainScheduler)
        .doOnBeforeNext { println("Should be main thread: ${isMainThread()}") }
        .toList()

private fun isMainThread() = NSOperationQueue.mainQueue == NSOperationQueue.currentQueue

I played around with the test scheduler but didn't get it to work; same on Android.
I always get: The Single did not succeed: com.badoo.reaktive.test.single.TestSingleObserver

I tried something like this:

val scheduler = TestScheduler()
val observable = calculate()
val sub = observable.subscribeOn(scheduler).test()
scheduler.timer.advanceBy(1000)
assertTrue(sub.isSuccess)
assertNotNull(sub.value)
assertTrue(sub.value.size == 5)

Kotlin 1.3.40

Plan

  • Split js to node and browser, remove check in uptime() function.
  • Use default jsTest instead of custom mocha run task.
  • Use FreezableAtomicReference where it is possible (need to investigate @arkivanov).
  • Enable -Xobjc-generics for iOS sample to check Swift generics support.
  • Use new test binaries configuration for native platform.

Shared Kotlin can't find Reaktive

Hi Arkadii and Dmitry,

Thanks for your hard work on this! I think it's going to be great.

Sorry if this question is due to my unfamiliarity with Gradle instead of a bug on your end. I'm trying to add com.badoo.reaktive:reaktive:1.0.0-beta1 as a dependency for the shared Kotlin layer for my multiplatform experiment here.

I've added it as a dependency here, and am trying to import Reaktive here but when I try to run it, it says Unresolved reference "com" which means my import com.badoo.reaktive.* is failing.

Do you know what I'm doing wrong here?

Thanks!

Native iOS crash

I have a forked version from a few days ago, so almost master. The following crashes, and in an unpleasant way:

class FailTests{
    init {
        ensureNeverFrozen()
    }

    fun loadData(){
        observableOf(listOf("arst"))
                .observeOn(mainScheduler)
                .subscribe {
                    showData(it)
                }
    }

    fun showData(a: Any){

    }
}

The lambda passed to subscribe captures the FailTests instance, which has had ensureNeverFrozen() called on it, so it should crash. However, rather than reporting an unhandled exception, we get a low-level C++ runtime crash.

/Users/teamcity/buildAgent/work/4d622a065c544371/runtime/src/main/cpp/Weak.cpp:35: runtime assert: Incorrect lock state

From the stack in Xcode:

#0	0x000000010e87d23e in __pthread_kill ()
#1	0x000000010e8d0c1c in pthread_kill ()
#2	0x000000010e64701d in abort ()
#3	0x000000010a3c52b9 in konan::abort() ()
#4	0x000000010a3c51d3 in RuntimeAssertFailed(char const*, char const*) ()
#5	0x000000010a3c87d8 in WeakReferenceCounterClear ()
#6	0x000000010a3c5d30 in FreeContainer(ContainerHeader*) ()
#7	0x000000010a3d8cdd in LeaveFrame ()
#8	0x000000010a163cf7 in kfun:com.badoo.reaktive.utils.atomicreference.AtomicReference.getAndSet(#GENERIC)#GENERIC at /Users/kgalligan/temp/Reaktive/reaktive/src/nativeCommonMain/kotlin/com/badoo/reaktive/utils/atomicreference/AtomicReference.kt:26
#9	0x000000010a372ff1 in kfun:com.badoo.reaktive.scheduler.MainScheduler.ExecutorImpl.Operation.invoke#internal at /Users/kgalligan/temp/Reaktive/reaktive/src/iosCommonMain/kotlin/com/badoo/reaktive/scheduler/MainScheduler.kt:67
#10	0x000000010a372e47 in kfun:com.badoo.reaktive.scheduler.MainScheduler.ExecutorImpl.Operation.$<bridge-UNN>invoke()#internal at /Users/kgalligan/temp/Reaktive/reaktive/src/iosCommonMain/kotlin/com/badoo/reaktive/scheduler/MainScheduler.kt:60

The assert is coming from the Kotlin/Native runtime. It looks like something in interop, but that's a complex sequence and a very unexpected error. If you remove ensureNeverFrozen() or avoid freezing (https://github.com/touchlab/DroidconKotlin/blob/kpg/reaktive/sessionize/lib/src/commonMain/kotlin/co/touchlab/sessionize/reaktive/ReaktiveExtensions.kt#L65), you avoid the crash.

Detekt

Set up detekt for the project?
We will probably also need some tweaks to fix at least "No new line at the end of file".

Create "travisCheck" task

The current way of listing all tasks in .travis.yml is not convenient. It would be much better if Travis only had to call ./gradlew travisCheck.
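
One way this could look, as a sketch in the Gradle Kotlin DSL: a single aggregate task that depends on everything CI should run. The task names passed to dependsOn are placeholders, not the project's actual task list.

```kotlin
// build.gradle.kts (root project): hypothetical aggregate task for CI
tasks.register("travisCheck") {
    group = "verification"
    description = "Runs every check that CI is expected to run."
    // Placeholder task names; replace them with the tasks currently listed in .travis.yml
    dependsOn("build", "detekt")
}
```

With such a task in place, the list of checks lives in the build script, and .travis.yml only needs the single ./gradlew travisCheck call.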
