
rxcpp's People

Contributors

aargor, alexeymarkov, besser82, briangru, daixtrose, diorcety, elelel, ericniebler, exctues, freezestudio, furuholm, gchudnov, georgis, guhwanbae, iam, ivan-cukic, kirkshoop, lebdron, mattpd, mattpodwysocki, nitrram, petoknm, ralphsteinhagen, ravirael, sblom, shivashankarp, thepont, tinkerbeast, valerykopylov, victimsnino

rxcpp's Issues

Observing on main thread

In RxAndroid, there's .observeOn(AndroidSchedulers.mainThread()). How do you achieve that in RxCpp?

For example, how do I ensure that the call to printf below is executed on the main thread?

auto numbers = rxcpp::observable<>::range(1, 100).subscribe_on(rxcpp::observe_on_new_thread());
numbers
    .map([](int n) { return n * 2; })
    .map([](int n) { return n + 1; })
    .as_blocking()
    .subscribe([](int n)
    {
        printf("Number: %d\n", n);
    });
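
One general way to get a handler onto the main thread, independent of any particular library, is to have the worker post the handler to a queue that only the main thread drains. The sketch below illustrates that idea with the standard library alone; main_thread_queue and deliver_on_caller are hypothetical names invented for this illustration and are not part of RxCpp.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical helper, not part of RxCpp: a queue of tasks that only the
// owning (main) thread drains.
class main_thread_queue {
public:
    // Called from any thread to hand work over to the main thread.
    void post(std::function<void()> task) {
        assert(task != nullptr);
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        ready_.notify_one();
    }

    // Called on the main thread: blocks until a task is available, then runs it.
    void run_one() {
        std::function<void()> task;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            ready_.wait(lock, [this] { return !tasks_.empty(); });
            task = std::move(tasks_.front());
            tasks_.pop();
        }
        task();  // executed outside the lock, on the draining thread
    }

private:
    std::mutex mutex_;
    std::condition_variable ready_;
    std::queue<std::function<void()>> tasks_;
};

// Produces `count` values on a worker thread and returns the ids of the
// threads that actually ran the handler.
std::vector<std::thread::id> deliver_on_caller(int count) {
    main_thread_queue queue;
    std::vector<std::thread::id> seen;

    std::thread worker([&queue, &seen, count] {
        for (int n = 1; n <= count; ++n) {
            // The handler is queued here, not invoked, on the worker thread.
            queue.post([&seen] { seen.push_back(std::this_thread::get_id()); });
        }
    });

    for (int i = 0; i < count; ++i) {
        queue.run_one();  // the handler runs on the calling thread
    }
    worker.join();
    return seen;
}
```

Calling deliver_on_caller from main and comparing each returned id with std::this_thread::get_id() shows that every handler ran on the caller, even though the values were produced elsewhere.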

Default coordination on iterate

Is there a reason why the 'iterate' operator has 'identity_current_thread' as its default coordination, while 'from', for example, uses 'identity_immediate'? I happened to miss this, assumed immediate, and got results I did not expect.
Thanks for a great library!
/Mattias

zip tests fail with gcc-4.9.2 on linux

I've been having some problems with zip and gcc-4.9, but couldn't run the tests due to the bug in libstdc++ with uncaught_exception.

I've built libstdc++ with the fix for exception_ptr:
https://gcc.gnu.org/ml/gcc-patches/2015-02/msg00027.html

Here is the test output:

Scenario: zip return/return
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.1.cpp:578
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.1.cpp:618: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @220-on_next( 5), @240-on_completed() }
  ==
  { @220-on_next( 65534), @240-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip left completes first
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:132
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:172: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @215-on_next( 6), @225-on_completed() }
  ==
  { @215-on_next( 65534), @225-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip right completes first
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:194
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:234: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @215-on_next( 6), @225-on_completed() }
  ==
  { @215-on_next( 65534), @225-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip typical N
     Given: N hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:377
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:420: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @360-on_next( 136), @560-on_next( 392), @800-on_completed() }
  ==
  { @360-on_next( 524272), @560-on_next( 524272), @800-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip interleaved with tail
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:437
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:482: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @220-on_next( 5), @230-on_next( 9), @250-on_completed() }
  ==
  { @220-on_next( 65534), @230-on_next( 65534), @250-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip consecutive
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:504
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:547: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @235-on_next( 8), @240-on_next( 11), @250-on_completed() }
  ==
  { @235-on_next( 65534), @240-on_next( 65534), @250-on_completed() }

-------------------------------------------------------------------------------
Scenario: zip consecutive ends with error right
     Given: 2 hot observables of ints.
      When: each int is combined with the latest from the other source
      Then: the output contains combined ints followed by an error
-------------------------------------------------------------------------------
/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:634
...............................................................................

/home/ben/development/test/rxcpp/RxCpp/Rx/v2/test/operators/zip.2.cpp:679: FAILED:
  REQUIRE( required == actual )
with expansion:
  { @235-on_next( 8), @240-on_next( 11), @245-on_error(zip on_error from
  source) }
  ==
  { @235-on_next( 65534), @240-on_next( 65534), @245-on_error(zip on_error from
  source) }

===============================================================================
test cases: 204 | 197 passed | 7 failed
assertions: 657 | 650 passed | 7 failed

I haven't really looked into it yet, but I can tell you that with clang-3.5, the address sanitizer does not complain. Unfortunately, I cannot link the tests using clang-3.6 (there's a problem with __attribute__((weak))).

Code does not compile

The rx libraries are hardcoded to be Unicode (L"foo") everywhere, and yet in some places you're using ambiguous notation that can be compiled under ANSI. This leads to errors and an inability to compile. It would be much better if you either used the right macros (e.g., _T) or wrote everything in Unicode (e.g. CreateWindowExW).

And on the subject, why on earth is Windows involved in all of this? I was kind of hoping that this library would be usable on Linux. This now looks unlikely.

Test suite fails on ARM GCC

Hi, we are having some problems with RxCpp on an ARM platform. The RxCpp test suite does not pass there, although it runs just fine on x86 and x86_64 platforms.

Target platform and toolchain details:
ARM Cortex-A8 VFPv3-D16
glibc 2.21
gcc-5.2.0
The same errors also occur with a toolchain based on gcc-4.9.2 and uClibc (same hardware).

Test suite output:
http://hastebin.com/cugojuvexe.coffee

[Question] rxcpp protects on_next. Is this intentional?

Q1 : rxcpp protects on_next. Is this intentional?

...
  auto handler_next      = [ ] (evt_t i) {throw "aaa";};
  auto handler_error     = [ ] (auto  e) { console::error("on_error: "s + e); };
...
  auto observable   = rx::observable<>::create(on_subscribe);
  auto subscription = observable.subscribe(handler_next,
                                           handler_error,
                                           handler_completed);
...
output>
[11:47:53] [LOG]   ----- test start -----
[11:47:53] [ERROR] on_error: aaa
[11:47:53] [LOG]   on_dispose
[11:47:53] [LOG]   ----- test end    -----

But according to

https://github.com/Reactive-Extensions/RxJS/tree/master/doc/designguidelines#53-protect-calls-to-user-code-from-within-an-operator

Note: do not protect calls to subscribe, dispose, onNext, onError and onCompleted methods. 
These calls are on the edge of the monad. 
Calling the onError method from these places will lead to unexpected behavior.
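
The difference between the observed behavior and the quoted guideline can be sketched without RxCpp. In the illustration below, sketch_observer, emit_protected and emit_unprotected are hypothetical names made up for this example; they only model the two policies and are not how RxCpp is implemented.

```cpp
#include <cassert>
#include <functional>
#include <string>

// Hypothetical observer type for illustration; not RxCpp's real classes.
struct sketch_observer {
    std::function<void(int)> on_next;
    std::function<void(const std::string&)> on_error;
};

// "Protected" delivery: an exception thrown by on_next is caught and
// forwarded to on_error, which matches the output in the report above.
void emit_protected(const sketch_observer& o, int value) {
    assert(o.on_next != nullptr && o.on_error != nullptr);
    try {
        o.on_next(value);
    } catch (const char* message) {
        o.on_error(message);
    }
}

// "Unprotected" delivery, as the quoted guideline recommends: the exception
// propagates to whoever called emit_unprotected.
void emit_unprotected(const sketch_observer& o, int value) {
    assert(o.on_next != nullptr);
    o.on_next(value);
}
```

With a handler that throws "aaa", emit_protected ends in on_error, while emit_unprotected lets the exception escape to the caller and never touches on_error.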

Q2 : Does an Rx specification exist? If so, where?

async_subject

Here's my first pass at adding 'async_subject' and 'to_async(f)':

#192

Please let me know what you think...

interval(TimePoint when) ambiguity

The current implementation of interval(TimePoint when) waits for the specified time point and then emits integers indefinitely, with no delay between them:

    auto scheduler = rxcpp::identity_current_thread();
    auto start = scheduler.now();
    auto values = rxcpp::observable<>::interval(start, scheduler);
    auto s = values.
        map([&](int prime) { 
            return std::make_tuple(std::chrono::duration_cast<std::chrono::milliseconds>(scheduler.now() - start).count(), prime);
        });
    s.
        take(3).
        subscribe(
            rxcpp::util::apply_to(
                [](long long t, int v){printf("OnNext: %d. %lld ms after the first event\n", v, t);}),
            [](){printf("OnCompleted\n");});
OnNext: 1. 0 ms after the first event
OnNext: 2. 0 ms after the first event
OnNext: 3. 0 ms after the first event
OnCompleted

This happens because the default value of period is rxsc::scheduler::clock_type::duration::max(), which causes an integer overflow when the next occurrence time is calculated: *target += period;. As a result, the next integer is scheduled in the distant past and emitted immediately.

Is this the intended behavior or an issue?
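
The overflow described above can be checked for with plain std::chrono arithmetic. The following is a minimal sketch, assuming a steady_clock whose tick count is a signed integer; next_tick_overflows and next_tick_saturating are hypothetical helpers for illustration and do not exist in RxCpp.

```cpp
#include <cassert>
#include <chrono>
#include <limits>

using clock_type = std::chrono::steady_clock;
using rep = clock_type::duration::rep;

static_assert(std::numeric_limits<rep>::is_integer,
              "this sketch assumes an integral tick count");

// True when `target + period` cannot be represented, i.e. when a step like
// `*target += period` would overflow the underlying integer.
bool next_tick_overflows(clock_type::time_point target, clock_type::duration period) {
    const rep t = target.time_since_epoch().count();
    const rep p = period.count();
    return p > 0 && t > std::numeric_limits<rep>::max() - p;
}

// Hypothetical saturating alternative: clamp to time_point::max() instead of
// wrapping around into the past.
clock_type::time_point next_tick_saturating(clock_type::time_point target,
                                            clock_type::duration period) {
    if (next_tick_overflows(target, period)) {
        return clock_type::time_point::max();
    }
    return target + period;
}
```

For any time point after the clock's epoch, a period of duration::max() makes next_tick_overflows return true, which is the situation in the report; an ordinary period such as one second does not.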

Create a dynamic stream (Observable)

Hello,

I would like to implement a simple example of CEP using your library. My example is the following: I have two streams (Observables). When I press the 'a' key, an event/message is created in one stream, and when I press 'g', an event is created in the other stream. Then I would like to join these two streams and do something.

However, I don't know how to create an 'infinite' observable with your library. I think that I have to use the method rxcpp::observable<>::create(). But, I have no idea about which parameter I should use.

I have already implemented this example in Java, where the parameter is an "Action1".

Is it the same thing in C++? Do you have any idea, or example, to help me?

Thank you in advance for your help,

Best regards,

Is Ix supported?

I couldn't find a NuGet package with Ix.
I found more TODOs in linq.hpp:
// TODO: groupby(keyfn, eq)
// TODO: join...
// TODO: average
// TODO: concat
// TODO: default_if_empty
// TODO: distinct()
// TODO: distinct(cmp)
// TODO: except(second)
// TODO: except(second, eq)
// TODO: intersect(second)
// TODO: intersect(second, eq)
// TODO: order_by(sel)
// TODO: order_by(sel, less)
// TODO: order_by_descending(sel)
// TODO: order_by_descending(sel, less)
// TODO: sequence_equal(second)
// TODO: sequence_equal(second, eq)
// TODO: single / single_or_default
// TODO: skip_while(pred)
// TODO: sum
// TODO: take_while
// TODO: then_by / then_by_descending ?
// TODO: to_...
// TODO: union(second)
// TODO: union(eq)
// TODO: zip

Have you stopped developing this very important project?
What is the state of the Ix part?

[Question] Is the return value of observable.subscribe() disposable? And how?

Q1) Does "rx::observable<>::create(on_subscribe)" store the return value of on_subscribe?
I would like the return value of on_subscribe to act as on_dispose.
Q2) How can I manually dispose of a subscription and register on_dispose?
...
auto subscription = observable.subscribe(handler_next, handler_error, handler_completed);
?? subscription.dispose() ??
...

memory leak when using repeat(void)

After running the snippet below, memory usage increases without bound.

#include <rxcpp/rx.hpp>
#include <iostream>

int main(int argc,char** argv)
{
    rxcpp::sources::range(1,10)
    .repeat()
    .subscribe([](auto i)
    {
        std::cout << i << std::endl;
    });
    return 0;
}

Memory leak in multicast_observer

I haven't looked at it in much detail, but I suspect it's a shared_ptr cycle.
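
For readers unfamiliar with the suspected failure mode, here is a minimal sketch of a shared_ptr cycle and of the usual weak_ptr remedy. The node type and both functions are hypothetical and are not taken from RxCpp; whether multicast_observer actually contains such a cycle is only the suspicion stated above.

```cpp
#include <cassert>
#include <memory>

// Hypothetical type for illustration; it is not taken from RxCpp.
struct node {
    explicit node(int& live) : live_count(live) { ++live_count; }
    ~node() { --live_count; }
    std::shared_ptr<node> strong_peer;  // owning link: can form a cycle
    std::weak_ptr<node> weak_peer;      // non-owning link: cannot
    int& live_count;                    // counts nodes that are still alive
};

// Links two nodes to each other with owning pointers. If the caller simply
// dropped the returned handle, neither node would ever be destroyed.
std::shared_ptr<node> make_strong_cycle(int& live) {
    auto a = std::make_shared<node>(live);
    auto b = std::make_shared<node>(live);
    a->strong_peer = b;
    b->strong_peer = a;
    return a;
}

// Same shape, but the back link is weak, so both nodes are destroyed when the
// function returns.
void make_weak_link(int& live) {
    auto a = std::make_shared<node>(live);
    auto b = std::make_shared<node>(live);
    a->strong_peer = b;
    b->weak_peer = a;
}
```

With the strong cycle, the returned handle reports a use_count of 2 because the second node still owns the first; the cycle has to be broken explicitly (for example by resetting one strong_peer) before the nodes can be freed.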

#include <rxcpp/rx.hpp>
int main()
{
   rxcpp::composite_subscription cs;
   rxcpp::subjects::detail::multicast_observer<int> mco(cs);
}
g++-5 -std=c++14 -fsanitize=address rx.cpp -I RxCpp/Rx/v2/src/ && ./a.out

=================================================================
==4642==ERROR: LeakSanitizer: detected memory leaks

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x408ef2 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x408ef2)
    #2 0x4084f6 in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x4084f6)
    #3 0x407883 in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x407883)
    #4 0x4071fd in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::_Sp_make_shared_tag, rxcpp::detail::composite_subscription_inner::composite_subscription_state*, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x4071fd)
    #5 0x406a9d in std::__shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::_Sp_make_shared_tag, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x406a9d)
    #6 0x40619b in std::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::shared_ptr<std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::_Sp_make_shared_tag, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x40619b)
    #7 0x404d87 in std::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::allocate_shared<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state>>(std::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> const&) (/home/ben/development/test/a.out+0x404d87)
    #8 0x403b00 in std::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::make_shared<rxcpp::detail::composite_subscription_inner::composite_subscription_state>() (/home/ben/development/test/a.out+0x403b00)
    #9 0x402ccc in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() (/home/ben/development/test/a.out+0x402ccc)
    #10 0x402f73 in rxcpp::composite_subscription::composite_subscription() (/home/ben/development/test/a.out+0x402f73)
    #11 0x401999 in main (/home/ben/development/test/a.out+0x401999)
    #12 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x40b060 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x40b060)
    #2 0x40ae54 in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x40ae54)
    #3 0x40ab4f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x40ab4f)
    #4 0x40a8ee in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, rxcpp::subjects::detail::multicast_observer<int>::state_type*, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a8ee)
    #5 0x40a750 in std::__shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a750)
    #6 0x40a65f in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type>::shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a65f)
    #7 0x40a537 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type> std::allocate_shared<rxcpp::subjects::detail::multicast_observer<int>::state_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type>, rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::state_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a537)
    #8 0x40a36b in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::state_type> std::make_shared<rxcpp::subjects::detail::multicast_observer<int>::state_type, rxcpp::composite_subscription&>(rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40a36b)
    #9 0x40a0cc in rxcpp::subjects::detail::multicast_observer<int>::binder_type::binder_type(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x40a0cc)
    #10 0x409b69 in _ZN9__gnu_cxx13new_allocatorIN5rxcpp8subjects6detail18multicast_observerIiE11binder_typeEE9constructIS6_IRNS1_22composite_subscriptionEEEEvPT_DpOT0_ (/home/ben/development/test/a.out+0x409b69)
    #11 0x4096bd in _ZNSt16allocator_traitsISaIN5rxcpp8subjects6detail18multicast_observerIiE11binder_typeEEE12_S_constructIS5_IRNS0_22composite_subscriptionEEEENSt9enable_ifIXsrSt6__and_IINS7_18__construct_helperIT_IDpT0_EE4typeEEE5valueEvE4typeERS6_PSE_DpOSF_ (/home/ben/development/test/a.out+0x4096bd)
    #12 0x4092df in _ZNSt16allocator_traitsISaIN5rxcpp8subjects6detail18multicast_observerIiE11binder_typeEEE9constructIS5_IRNS0_22composite_subscriptionEEEEDTcl12_S_constructfp_fp0_spcl7forwardIT0_Efp1_EEERS6_PT_DpOSB_ (/home/ben/development/test/a.out+0x4092df)
    #13 0x408c48 in std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2>::_Sp_counted_ptr_inplace<rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x408c48)
    #14 0x40801f in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, rxcpp::subjects::detail::multicast_observer<int>::binder_type*, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x40801f)
    #15 0x407550 in std::__shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x407550)
    #16 0x406bbf in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type>::shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406bbf)
    #17 0x406383 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::allocate_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406383)
    #18 0x404f95 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::make_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, rxcpp::composite_subscription&>(rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x404f95)
    #19 0x404008 in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x404008)
    #20 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #21 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 96 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x409621 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x409621)
    #2 0x40916a in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x40916a)
    #3 0x408a3f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x408a3f)
    #4 0x407f92 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, rxcpp::subjects::detail::multicast_observer<int>::binder_type*, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x407f92)
    #5 0x407550 in std::__shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x407550)
    #6 0x406bbf in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type>::shared_ptr<std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406bbf)
    #7 0x406383 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::allocate_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type>, rxcpp::composite_subscription&>(std::allocator<rxcpp::subjects::detail::multicast_observer<int>::binder_type> const&, rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x406383)
    #8 0x404f95 in std::shared_ptr<rxcpp::subjects::detail::multicast_observer<int>::binder_type> std::make_shared<rxcpp::subjects::detail::multicast_observer<int>::binder_type, rxcpp::composite_subscription&>(rxcpp::composite_subscription&) (/home/ben/development/test/a.out+0x404f95)
    #9 0x404008 in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x404008)
    #10 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #11 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x409532 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x409532)
    #2 0x408fbe in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> >&, unsigned long) (/home/ben/development/test/a.out+0x408fbe)
    #3 0x408711 in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> > >(std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, (__gnu_cxx::_Lock_policy)2> >&) (/home/ben/development/test/a.out+0x408711)
    #4 0x407c60 in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::_Sp_make_shared_tag, rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>*, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x407c60)
    #5 0x407456 in std::__shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x407456)
    #6 0x406b5f in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::shared_ptr<std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::_Sp_make_shared_tag, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x406b5f)
    #7 0x406225 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::allocate_shared<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >, rxcpp::detail::composite_subscription_inner>(std::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > const&, rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x406225)
    #8 0x404ebe in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::make_shared<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) (/home/ben/development/test/a.out+0x404ebe)
    #9 0x403c82 in rxcpp::subscription::subscription<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner, std::enable_if<!rxcpp::is_subscription<rxcpp::detail::composite_subscription_inner>::value, void**>::type) (/home/ben/development/test/a.out+0x403c82)
    #10 0x402fa2 in rxcpp::composite_subscription::composite_subscription() (/home/ben/development/test/a.out+0x402fa2)
    #11 0x401999 in main (/home/ben/development/test/a.out+0x401999)
    #12 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x40a414 in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x40a414)
    #2 0x40a1ae in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> > >::allocate(std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2>&, unsigned long) (/home/ben/development/test/a.out+0x40a1ae)
    #3 0x409c1b in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> > > std::__allocate_guarded<std::allocator<std::_Sp_counted_ptr_inplace<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2> > >(std::__allocated_ptr&) (/home/ben/development/test/a.out+0x409c1b)
    #4 0x4097ee in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::__shared_count<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::_Sp_make_shared_tag, rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >*, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x4097ee)
    #5 0x4093ae in std::__shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, (__gnu_cxx::_Lock_policy)2>::__shared_ptr<std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::_Sp_make_shared_tag, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x4093ae)
    #6 0x408d59 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > >::shared_ptr<std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::_Sp_make_shared_tag, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x408d59)
    #7 0x408219 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > > std::allocate_shared<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(std::allocator<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > const&, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x408219)
    #8 0x407620 in std::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> > > std::make_shared<rxcpp::subscription::subscription_state<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >, rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}&&) (/home/ben/development/test/a.out+0x407620)
    #9 0x406ec2 in rxcpp::subscription::subscription<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >(rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>, std::enable_if<!rxcpp::is_subscription<rxcpp::static_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}> >::value, void**>::type) (/home/ben/development/test/a.out+0x406ec2)
    #10 0x406533 in std::enable_if<rxcpp::detail::is_unsubscribe_function<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>::value, rxcpp::subscription>::type rxcpp::make_subscription<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::detail::is_unsubscribe_function&&) (/home/ben/development/test/a.out+0x406533)
    #11 0x4051f9 in std::enable_if<rxcpp::detail::is_unsubscribe_function<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>::value, std::weak_ptr<rxcpp::subscription::base_subscription_state> >::type rxcpp::composite_subscription::add<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::detail::is_unsubscribe_function) const (/home/ben/development/test/a.out+0x4051f9)
    #12 0x40406c in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x40406c)
    #13 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #14 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f806e9ff8b2 in operator new(unsigned long) (/usr/lib/x86_64-linux-gnu/libasan.so.2+0x998b2)
    #1 0x408e71 in __gnu_cxx::new_allocator<std::_Rb_tree_node<rxcpp::subscription> >::allocate(unsigned long, void const*) (/home/ben/development/test/a.out+0x408e71)
    #2 0x40847c in std::allocator_traits<std::allocator<std::_Rb_tree_node<rxcpp::subscription> > >::allocate(std::allocator<std::_Rb_tree_node<rxcpp::subscription> >&, unsigned long) (/home/ben/development/test/a.out+0x40847c)
    #3 0x407730 in std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_get_node() (/home/ben/development/test/a.out+0x407730)
    #4 0x40708d in std::_Rb_tree_node<rxcpp::subscription>* std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_create_node<rxcpp::subscription const&>(rxcpp::subscription const&) (/home/ben/development/test/a.out+0x40708d)
    #5 0x4069d9 in std::_Rb_tree_node<rxcpp::subscription>* std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_Alloc_node::operator()<rxcpp::subscription const&>(rxcpp::subscription const&) const (/home/ben/development/test/a.out+0x4069d9)
    #6 0x405c11 in std::_Rb_tree_iterator<rxcpp::subscription> std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_insert_<rxcpp::subscription const&, std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_Alloc_node>(std::_Rb_tree_node_base*, std::_Rb_tree_node_base*, rxcpp::subscription const&, std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_Alloc_node&) (/home/ben/development/test/a.out+0x405c11)
    #7 0x404930 in std::pair<std::_Rb_tree_iterator<rxcpp::subscription>, bool> std::_Rb_tree<rxcpp::subscription, rxcpp::subscription, std::_Identity<rxcpp::subscription>, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::_M_insert_unique<rxcpp::subscription const&>(rxcpp::subscription const&) (/home/ben/development/test/a.out+0x404930)
    #8 0x403871 in std::set<rxcpp::subscription, std::less<rxcpp::subscription>, std::allocator<rxcpp::subscription> >::insert(rxcpp::subscription const&) (/home/ben/development/test/a.out+0x403871)
    #9 0x40293c in rxcpp::detail::composite_subscription_inner::composite_subscription_state::add(rxcpp::subscription) (/home/ben/development/test/a.out+0x40293c)
    #10 0x402ea8 in rxcpp::detail::composite_subscription_inner::add(rxcpp::subscription) const (/home/ben/development/test/a.out+0x402ea8)
    #11 0x40310e in rxcpp::composite_subscription::add(rxcpp::subscription) const (/home/ben/development/test/a.out+0x40310e)
    #12 0x405210 in std::enable_if<rxcpp::detail::is_unsubscribe_function<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>::value, std::weak_ptr<rxcpp::subscription::base_subscription_state> >::type rxcpp::composite_subscription::add<rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription)::{lambda()#1}>(rxcpp::detail::is_unsubscribe_function) const (/home/ben/development/test/a.out+0x405210)
    #13 0x40406c in rxcpp::subjects::detail::multicast_observer<int>::multicast_observer(rxcpp::composite_subscription) (/home/ben/development/test/a.out+0x40406c)
    #14 0x4019c8 in main (/home/ben/development/test/a.out+0x4019c8)
    #15 0x7f806e023a3f in __libc_start_main (/lib/x86_64-linux-gnu/libc.so.6+0x20a3f)

SUMMARY: AddressSanitizer: 528 byte(s) leaked in 6 allocation(s).

Memory leak in skip & take with make_hot_observable

This has a memory leak:

#include "rxcpp/rx.hpp"
namespace rx=rxcpp;
namespace rxu=rxcpp::util;
namespace rxsc=rxcpp::schedulers;

#include "rxcpp/rx-test.hpp"

int main()
{
   auto sc = rxsc::make_test();
   auto w = sc.create_worker();
   const rxsc::test::messages<int> on;

   auto xs = sc.make_hot_observable({
      on.next(240, 5),
      on.completed(250)
   });

   auto res = w.start(
         [xs]() {
            return xs
               .take(1);
//               .as_dynamic();
         }
   );
}

I can't work out why. There is no leak with make_cold_observable or with buffer.

Compiled with:

clang++-3.6 -std=c++14 -O0 -g -stdlib=libc++ -fno-omit-frame-pointer -fPIC -fsanitize=address -lc++abi -fsanitize=address -pie test_rx_leak.cpp -Irxcpp/Rx/v2/src

These are the leaks:

=================================================================
==22438==ERROR: LeakSanitizer: detected memory leaks

Indirect leak of 1152 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608848e09 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608848e09 in std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608848e09 in std::__1::allocator_traits<std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >::allocate(std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f9608848e09 in std::__1::__split_buffer<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f9608848c43 in void std::__1::vector<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >::__push_back_slow_path<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> >(std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f96088482e8 in std::__1::vector<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >::push_back(std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1620:9
    #7 0x7f96088482e8 in std::__1::priority_queue<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::vector<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>, std::__1::allocator<std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long> > >, rxcpp::schedulers::detail::schedulable_queue<long>::compare_elem>::push(std::__1::pair<rxcpp::schedulers::detail::time_schedulable<long>, long>&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/queue:659
    #8 0x7f96088482e8 in rxcpp::schedulers::detail::schedulable_queue<long>::push(rxcpp::schedulers::detail::time_schedulable<long>&&) /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:870
    #9 0x7f9608847cc9 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:219:9
    #10 0x7f9608822aa1 in _ZNK5rxcpp10schedulers4test11test_worker17schedule_absoluteIZNKS2_5startIiZ4mainE3$_0EENS_10subscriberIT_NS_4test17testable_observerIS7_EEEET0_lllEUlRKNS0_11schedulableEE1_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionIS7_EE5valuesr15is_subscriptionIS7_EE5valuentsr14is_schedulableIS7_EE5valueEvE4typeElOS7_DpOT0_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:426:13
    #11 0x7f96088223c6 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:464:13
    #12 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #13 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #14 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 184 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f96088401e4 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f96088401e4 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::hot_observable<int>, std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f96088401e4 in std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> > std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> >::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > >(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960883fadb in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail14hot_observableIiEEJRNS_10shared_ptrINS3_9test_type15test_type_stateEEENS2_6workerENS_6vectorINS1_13notifications8recordedINS6_INSD_6detail17notification_baseIiEEEEEENS_9allocatorISJ_EEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISO_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960883fadb in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336
    #6 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #7 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #8 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 136 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608830431 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608830431 in std::__1::allocator<std::__1::__shared_ptr_emplace<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type, std::__1::allocator<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608830431 in std::__1::shared_ptr<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type> std::__1::shared_ptr<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::state_type>::make_shared<rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::values const&, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&>(rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::values const&, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960882c0c9 in _ZNSt3__111make_sharedIZNK5rxcpp9operators6detail4takeIiNS1_10observableIiNS1_4test6detail11test_sourceIiEEEEiE12on_subscribeINS1_10subscriberIiNS6_17testable_observerIiEEEEEEvRKT_E10state_typeJRKNSB_6valuesERKSG_EEENS_9enable_ifIXntsr8is_arrayISH_EE5valueENS_10shared_ptrISH_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960882c0c9 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:66
    #6 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #7 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #8 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #9 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #10 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #11 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #12 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #13 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #14 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #17 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #18 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #19 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #20 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #21 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #22 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #23 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 128 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960882eb35 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960882eb35 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >, std::__1::allocator<{lambda()#1}> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960882eb35 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1} std::__1::shared_ptr<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >::make_shared<void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}>(void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960882e978 in _ZNSt3__111make_sharedIN5rxcpp16dynamic_observerIiE17specific_observerINS1_15static_observerIiZNKS1_9operators6detail4takeIiNS1_10observableIiNS1_4test6detail11test_sourceIiEEEEiE12on_subscribeINS1_10subscriberIiNSA_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSG_ISK_EEvSN_EUlSt13exception_ptrE_ZNKSG_ISK_EEvSN_EUlvE_EEEEJSS_EEENS_9enable_ifIXntsr8is_arrayISL_EE5valueENS_10shared_ptrISL_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960882e978 in std::__1::shared_ptr<rxcpp::dynamic_observer<int>::virtual_observer> rxcpp::dynamic_observer<int>::make_destination<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >(rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:176
    #6 0x7f960882e853 in rxcpp::dynamic_observer<int>::dynamic_observer<rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >(rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:194:23
    #7 0x7f960882e67e in rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> >::as_dynamic() const /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:263:9
    #8 0x7f960882e4cd in rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >::as_dynamic() const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:157:44
    #9 0x7f960882dc68 in _ZNK5rxcpp4test6detail11test_sourceIiE12on_subscribeINS_10subscriberIiNS_8observerIiNS_15static_observerIiZNKS_9operators6detail4takeIiNS_10observableIiS3_EEiE12on_subscribeINS5_IiNS0_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSE_ISH_EEvSK_EUlSt13exception_ptrE_ZNKSE_ISH_EEvSK_EUlvE_EEEEEEEENSt3__19enable_ifIXntsr3std7is_sameISI_NS5_IiNS6_IiNS_16dynamic_observerIiEEEEEEEE5valueEvE4typeESI_ /home/ben/development/yy/pictet/rxcpp/rx-test.hpp:45:26
    #10 0x7f960882da21 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #11 0x7f960882ce29 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:349:13
    #12 0x7f960882cb9d in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::subscribe<rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>(rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}&&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #13 0x7f960882c2e4 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:72:9
    #14 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #15 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #16 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #17 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #18 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #19 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #20 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #21 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #22 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #23 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #24 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #25 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #26 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #27 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #28 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #29 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #30 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #31 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 112 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845ba7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845ba7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::__1::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845ba7 in std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844d78 in _ZNSt3__111make_sharedIN5rxcpp6detail28composite_subscription_inner28composite_subscription_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844d78 in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:295
    #6 0x7f9608825964 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960882c0d1 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:68:32
    #8 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #9 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #10 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #11 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #12 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #13 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #14 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #15 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #16 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #17 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #18 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #19 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #20 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #21 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #22 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #23 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #24 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #25 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 112 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845ba7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845ba7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::__1::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845ba7 in std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844d78 in _ZNSt3__111make_sharedIN5rxcpp6detail28composite_subscription_inner28composite_subscription_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844d78 in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:295
    #6 0x7f9608825964 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960883fab4 in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336:52
    #8 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #9 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #10 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 112 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845ba7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845ba7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::detail::composite_subscription_inner::composite_subscription_state, std::__1::allocator<rxcpp::detail::composite_subscription_inner::composite_subscription_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845ba7 in std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state> std::__1::shared_ptr<rxcpp::detail::composite_subscription_inner::composite_subscription_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844d78 in _ZNSt3__111make_sharedIN5rxcpp6detail28composite_subscription_inner28composite_subscription_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844d78 in rxcpp::detail::composite_subscription_inner::composite_subscription_inner() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:295
    #6 0x7f9608825964 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f9608836a4b in _ZN5rxcpp15make_subscriberIiNS_4test17testable_observerIiEEEENSt3__19enable_ifIXsr11is_observerIT0_EE5valueENS_10subscriberIT_S6_EEE4typeERKS6_ /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:241:5
    #8 0x7f96088367f8 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:191:12
    #9 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #10 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #11 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #12 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 104 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608847697 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608847697 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::test_type::test_type_state, std::__1::allocator<rxcpp::schedulers::detail::test_type::test_type_state> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608847697 in std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state> std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f96088475e5 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail9test_type15test_type_stateEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f96088475e5 in rxcpp::schedulers::detail::test_type::test_type() /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:127
    #6 0x7f9608846fad in std::__1::__libcpp_compressed_pair_imp<std::__1::allocator<rxcpp::schedulers::detail::test_type>, rxcpp::schedulers::detail::test_type, 1u>::__libcpp_compressed_pair_imp(std::__1::allocator<rxcpp::schedulers::detail::test_type>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2047:40
    #7 0x7f9608846fad in std::__1::__compressed_pair<std::__1::allocator<rxcpp::schedulers::detail::test_type>, rxcpp::schedulers::detail::test_type>::__compressed_pair(std::__1::allocator<rxcpp::schedulers::detail::test_type>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2317
    #8 0x7f9608846fad in std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::test_type, std::__1::allocator<rxcpp::schedulers::detail::test_type> >::__shared_ptr_emplace(std::__1::allocator<rxcpp::schedulers::detail::test_type>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:3699
    #9 0x7f9608846fad in std::__1::shared_ptr<rxcpp::schedulers::detail::test_type> std::__1::shared_ptr<rxcpp::schedulers::detail::test_type>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4280
    #10 0x7f9608825720 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail9test_typeEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS6_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #11 0x7f9608825720 in rxcpp::schedulers::make_test() /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:574
    #12 0x7f9608821d0f in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:10:14
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 96 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883755f in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883755f in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883755f in rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> std::__1::shared_ptr<rxcpp::dynamic_observer<int>::specific_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > > >::make_shared<{lambda()#1}>({lambda()#1}&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608837358 in _ZNSt3__111make_sharedIN5rxcpp16dynamic_observerIiE17specific_observerINS1_8observerIiNS1_15static_observerIiZNKS1_10schedulers6detail9test_type16test_type_worker15make_subscriberIiEENS1_10subscriberIT_NS1_4test17testable_observerISD_EEEEvEUliE_ZNKSB_IiEESH_vEUlSt13exception_ptrE_ZNKSB_IiEESH_vEUlvE_EEEEEEJSN_EEENS_9enable_ifIXntsr8is_arrayISD_EE5valueENS_10shared_ptrISD_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608837358 in std::__1::shared_ptr<rxcpp::dynamic_observer<int>::virtual_observer> rxcpp::dynamic_observer<int>::make_destination<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >(rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> >) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:176
    #6 0x7f9608837225 in rxcpp::dynamic_observer<int>::dynamic_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >(rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> >) /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:194:23
    #7 0x7f9608836c0c in _ZN5rxcpp21make_observer_dynamicIiZNKS_10schedulers6detail9test_type16test_type_worker15make_subscriberIiEENS_10subscriberIT_NS_4test17testable_observerIS7_EEEEvEUliE_ZNKS5_IiEESB_vEUlSt13exception_ptrE_ZNKS5_IiEESB_vEUlvE_EENSt3__19enable_ifIXaaaasr6detail13is_on_next_ofIS7_T0_EE5valuesr6detail11is_on_errorIT1_EE5valuesr6detail15is_on_completedIT2_EE5valueENS_8observerIS7_NS_16dynamic_observerIS7_EEEEE4typeEOSI_OSJ_OSK_ /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:380:5
    #8 0x7f96088367d6 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:191:68
    #9 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #10 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #11 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #12 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 88 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883eed5 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883eed5 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::mock_observer<int>, std::__1::allocator<rxcpp::schedulers::detail::mock_observer<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883eed5 in std::__1::shared_ptr<rxcpp::schedulers::detail::mock_observer<int> > std::__1::shared_ptr<rxcpp::schedulers::detail::mock_observer<int> >::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&>(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f96088365c4 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail13mock_observerIiEEJRNS_10shared_ptrINS3_9test_type15test_type_stateEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISC_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f96088365c4 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:189
    #6 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #7 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #8 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #9 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #10 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 72 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845098 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845098 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::__1::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845098 in std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::make_shared<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844ebd in _ZNSt3__111make_sharedIN5rxcpp12subscription18subscription_stateINS1_6detail28composite_subscription_innerEEEJS5_EEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS8_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844ebd in _ZN5rxcpp12subscriptionC2INS_6detail28composite_subscription_innerEEET_NSt3__19enable_ifIXntsr15is_subscriptionIS4_EE5valueEPPvE4typeE /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:130
    #6 0x7f9608825988 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960883fab4 in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336:52
    #8 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #9 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #10 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 72 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845098 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845098 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::__1::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845098 in std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::make_shared<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844ebd in _ZNSt3__111make_sharedIN5rxcpp12subscription18subscription_stateINS1_6detail28composite_subscription_innerEEEJS5_EEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS8_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844ebd in _ZN5rxcpp12subscriptionC2INS_6detail28composite_subscription_innerEEET_NSt3__19enable_ifIXntsr15is_subscriptionIS4_EE5valueEPPvE4typeE /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:130
    #6 0x7f9608825988 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f9608836a4b in _ZN5rxcpp15make_subscriberIiNS_4test17testable_observerIiEEEENSt3__19enable_ifIXsr11is_observerIT0_EE5valueENS_10subscriberIT_S6_EEE4typeERKS6_ /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:241:5
    #8 0x7f96088367f8 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:191:12
    #9 0x7f9608826227 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::make_subscriber<int>() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:522:20
    #10 0x7f96088221f3 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:456:55
    #11 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #12 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #13 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 72 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608845098 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608845098 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner>, std::__1::allocator<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608845098 in std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> > std::__1::shared_ptr<rxcpp::subscription::subscription_state<rxcpp::detail::composite_subscription_inner> >::make_shared<rxcpp::detail::composite_subscription_inner>(rxcpp::detail::composite_subscription_inner&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608844ebd in _ZNSt3__111make_sharedIN5rxcpp12subscription18subscription_stateINS1_6detail28composite_subscription_innerEEEJS5_EEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS8_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608844ebd in _ZN5rxcpp12subscriptionC2INS_6detail28composite_subscription_innerEEET_NSt3__19enable_ifIXntsr15is_subscriptionIS4_EE5valueEPPvE4typeE /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:130
    #6 0x7f9608825988 in rxcpp::composite_subscription::composite_subscription() /home/ben/development/yy/pictet/rxcpp/rx-subscription.hpp:376:5
    #7 0x7f960882c0d1 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:68:32
    #8 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #9 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #10 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #11 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #12 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #13 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #14 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #15 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #16 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #17 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #18 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #19 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #20 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #21 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #22 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #23 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #24 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #25 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 64 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608846575 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608846575 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::test_type::test_type_worker, std::__1::allocator<rxcpp::schedulers::detail::test_type::test_type_worker> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608846575 in std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_worker> std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_worker>::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&>(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608847454 in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail9test_type16test_type_workerEJRNS_10shared_ptrINS4_15test_type_stateEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISB_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608847454 in rxcpp::schedulers::detail::test_type::create_worker(rxcpp::composite_subscription) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:136
    #6 0x7f960883fac0 in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336:52
    #7 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #8 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #9 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 56 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608842ef2 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608842ef2 in std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608842ef2 in std::__1::allocator_traits<std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > > >::allocate(std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f9608842ef2 in std::__1::__split_buffer<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > >&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f9608842d92 in void std::__1::vector<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > > >::__push_back_slow_path<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > const&>(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f9608840a4b in std::__1::vector<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >, std::__1::allocator<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > > >::push_back(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1600:9
    #7 0x7f9608840a4b in rxcpp::schedulers::detail::hot_observable<int>::on_subscribe(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:313
    #8 0x7f960882dc71 in _ZNK5rxcpp4test6detail11test_sourceIiE12on_subscribeINS_10subscriberIiNS_8observerIiNS_15static_observerIiZNKS_9operators6detail4takeIiNS_10observableIiS3_EEiE12on_subscribeINS5_IiNS0_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSE_ISH_EEvSK_EUlSt13exception_ptrE_ZNKSE_ISH_EEvSK_EUlvE_EEEEEEEENSt3__19enable_ifIXntsr3std7is_sameISI_NS5_IiNS6_IiNS_16dynamic_observerIiEEEEEEEE5valueEvE4typeESI_ /home/ben/development/yy/pictet/rxcpp/rx-test.hpp:45:9
    #9 0x7f960882da21 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #10 0x7f960882ce29 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:349:13
    #11 0x7f960882cb9d in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::subscribe<rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>(rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}&&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #12 0x7f960882c2e4 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:72:9
    #13 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #14 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #17 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #18 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #19 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #20 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #21 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #22 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #23 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #24 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #25 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #26 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #27 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #28 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #29 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #30 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 56 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883d31a in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883d31a in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_next_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_next_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883d31a in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification>::make_shared<int>(int&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960883d153 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE20on_next_notificationEJiEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960883d153 in std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > rxcpp::notifications::notification<int>::on_next<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:211
    #6 0x7f960883cf19 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:196:62
    #7 0x7f96088379bc in rxcpp::dynamic_observer<int>::specific_observer<rxcpp::observer<int, rxcpp::static_observer<int, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(int)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda(std::exception_ptr)#1}, rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}> > >::on_next(int) const /home/ben/development/yy/pictet/rxcpp/rx-observer.hpp:161:13
    #8 0x7f960882f750 in void rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #9 0x7f960882f5d8 in void rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::on_next<int&>(int&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #10 0x7f960882f427 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:82:25
    #11 0x7f960883dc90 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #12 0x7f960883db18 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::on_next<int const&>(int const&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #13 0x7f9608844944 in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:305:29
    #14 0x7f960884474e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #17 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #18 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #19 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #20 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #21 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #22 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #23 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 56 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883d31a in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883d31a in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_next_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_next_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883d31a in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_next_notification>::make_shared<int>(int&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f960883d153 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE20on_next_notificationEJiEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f960883d153 in std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > rxcpp::notifications::notification<int>::on_next<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:211
    #6 0x7f9608825c41 in rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > rxcpp::schedulers::test::messages<int>::next<int>(long, int) /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:381:41
    #7 0x7f9608821d54 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:15:7
    #8 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f96088380f7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f96088380f7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_completed_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_completed_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f96088380f7 in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608837e84 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE25on_completed_notificationEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608837e84 in rxcpp::notifications::notification<int>::on_completed() /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:215
    #6 0x7f9608837c40 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:208:62
    #7 0x7f960882f147 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::on_completed() const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:182:9
    #8 0x7f960882f44a in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:85:25
    #9 0x7f960883dc90 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #10 0x7f960883db18 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::on_next<int const&>(int const&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #11 0x7f9608844944 in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:305:29
    #12 0x7f960884474e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #13 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #14 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #15 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #16 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #17 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #18 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #19 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #20 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #21 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f96088380f7 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f96088380f7 in std::__1::allocator<std::__1::__shared_ptr_emplace<rxcpp::notifications::notification<int>::on_completed_notification, std::__1::allocator<rxcpp::notifications::notification<int>::on_completed_notification> > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f96088380f7 in std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification> std::__1::shared_ptr<rxcpp::notifications::notification<int>::on_completed_notification>::make_shared<>() /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4279
    #4 0x7f9608837e84 in _ZNSt3__111make_sharedIN5rxcpp13notifications12notificationIiE25on_completed_notificationEJEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS_10shared_ptrIS7_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #5 0x7f9608837e84 in rxcpp::notifications::notification<int>::on_completed() /home/ben/development/yy/pictet/rxcpp/rx-notification.hpp:215
    #6 0x7f9608825d29 in rxcpp::schedulers::test::messages<int>::completed(long) /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:385:41
    #7 0x7f9608821d65 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:16:7
    #8 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883f56f in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883f56f in std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883f56f in std::__1::allocator_traits<std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::allocate(std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f960883f56f in std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:936
    #5 0x7f960883f4da in std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::vector(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > const&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1214:9
    #6 0x7f96088436ce in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:294:11
    #7 0x7f9608840385 in std::__1::__libcpp_compressed_pair_imp<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >, rxcpp::schedulers::detail::hot_observable<int>, 1u>::__libcpp_compressed_pair_imp<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&, std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&, 0ul, 0ul, 1ul, 2ul>(std::__1::piecewise_construct_t, std::__1::tuple<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&>, std::__1::tuple<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&>, std::__1::__tuple_indices<0ul>, std::__1::__tuple_indices<0ul, 1ul, 2ul>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2100:15
    #8 0x7f9608840385 in std::__1::__compressed_pair<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >, rxcpp::schedulers::detail::hot_observable<int> >::__compressed_pair<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&, std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&>(std::__1::piecewise_construct_t, std::__1::tuple<std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >&>, std::__1::tuple<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&>) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:2366
    #9 0x7f9608840385 in std::__1::__shared_ptr_emplace<rxcpp::schedulers::detail::hot_observable<int>, std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> > >::__shared_ptr_emplace<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > >(std::__1::allocator<rxcpp::schedulers::detail::hot_observable<int> >, std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:3704
    #10 0x7f9608840385 in std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> > std::__1::shared_ptr<rxcpp::schedulers::detail::hot_observable<int> >::make_shared<std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > > >(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>&, rxcpp::schedulers::worker&&, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4280
    #11 0x7f960883fadb in _ZNSt3__111make_sharedIN5rxcpp10schedulers6detail14hot_observableIiEEJRNS_10shared_ptrINS3_9test_type15test_type_stateEEENS2_6workerENS_6vectorINS1_13notifications8recordedINS6_INSD_6detail17notification_baseIiEEEEEENS_9allocatorISJ_EEEEEEENS_9enable_ifIXntsr8is_arrayIT_EE5valueENS6_ISO_EEE4typeEDpOT0_ /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:4644:12
    #12 0x7f960883fadb in rxcpp::test::testable_observable<int> rxcpp::schedulers::detail::test_type::make_hot_observable<int>(std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:336
    #13 0x7f9608825b2e in _ZNK5rxcpp10schedulers4test19make_hot_observableINS_13notifications8recordedINSt3__110shared_ptrINS3_6detail17notification_baseIiEEEEEEEEDTclptptdtdefpT6tester19make_hot_observablecvNS5_6vectorIT_NS5_9allocatorISD_EEEE_EEESt16initializer_listISD_E /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:551:21
    #14 0x7f9608821d8c in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:14:14
    #15 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 48 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f960883a51a in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f960883a51a in std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f960883a51a in std::__1::allocator_traits<std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::allocate(std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f960883a51a in std::__1::__split_buffer<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f960883a3b2 in void std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::__push_back_slow_path<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > >(rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f9608837cfd in std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >::push_back(rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1620:9
    #7 0x7f9608837cfd in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::detail::test_type::test_type_worker::make_subscriber<int>() const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:207
    #8 0x7f960882f147 in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >::on_completed() const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:182:9
    #9 0x7f960882f44a in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}::operator()(int) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:85:25
    #10 0x7f960883dc90 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::nextdetacher::operator()<int>(int) /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:47:13
    #11 0x7f960883db18 in void rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >::on_next<int const&>(int const&) const /home/ben/development/yy/pictet/rxcpp/rx-subscriber.hpp:168:9
    #12 0x7f9608844944 in rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:305:29
    #13 0x7f960884474e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::detail::hot_observable<int>::hot_observable(std::__1::shared_ptr<rxcpp::schedulers::detail::test_type::test_type_state>, rxcpp::schedulers::worker, std::__1::vector<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > >, std::__1::allocator<rxcpp::notifications::recorded<std::__1::shared_ptr<rxcpp::notifications::detail::notification_base<int> > > > >)::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #14 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #15 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #16 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #17 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #18 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #19 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #20 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #21 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #22 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

Indirect leak of 16 byte(s) in 1 object(s) allocated from:
    #0 0x7f9608820e32 in operator new(unsigned long) /home/ben/development/llvm/3.6.0/llvm/projects/compiler-rt/lib/asan/asan_new_delete.cc:62:35
    #1 0x7f9608842675 in std::__1::__allocate(unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/new:156:10
    #2 0x7f9608842675 in std::__1::allocator<rxcpp::notifications::subscription>::allocate(unsigned long, void const*) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1634
    #3 0x7f9608842675 in std::__1::allocator_traits<std::__1::allocator<rxcpp::notifications::subscription> >::allocate(std::__1::allocator<rxcpp::notifications::subscription>&, unsigned long) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/memory:1439
    #4 0x7f9608842675 in std::__1::__split_buffer<rxcpp::notifications::subscription, std::__1::allocator<rxcpp::notifications::subscription>&>::__split_buffer(unsigned long, unsigned long, std::__1::allocator<rxcpp::notifications::subscription>&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/__split_buffer:325
    #5 0x7f9608842500 in void std::__1::vector<rxcpp::notifications::subscription, std::__1::allocator<rxcpp::notifications::subscription> >::__push_back_slow_path<rxcpp::notifications::subscription>(rxcpp::notifications::subscription&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1579:49
    #6 0x7f9608840b39 in std::__1::vector<rxcpp::notifications::subscription, std::__1::allocator<rxcpp::notifications::subscription> >::push_back(rxcpp::notifications::subscription&&) /home/ben/development/llvm/3.6.0/install/release/bin/../include/c++/v1/vector:1620:9
    #7 0x7f9608840b39 in rxcpp::schedulers::detail::hot_observable<int>::on_subscribe(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int> > >) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:314
    #8 0x7f960882dc71 in _ZNK5rxcpp4test6detail11test_sourceIiE12on_subscribeINS_10subscriberIiNS_8observerIiNS_15static_observerIiZNKS_9operators6detail4takeIiNS_10observableIiS3_EEiE12on_subscribeINS5_IiNS0_17testable_observerIiEEEEEEvRKT_EUliE_ZNKSE_ISH_EEvSK_EUlSt13exception_ptrE_ZNKSE_ISH_EEvSK_EUlvE_EEEEEEEENSt3__19enable_ifIXntsr3std7is_sameISI_NS5_IiNS6_IiNS_16dynamic_observerIiEEEEEEEE5valueEvE4typeESI_ /home/ben/development/yy/pictet/rxcpp/rx-test.hpp:45:9
    #9 0x7f960882da21 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #10 0x7f960882ce29 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > > >(rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::static_observer<int, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}> > >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:349:13
    #11 0x7f960882cb9d in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::test::detail::test_source<int> >::subscribe<rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}>(rxcpp::composite_subscription&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(int)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda(std::exception_ptr)#1}&&, void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const::{lambda()#1}&&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #12 0x7f960882c2e4 in void rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int>::on_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > const&) const /home/ben/development/yy/pictet/rxcpp/operators/rx-take.hpp:72:9
    #13 0x7f960882bead in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda()#1}::operator()() const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:329:17
    #14 0x7f96088311be in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #15 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #16 0x7f9608832584 in rxcpp::schedulers::current_thread::current_worker::schedule(std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-currentthread.hpp:200:17
    #17 0x7f960882bcf8 in _ZNK5rxcpp10schedulers6worker8scheduleIZNKS_10observableIiNS_9operators6detail4takeIiNS3_IiNS_4test6detail11test_sourceIiEEEEiEEE16detail_subscribeINS_10subscriberIiNS7_17testable_observerIiEEEEEENS_22composite_subscriptionET_EUlRKNS0_11schedulableEE_JEEENSt3__19enable_ifIXaaoosr6detail18is_action_functionISK_EE5valuesr15is_subscriptionISK_EE5valuentsr14is_schedulableISK_EE5valueEvE4typeEOSK_DpOT0_ /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:743:5
    #18 0x7f960882b6bd in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::detail_subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > >(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:354:5
    #19 0x7f960882b4a4 in rxcpp::composite_subscription rxcpp::observable<int, rxcpp::operators::detail::take<int, rxcpp::observable<int, rxcpp::test::detail::test_source<int> >, int> >::subscribe<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> >&) const /home/ben/development/yy/pictet/rxcpp/rx-observable.hpp:472:16
    #20 0x7f960882426c in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:462:17
    #21 0x7f960882411b in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}>(rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const::{lambda(rxcpp::schedulers::schedulable const&)#2}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #22 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #23 0x7f960884a0d2 in rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}::operator()(rxcpp::schedulers::schedulable const&) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:216:21
    #24 0x7f9608849f6e in rxcpp::schedulers::action rxcpp::schedulers::make_action<rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}>(rxcpp::schedulers::virtual_time<long, long>::schedule_absolute(long, rxcpp::schedulers::schedulable const&) const::{lambda(rxcpp::schedulers::schedulable const&)#1}&&)::{lambda(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const&)#1}::operator()(rxcpp::schedulers::schedulable const&, rxcpp::schedulers::recurse const) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:663:17
    #25 0x7f96088269a9 in rxcpp::schedulers::schedulable::operator()(rxcpp::schedulers::recurse const&) const /home/ben/development/yy/pictet/rxcpp/rx-scheduler.hpp:603:9
    #26 0x7f96088266c9 in rxcpp::schedulers::detail::virtual_time_base<long, long>::start() const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-virtualtime.hpp:83:21
    #27 0x7f96088223ee in rxcpp::subscriber<int, rxcpp::test::testable_observer<int> > rxcpp::schedulers::test::test_worker::start<int, main::$_0>(main::$_0, long, long, long) const /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:468:13
    #28 0x7f9608822098 in _ZNK5rxcpp10schedulers4test11test_worker5startIZ4mainE3$_0EENSt3__19enable_ifIXsr6detail25is_create_source_functionIT_EE5valueENS2_12start_traitsIS7_EEE4type15subscriber_typeES7_ /home/ben/development/yy/pictet/rxcpp/schedulers/rx-test.hpp:513:20
    #29 0x7f9608821de4 in main /home/ben/development/yy/pictet/Test/TestRx/test_rx_leak.cpp:19:15
    #30 0x7f96070fdec4 in __libc_start_main /build/buildd/eglibc-2.19/csu/libc-start.c:287

SUMMARY: AddressSanitizer: 2880 byte(s) leaked in 22 allocation(s).

compilation error when rxcpp/rx.hpp is included after boost/asio.hpp on linux

simple program below causes compile error on linux:

#include <boost/asio.hpp>
#include <rxcpp/rx.hpp>

int main(int ,char**){
return 0;
}

The problem is that asio on Linux includes termios.h, which in turn includes bits/termios.h.

bits/termios.h defines B0 as a macro, which conflicts with the name B0 used by the all_true struct in rx-util.hpp.

A solution would be to rename B0 in rx-util.hpp.
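A minimal sketch of the clash and the proposed rename, assuming B0 appears as a template parameter name in all_true (the issue does not show the original declaration). The macro is defined by hand so the sketch does not depend on the platform header, and all_true_sketch, Head and Tail are hypothetical names:

```cpp
// Stand-in for what <termios.h> does on Linux: B0 is a preprocessor macro.
// (Defined by hand here so the sketch builds on any platform.)
#ifndef B0
#define B0 0000000
#endif

// With the macro in scope, a declaration such as `template<bool B0, bool... BN>`
// would have its parameter name replaced by the macro body and fail to parse.
// Renaming the parameters (Head/Tail are hypothetical names) avoids the clash.
template<bool... Values>
struct all_true_sketch;

template<bool Head>
struct all_true_sketch<Head> {
    static const bool value = Head;
};

template<bool Head, bool... Tail>
struct all_true_sketch<Head, Tail...> {
    static const bool value = Head && all_true_sketch<Tail...>::value;
};
```

Any name that is not a macro in the system headers would do; short upper-case names followed by a digit are the ones most likely to collide.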

operator `distinct`

It looks like only the distinct_until_changed operator is implemented, and there is no distinct. Is there a reason for that?

I can implement it (e.g. using unordered_set) and submit a PR if you wish.
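To make the unordered_set idea concrete, here is a rough sketch over a plain vector rather than an observable. distinct_sketch is a hypothetical helper, not part of rxcpp; a real operator would keep the set per subscription and forward values as they arrive:

```cpp
#include <unordered_set>
#include <vector>

// Hypothetical helper: keeps only the first occurrence of each value.
// Requires std::hash<T> and operator== for T.
template<class T>
std::vector<T> distinct_sketch(const std::vector<T>& input) {
    std::unordered_set<T> seen;
    std::vector<T> output;
    for (const T& value : input) {
        // insert() returns a pair; .second is true only when the value is new
        if (seen.insert(value).second) {
            output.push_back(value);
        }
    }
    return output;
}
```

Unlike distinct_until_changed, this has to remember every value seen so far, so memory grows with the number of unique values.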

Reducing of empty observable

When the Reduce, Average, First, or Last operator is applied to an empty sequence, Rx.NET and RxJava signal OnError().

RxCpp behaves differently:

  • reduce returns result_selector(seed)
  • average returns 0 (T(), to be precise)
  • first calls abort()
  • last calls abort()

I propose calling OnError() for all of these operators. @kirkshoop, please let me know your opinion.
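A small sketch of the proposed behaviour for last, written against a plain vector and two callbacks instead of a real subscriber. last_or_error and the exception message are hypothetical; the point is only that the empty case is reported through the error callback instead of calling abort():

```cpp
#include <exception>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for a `last` operator over a finished sequence.
template<class T, class OnNext, class OnError>
void last_or_error(const std::vector<T>& values, OnNext on_next, OnError on_error) {
    if (values.empty()) {
        // Empty input: signal an error to the observer rather than aborting
        on_error(std::make_exception_ptr(
            std::runtime_error("last() on an empty sequence")));
        return;
    }
    on_next(values.back());
}
```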

Create observable with disposable

In some implementations of Rx, when an observable is created manually, the API optionally lets you provide a cleanup function that runs on disposal.

Here is an example from RxJS:

var o = Rx.Observable.create(function (observer) {
    observer.onNext(42);
    observer.onCompleted();
    return function () {
        console.log('disposed');
    };
});

I expected something along these lines, but it doesn't seem to be supported.

auto o = rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s) {
  s.on_next(42);
  s.on_completed();
  return []() { std::cout << "manual disposed" << std::endl; };
});

How can we achieve the same thing with RxCpp?

Compiling for iOS

Clang gives this error when compiling for iOS (arm64):

In file included from ../deps/rxcpp/Rx/v2/src/rxcpp/rx.hpp:8:
In file included from ../deps/rxcpp/Rx/v2/src/rxcpp/rx-includes.hpp:124:
In file included from ../deps/rxcpp/Rx/v2/src/rxcpp/rx-scheduler.hpp:861:
../deps/rxcpp/Rx/v2/src/rxcpp/schedulers/rx-currentthread.hpp:35:16: error: thread-local storage is unsupported for the current target
        static RXCPP_THREAD_LOCAL current_thread_queue_type* queue;

Version info (Xcode 5.1.1):

Apple LLVM version 5.1 (clang-503.0.40) (based on LLVM 3.4svn)
Target: x86_64-apple-darwin13.2.0
Thread model: posix

I get this for both device and simulator builds. Apologies if Rx isn't supported on iOS yet. I'm curious to check it out!

Observer / observable as member of a class

Hello,

I created an observer by supplying onNext and onError functions.
I then obtained another observer from a subscriber (by creating an observable of the same data type).
These observers, obtained from two different sources but handling the same data type, seem to have different signatures.

How can I align the signatures of these two observers so that either one can be copied into a class member? (No "auto" allowed here.)

EDIT:
I have a similar problem with the observable.
I create it from a lambda and need to store a copy of it in a class member.
Here is the error reported by VC++:

Error C2440: 'return' : cannot convert from 'rxcpp::observable<T,rxcpp::sources::detail::create<T,Scene::onMessage::<lambda_4e1311247554cfa7219cec4f9e2964dd>>>' to 'rxcpp::observable<int,rxcpp::dynamic_observable<T>>' ...

The signature I use is rx::observable<int>
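The underlying C++ issue can be shown without rxcpp: every lambda has its own unique type, so any type built from a lambda differs too, and a class member needs a type-erased wrapper to hold them. The sketch below uses std::function as the eraser; handler_holder is a hypothetical class used only for illustration:

```cpp
#include <functional>
#include <utility>

// Hypothetical class that must name its member type without `auto`.
class handler_holder {
public:
    // Any callable taking an int converts to std::function<void(int)>,
    // regardless of the lambda's own unique type.
    void set(std::function<void(int)> f) { handler_ = std::move(f); }

    // Invokes the stored handler if one has been set.
    void call(int v) const {
        if (handler_) handler_(v);
    }

private:
    std::function<void(int)> handler_;
};
```

The error message above suggests rxcpp has an equivalent erased form in dynamic_observable. As far as I know, calling as_dynamic() on the observable before returning it performs that conversion, but treat that as an assumption to check against the headers.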

Memory Leak related to concurrency

There is a fairly easy way to leak memory in rxcpp. It only occurs once you introduce concurrency.

Here are a few contrived examples which create a leak:

while (true)
{
    rxcpp::sources::interval(std::chrono::milliseconds(100))
        .subscribe_on(rx::observe_on_new_thread())
        .take_until(rx::sources::timer(std::chrono::milliseconds(1)))
        .as_blocking()
        .subscribe();
}

this will do it too:

while (true)
{
    rxcpp::sources::range(1, 1000000000)
        .subscribe_on(rx::observe_on_new_thread())
        .filter([](int x) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return true; })
        .take_until(rx::sources::timer(std::chrono::milliseconds(1)))
        .as_blocking()
        .subscribe();
}

and this one:

while (true)
{
    auto pipeline =
        rxcpp::sources::range(1, 1000000000)
        .filter([](int x) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); return true; })
        ;

    auto subscription = pipeline.subscribe_on(rx::observe_on_new_thread()).publish().connect();

    std::this_thread::sleep_for(std::chrono::milliseconds(1));
    subscription.unsubscribe();
}

In my real world use case, I have my own source which produces multimedia samples and is implemented using the rxcpp scheduling abstractions (i.e. not just a loop, refer to issue #129). The examples above appear to be leaking an individual value in the pipeline (educated guess). In my case, my individual value is a multimedia sample and is much bigger than an int! So the memory leak is very noticeable.

Hopefully I've provided enough examples where someone much smarter than myself can chase down the cause.

Thanks,

Jeff

building rxcpp on windows

Both the IDE build and the makefile build end in errors:

Error [some error number] error C3203: 'value_type_t' : unspecialized alias template can't be used as a template argument for template parameter 'T', expected a real type [some files]

It seems the VC++ compiler does not fully support these C++11 features?
Is there any solution for that?

Implementation of catch operators

Are there any plans for implementing the catch operators? I wanted to use onErrorResumeNext, but I realized it's not implemented yet for RxCpp. I'm using RxCpp to write a web api client for my application. Basically, I want to be able to do something like this:

apiClient.fetchSomeResource(token)
   .on_error_resume_next([apiClient](std::exception_ptr e) {
      try {
         std::rethrow_exception(e);
      } catch (const UnauthorizedHttpException &e) {

         // When server returns 401, call authenticate() to get
         // a new token, and then recall the same api
         return apiClient.authenticate()
            .flat_map([apiClient](const AuthToken &token) {
               return apiClient.fetchSomeResource(token);
            }, [](const AuthToken &token, Resource resource) {
               return resource;
            });

      } catch (const std::exception &e) {
         throw;  // rethrow the exception currently being handled
      }
   })
   .subscribe([](const Resource &resource) {
      //...
   });

Not all source code available in VS2012 solution?

Maybe I missed something, but AFAICS the header files are not included with the VS solution/project generated by CMake-3.2.2, are they? I can only access these via external dependencies.
I do not see any files from the Rx/v2/src/rxcpp directory included directly. Is this on purpose, and can we add them to the solution?


Can't create observable

Hello,

I can't create an observable like this "cep" example :
https://github.com/Reactive-Extensions/RxCpp/blob/bd97cedb5a11c0666c14efa9d5abb9769480a884/Rx/v2/examples/cep/main.cpp#L19

IntelliSense: no instance of function template "rxcpp::observable<void, void>::create" matches the argument list
    argument types are: (lambda []void (rxcpp::subscriber<int, rxcpp::observer<int, rxcpp::dynamic_observer<int>>> dest)->void) ...  

Is the example broken or did I do something wrong?

linq_selectmany.hpp:34:26: error: use of undeclared identifier 'from'

Hi,
I am a newbie and I have something like:

#include <vector>

#include <cpplinq/linq.hpp>

void ReadEventData(std::vector<std::vector<Particle>>& v);

std::vector<std::vector<Particle>> ED;

void ROOTLinq() {
   ReadEventData(ED);

   using namespace cpplinq;

   auto n = from(ED).select_many([](std::vector<Particle> v) {return 1;}).
      where([](const Particle&p){return p.fCharge > 0;}).count();
...

I think select_many doesn't have the definition of the template from. I compile with:
clang -v
Apple LLVM version 5.1 (clang-503.0.40) (based on LLVM 3.4svn)
Target: x86_64-apple-darwin13.3.0
Thread model: posix
Cheers,
Vassil

How to avoid race conditions with is_subscribed

Hi!

I feel somewhat uncomfortable with is_subscribed. There is a sequence point after calling is_subscribed. So another thread might unsubscribe the subscriber and then s.on_next() gets called on an unsubscribed object? Is this safe? And if yes, why do we call is_subscribed anyway?

auto published_observable =
    rxcpp::sources::create<int>([](const rxcpp::subscriber<int>& s)
    {
        for (int i = 0; i < 1000; i++)
        {
            if (!s.is_subscribed())
                return;
            s.on_next(i);
        }
    })
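One way to frame the question is with a toy model in which on_next re-checks the subscription state itself. Under that assumption the explicit is_subscribed() test in the loop is only an early exit, and a value sent after unsubscribe is dropped. toy_subscriber is hypothetical and says nothing about what rxcpp actually guarantees:

```cpp
#include <atomic>
#include <vector>

// Toy stand-in for a subscriber; not rxcpp's actual implementation.
class toy_subscriber {
public:
    bool is_subscribed() const { return subscribed_.load(); }
    void unsubscribe() { subscribed_.store(false); }

    // on_next re-checks the flag, so a value that arrives after
    // unsubscribe() is dropped rather than delivered.
    // Note: received_ is not protected, so this toy is single-producer only.
    void on_next(int v) {
        if (!is_subscribed()) return;
        received_.push_back(v);
    }

    const std::vector<int>& received() const { return received_; }

private:
    std::atomic<bool> subscribed_{true};
    std::vector<int> received_;
};
```

Even in this model there is still a window between the flag check and the delivery, so an unsubscribe from another thread can race with one in-flight value. The check narrows the window; it does not close it.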

Consider renaming of root namespace (rxcpp)

The cpp in the rxcpp namespace seems superfluous. I therefore suggest that a switch of the base namespace should be considered.

Rx.Net uses the namespace reactive and RxJava uses the namespace rx. So the two natural suggestions would be "reactive" and "rx".

I would prefer "rx" since it reads much better (in the test code @kirkshoop has aliased the rxcpp namespace to rx). I know, however, that @kirkshoop considers it too short and therefore likely to clash with the names of classes or namespaces in existing codebases. I would argue that an existing codebase could just as well have a class called reactive as a class called rx (I don't have any data whatsoever to back that up, though :). It seems less likely that a user has a class called rxcpp, so I guess this comes down to a tradeoff between "grade of uniqueness" and "nice and readable".

compile error when scan is followed by other operator.

It seems the const qualifier is missing from the scan::on_subscribe function.

This is my test code:

#include <iostream>
#include <chrono>
#include "rxcpp/rx.hpp"

namespace rx = rxcpp;
namespace rxsc = rxcpp::schedulers;

int main()
{
  auto sc = rxsc::make_current_thread();
  auto so = rx::synchronize_in_one_worker(sc);
  auto start = sc.now() + std::chrono::seconds(2);
  auto period = std::chrono::seconds(1);

  rx::observable<>::interval(start, period, so)
    .scan(0, [] (int a, int i) { return a + i; })
    .map([] (int i) { return i * i; })
    .subscribe([] (int i) { std::cout << i << std::endl; });
}

g++ gives these error messages:

.../ext/rxcpp/Rx/v2/src/rxcpp/operators/rx-lift.hpp:65:16: error: no matching member function for call to 'on_subscribe'
        source.on_subscribe(std::move(lifted));
        ~~~~~~~^~~~~~~~~~~~
...
.../ext/rxcpp/Rx/v2/src/rxcpp/operators/rx-scan.hpp:48:10: note: candidate function not viable: 'this' argument has type 'const source_operator_type' (aka 'const rxcpp::operators::detail::scan<long, rxcpp::observable<long, rxcpp::sources::detail::interval<rxcpp::synchronize_in_one_worker> >, <lambda at
      main.cpp:17:14>, int>'), but method is not marked const
    void on_subscribe(Subscriber o) {
         ^
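The diagnostic boils down to a const-correctness rule that can be reproduced in a few lines. In this sketch scan_like and subscribe_through_const are hypothetical names; the caller holds the operator by const reference, so the member function compiles only because it is marked const:

```cpp
// Hypothetical operator type; the names are illustrative, not rxcpp's.
struct scan_like {
    int seed;

    // Without the trailing `const`, calling this through a const object
    // or const reference is rejected: "method is not marked const".
    int on_subscribe(int value) const { return seed + value; }
};

// Mirrors a caller that only holds the operator by const reference.
inline int subscribe_through_const(const scan_like& source, int value) {
    return source.on_subscribe(value);
}
```

Removing the const from on_subscribe makes subscribe_through_const fail to compile with a message of the same shape as the one quoted above.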

unqualified `ptrdiff_t` & `size_t` use

Is this intentional?

As of GCC 4.9 this is no longer supported (and it was never std. compliant) -- see "Header changes": https://gcc.gnu.org/gcc-4.9/porting_to.html
// Edit: this appears to refer to the header inclusion missing entirely.

Currently #include <cstddef> is used in two places:

A fix would be to either:

  • fully qualify each use of std::ptrdiff_t and std::size_t (there aren't that many)
  • replace #include <cstddef> with #include <stddef.h>

I presume the choice may very well be a matter of preference :-)
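For the first option, a tiny sketch of what the qualified form looks like. signed_size is a hypothetical function; the only point is that <cstddef> is guaranteed to declare the names in namespace std, while the unqualified global names are left to the implementation:

```cpp
#include <cstddef>
#include <vector>

// Uses only the std-qualified names, which <cstddef> always provides.
inline std::ptrdiff_t signed_size(const std::vector<int>& v) {
    std::size_t n = v.size();
    return static_cast<std::ptrdiff_t>(n);
}
```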

Backpressure operators

From what I saw, in RxCpp operators like onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest described here are not implemented.
Is there a plan to implement them in the near future?

feedback on coordination

The goal of coordination is to remove all synchronization from the 'join' and time operators. Re-implementing an optimized serialization of calls to multiple subscribers in each operator obscures the functionality, duplicates the same work over and over again and slows down the cases where all the sources are already serialized (UI events etc..).

An operator passes all the sources through the optional coordination and uses the returned observables instead. These are expected to never overlap calls across all the observers subscribed to them.

This results in multiple implementations of coordination that accomplish the goal in different ways with different tradeoffs, but the operators do not care. The default impl of coordination passes the original source through unchanged which results in the default having no synch overhead.

connect/publish threading issue

consider this:

auto published_observable =
    rx::sources::range(1, 100)
    .filter([](int i)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << i << std::endl;
        return true;
    })
    .subscribe_on(rx::observe_on_new_thread())
    .publish();

auto subscription = published_observable.connect();
std::this_thread::sleep_for(std::chrono::seconds(1));
subscription.unsubscribe();
std::cout << "unsubscribed" << std::endl;

I expect this to print 10ish numbers then "unsubscribed". However it prints the entire range up to 100. Am I missing something? I looked around at the source for publish and connect but I couldn't figure out a solution to my problem. I need to be able to unsubscribe the connect()ed subscription.

Thanks for your help,

Jeff

Master branch vs. release/v1.0.2 branch

I am looking to write code that uses this library and am wondering what the difference is between branch master and branch release/v1.0.2 - it seems like master is currently being worked on, as it was updated earlier this week and the other was last updated in February.

However, the master branch doesn't seem to define a method for CreateObservable to create an observable from a function. The other branch does define such a method. Is there a reason for this lack of functionality in the master branch (ie, it's still being worked on and isn't ready for use yet)? Which branch would you recommend using?

Converting observable<T> to observable<list<T>>

What's the equivalent of RxJava's toList in RxCpp?

auto numbers = rxcpp::observable<>::range(1, 100);
numbers
    ./* convert rxcpp::observable<int> to rxcpp::observable<std::list<int>> */
    .subscribe([](std::list<int> numbers)
    {
        /* do stuff */
    });
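Conceptually, toList is a fold that appends every element to one container and emits it at the end. The sketch below shows that fold with std::accumulate over a vector. to_list_sketch is a hypothetical helper, and whether the same shape maps onto an rxcpp reduce-style operator is an assumption I have not checked:

```cpp
#include <list>
#include <numeric>
#include <vector>

// Folds every element into a single std::list, the way a reduce-style
// operator would fold a finite stream into one final value.
inline std::list<int> to_list_sketch(const std::vector<int>& values) {
    return std::accumulate(values.begin(), values.end(), std::list<int>(),
        [](std::list<int> acc, int v) {
            acc.push_back(v);
            return acc;
        });
}
```

Note that the result only exists once the input ends, so this applies to finite sequences.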

New schedulers?

I'm working on an integration of RxCPP and Boost.ASIO, as I think they are a perfect match. A test scheduler using boost::asio::io_service can be found at https://gist.github.com/windoze/a6e684143833d5d65ba6 and a more complete version is coming.
Before submitting a PR, I want to know:

  1. Does RxCPP accept 3rd-party dependencies?
  2. Where to put 3rd party dependent components?

Any suggestions?

Build error on Linux 64-bit gcc

I pasted this code in my terminal while being in the root of RxCpp:

mkdir projects/build
cd projects/build
cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
make
I got the following output / error message: 

~/dvl/cpp/RxCpp/projects/build$ cmake -G"Unix Makefiles" -DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -B. ../CMake
-- The C compiler identification is GNU 4.9.2
-- The CXX compiler identification is GNU 4.9.2
-- Check for working C compiler: /usr/bin/gcc
-- Check for working C compiler: /usr/bin/gcc -- works
-- Detecting C compiler ABI info
-- Detecting C compiler ABI info - done
-- Check for working CXX compiler: /usr/bin/g++
-- Check for working CXX compiler: /usr/bin/g++ -- works
-- Detecting CXX compiler ABI info
-- Detecting CXX compiler ABI info - done
-- Looking for include file pthread.h
-- Looking for include file pthread.h - found
-- Looking for pthread_create
-- Looking for pthread_create - not found
-- Looking for pthread_create in pthreads
-- Looking for pthread_create in pthreads - not found
-- Looking for pthread_create in pthread
-- Looking for pthread_create in pthread - found
-- Found Threads: TRUE
-- CMAKE_CXX_COMPILER_ID: GNU
-- using gnu settings
-- RXCPP_DIR: /home/dhaeb/dvl/cpp/RxCpp
-- Found Doxygen: /usr/bin/doxygen (found version "1.8.6")
-- Configuring done
-- Generating done
-- Build files have been written to: /home/dhaeb/dvl/cpp/RxCpp/projects/build
~/dvl/cpp/RxCpp/projects/build$ make
Scanning dependencies of target one_test
[ 2%] Building CXX object CMakeFiles/one_test.dir/home/dhaeb/dvl/cpp/RxCpp/Rx/v2/test/test.cpp.o
/home/dhaeb/dvl/cpp/RxCpp/Rx/v2/test/test.cpp:11:21: fatal error: catch.hpp: No such file or directory
#include "catch.hpp"
^
compilation terminated.
make[2]: *** [CMakeFiles/one_test.dir/home/dhaeb/dvl/cpp/RxCpp/Rx/v2/test/test.cpp.o] Error 1
make[1]: *** [CMakeFiles/one_test.dir/all] Error 2
make: *** [all] Error 2

Where is the catch.hpp file? Running find . -name "catch.hpp" couldn't find it either...

heap-use-after-free in rx-subject.hpp

When I was testing with Thread Sanitizer, it kept complaining about this piece of code in rx-subject.hpp and it took a while for me to realize the problem here.

    if (b->current_generation != b->state->generation) {
        std::unique_lock<std::mutex> guard(b->state->lock);
        b->current_generation = b->state->generation;
        b->current_completer = b->completer;
    }

    auto current_completer = b->current_completer;

What happens if Thread-A updates b->current_completer just before Thread-B assigns b->current_completer to the local variable? Assuming no one else is holding reference to b->current_completer, the instance will be destroyed by Thread-A and Thread-B may start operating on the destructed heap space. Can you confirm?

The obvious solution is to not depend on generation numbers and always acquire the mutex and assign b->completer to a local variable. I can submit a patch if we agree on the issue and a possible solution.
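The proposed fix can be sketched as follows: always take the lock and copy the shared_ptr into a local, so the local copy keeps the object alive whatever other threads do afterwards. The type and function names here are hypothetical and only loosely follow the snippet above:

```cpp
#include <memory>
#include <mutex>
#include <utility>

// Hypothetical payload type standing in for the real completer.
struct completer_sketch { int id; };

// Hypothetical shared state; field names loosely follow the snippet above.
struct state_sketch {
    std::mutex lock;
    std::shared_ptr<completer_sketch> completer;
};

// Always take the lock and copy the shared_ptr into a local. The copy keeps
// the pointee alive even if another thread replaces `completer` later.
inline std::shared_ptr<completer_sketch> snapshot(state_sketch& s) {
    std::lock_guard<std::mutex> guard(s.lock);
    return s.completer;
}

// Writers replace the pointer under the same lock.
inline void replace(state_sketch& s, std::shared_ptr<completer_sketch> next) {
    std::lock_guard<std::mutex> guard(s.lock);
    s.completer = std::move(next);
}
```

The cost is one mutex acquisition per call, which is the overhead the generation check was presumably trying to avoid.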

lifetime issue

This is similar to #127.

consider this:

auto published_observable =
    rxcpp::sources::create<int>([](const rxcpp::subscriber<int>& s)
    {
        for (int i = 0; i < 1000; i++)
        {
            if (!s.is_subscribed())
                return;
            s.on_next(i);
        }
    })
    .filter([](int i)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << i << std::endl;
        return true;
    })
    .subscribe_on(rxcpp::observe_on_new_thread())
    .publish()
    ;

auto subscription = published_observable.connect();
std::this_thread::sleep_for(std::chrono::seconds(1));
subscription.unsubscribe();
std::cout << "unsubscribed" << std::endl;

I'd expect this to print out 10ish integers followed by "unsubscribed". However all 1000 integers are printed out.

In my specific use case, I am actually implementing rxcpp::sources::source_base and creating an observable from it.

    class av_source
        : public rxcpp::sources::source_base<std::shared_ptr<av_sample>>
    {
    public:
        av_source(ipc::media::clip clip);
        void on_subscribe(rxcpp::subscriber<std::shared_ptr<av_sample>> destination) const;

    private:
        const ipc::media::clip clip_;
    };

    inline rxcpp::observable<std::shared_ptr<av_sample>> demux(clip clip)
    {
        return rxcpp::observable<std::shared_ptr<av_sample>, av_source>(av_source(std::move(clip)));
    }

So really, I need to know how to quit processing inside of my source_base implementation. However the issue also presents itself in rxcpp::sources::create.

Thanks,

Jeff

Make_event_loop::inner is empty

I encounter a bug when I create two workers using this code :
rxcpp::schedulers::make_event_loop().create_worker()

Not every time, but sometimes it crashes because the inner member is empty (in create_worker).
Are you aware of this issue?

I see this occasionally in a standalone app, and most of the time with Unreal Engine 4 (when I call it several times).

Access Violation Exception during parallel concat calls at startup

Workaround:

spin up rxcpp ahead of time using this code.

rxcpp::sources::from(1).subscribe([](int i) {});

Issue:

Consider the following code.

#include "stdafx.h"
#include "rx.hpp"

int _tmain(int argc, _TCHAR* argv[])
{
    std::thread t1([]()
    {
        std::vector<int> ints1 = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
        rxcpp::sources::iterate(ints1)
            .map([](int i) { return rxcpp::sources::range(i, i); })
            .concat()
            .subscribe([](int i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << i << std::endl; });
    });

    std::thread t2([]()
    {
        std::vector<int> ints2 = { 11, 12, 13, 14, 15, 16, 17, 18, 19, 20 };
        rxcpp::sources::iterate(ints2)
            .map([](int i) { return rxcpp::sources::range(i, i); })
            .concat()
            .subscribe([](int i) { std::this_thread::sleep_for(std::chrono::seconds(1)); std::cout << i << std::endl; });
    });

    t1.join();
    t2.join();

    return 0;
}

The following error is thrown a fair amount of the time. It appears to be a library initialization issue, as I've only seen this when the first calls into rxcpp are done using the concat operator in parallel.

Unhandled exception at 0x003356E5 in RxBug.exe: 0xC0000005: Access violation reading location 0x00000000.

RxBug.exe!rxcpp::schedulers::scheduler::create_worker(rxcpp::composite_subscription cs) Line 374 C++
RxBug.exe!rxcpp::identity_one_worker::create_coordinator(rxcpp::composite_subscription cs) Line 159 C++
RxBug.exe!rxcpp::operators::detail::concatrxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker>::on_subscribe<rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > >(rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > scbr) Line 131 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe::__l11::() Line 330 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe::__l15::(const rxcpp::schedulers::schedulable & scbl) Line 346 C++
RxBug.exe!rxcpp::schedulers::make_action::__l11::(const rxcpp::schedulers::schedulable & s, const rxcpp::schedulers::recurse & r) Line 661 C++
RxBug.exe!std::_Callable_obj<void (const rxcpp::schedulers::schedulable &, const rxcpp::schedulers::recurse &),0>::_ApplyX<void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &>(const rxcpp::schedulers::schedulable & <_Args_0>, const rxcpp::schedulers::recurse & <_Args_1>) Line 284 C++
RxBug.exe!std::_Func_impl<std::_Callable_obj<void (const rxcpp::schedulers::schedulable &, const rxcpp::schedulers::recurse &),0>,std::allocator<std::_Func_class<void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &> >,void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &>::_Do_call(const rxcpp::schedulers::schedulable & <_Args_0>, const rxcpp::schedulers::recurse & <_Args_1>) Line 229 C++
RxBug.exe!std::_Func_class<void,rxcpp::schedulers::schedulable const &,rxcpp::schedulers::recurse const &>::operator()(const rxcpp::schedulers::schedulable & <_Args_0>, const rxcpp::schedulers::recurse & <_Args_1>) Line 315 C++
RxBug.exe!rxcpp::schedulers::detail::action_type::operator()(const rxcpp::schedulers::schedulable & s, const rxcpp::schedulers::recurse & r) Line 631 C++
RxBug.exe!rxcpp::schedulers::action::operator()(const rxcpp::schedulers::schedulable & s, const rxcpp::schedulers::recurse & r) Line 638 C++
RxBug.exe!rxcpp::schedulers::schedulable::operator()(const rxcpp::schedulers::recurse & r) Line 597 C++
RxBug.exe!rxcpp::schedulers::current_thread::current_worker::schedule(std::chrono::time_pointstd::chrono::system_clock,std::chrono::duration<__int64,std::ratio<1,10000000 > > when, const rxcpp::schedulers::schedulable & scbl) Line 202 C++
RxBug.exe!rxcpp::schedulers::current_thread::current_worker::schedule(const rxcpp::schedulers::schedulable & scbl) Line 173 C++
RxBug.exe!rxcpp::schedulers::worker::schedule<void (const rxcpp::schedulers::schedulable &) >(rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe::__l15::void (const rxcpp::schedulers::schedulable &) && a0) Line 740 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::detail_subscribe<rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > >(rxcpp::subscriber<int,rxcpp::observer<int,rxcpp::static_observer<int,void (int),rxcpp::detail::OnErrorEmpty,rxcpp::detail::OnCompletedEmpty> > > o) Line 346 C++
RxBug.exe!rxcpp::observable<int,rxcpp::operators::detail::concat<rxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker> >,rxcpp::observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker >,rxcpp::dynamic_observablerxcpp::observable<int,rxcpp::sources::detail::range<int,rxcpp::identity_one_worker > > >,rxcpp::identity_one_worker> >::subscribe<void (int) >(wmain::__l8::void (void)::__l12::void (int) && <an_0>) Line 448 C++
RxBug.exe!wmain::__l8::() Line 21 C++
RxBug.exe!std::_Bind<0,void,void (void) >::_Do_call<>(std::tuple<> _Myfargs, std::_Arg_idx<> __formal) Line 1150 C++
RxBug.exe!std::_Bind<0,void,void (void) >::operator()<>() Line 1138 C++
RxBug.exe!std::_LaunchPad<std::_Bind<0,void,void (void) > >::_Run(std::_LaunchPad<std::_Bind<0,void,void (void) > > * _Ln) Line 196 C++
RxBug.exe!std::_LaunchPad<std::_Bind<0,void,void (void) > >::_Go() Line 187 C++
msvcp120d.dll!_Call_func(void * _Data) Line 28 C++
msvcr120d.dll!_callthreadstartex() Line 376 C
msvcr120d.dll!_threadstartex(void * ptd) Line 359 C
kernel32.dll!@BaseThreadInitThunk@12() Unknown
ntdll.dll!___RtlUserThreadStart@8() Unknown
ntdll.dll!__RtlUserThreadStart@8() Unknown
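
If the cause really is racy first-use initialization of shared state (this is only a guess based on the trace above, not something confirmed against the rxcpp source), the general pattern and its usual guard can be sketched without rxcpp. Everything below is hypothetical: `fake_scheduler`, `shared_scheduler` and `run_parallel_first_use` are illustration names, not rxcpp APIs. The sketch uses `std::call_once` with namespace-scope state so that it does not depend on thread-safe function-local statics.

```cpp
#include <atomic>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a lazily created, process-wide scheduler.
struct fake_scheduler {
    int create_worker() const { return 42; }
};

// Namespace-scope state: set up before main() runs, so the guard itself
// is not subject to a first-use race between threads.
static std::once_flag g_scheduler_once;
static std::unique_ptr<fake_scheduler> g_scheduler;

// Guarded lazy initialization: std::call_once lets exactly one thread
// construct the instance; other callers wait until it is ready.
const fake_scheduler& shared_scheduler() {
    std::call_once(g_scheduler_once, [] {
        g_scheduler.reset(new fake_scheduler());
    });
    return *g_scheduler;
}

// Starts `thread_count` threads whose first action is to use the shared
// scheduler, and returns how many of them obtained a valid worker.
int run_parallel_first_use(int thread_count) {
    std::atomic<int> ok(0);
    std::vector<std::thread> threads;
    for (int i = 0; i < thread_count; ++i) {
        threads.emplace_back([&ok] {
            if (shared_scheduler().create_worker() == 42) {
                ++ok;
            }
        });
    }
    for (auto& t : threads) {
        t.join();
    }
    return ok.load();
}
```

A workaround in the same spirit for the repro, which I have not verified, would be to run one small rxcpp subscription on the main thread before starting the worker threads, so that any lazy setup has finished before the parallel calls begin.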
