summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/rx-subscriber.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-subscriber.hpp')
-rw-r--r--Rx/v2/src/rxcpp/rx-subscriber.hpp835
1 files changed, 0 insertions, 835 deletions
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp
deleted file mode 100644
index 3d6c515..0000000
--- a/Rx/v2/src/rxcpp/rx-subscriber.hpp
+++ /dev/null
@@ -1,835 +0,0 @@
-// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
-
-#pragma once
-
-#if !defined(RXCPP_RX_SUBSCRIBER_HPP)
-#define RXCPP_RX_SUBSCRIBER_HPP
-
-#include "rx-includes.hpp"
-
-namespace rxcpp {
-
-template<class T>
-struct subscriber_base : public observer_base<T>, public subscription_base
-{
- typedef tag_subscriber subscriber_tag;
-};
-
-/*!
- \brief binds an observer that consumes values with a composite_subscription that controls lifetime.
-
- \ingroup group-core
-
-*/
-template<class T, class Observer = observer<T>>
-class subscriber : public subscriber_base<T>
-{
- static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers");
- static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>");
- typedef subscriber<T, Observer> this_type;
- typedef rxu::decay_t<Observer> observer_type;
-
- composite_subscription lifetime;
- observer_type destination;
- trace_id id;
-
- struct nextdetacher
- {
- ~nextdetacher()
- {
- trace_activity().on_next_return(*that);
- if (do_unsubscribe) {
- that->unsubscribe();
- }
- }
- nextdetacher(const this_type* that)
- : that(that)
- , do_unsubscribe(true)
- {
- }
- template<class U>
- void operator()(U u) {
- trace_activity().on_next_enter(*that, u);
- RXCPP_TRY {
- that->destination.on_next(std::move(u));
- do_unsubscribe = false;
- } RXCPP_CATCH(...) {
- auto ex = rxu::current_exception();
- trace_activity().on_error_enter(*that, ex);
- that->destination.on_error(std::move(ex));
- trace_activity().on_error_return(*that);
- }
- }
- const this_type* that;
- volatile bool do_unsubscribe;
- };
-
- struct errordetacher
- {
- ~errordetacher()
- {
- trace_activity().on_error_return(*that);
- that->unsubscribe();
- }
- errordetacher(const this_type* that)
- : that(that)
- {
- }
- inline void operator()(rxu::error_ptr ex) {
- trace_activity().on_error_enter(*that, ex);
- that->destination.on_error(std::move(ex));
- }
- const this_type* that;
- };
-
- struct completeddetacher
- {
- ~completeddetacher()
- {
- trace_activity().on_completed_return(*that);
- that->unsubscribe();
- }
- completeddetacher(const this_type* that)
- : that(that)
- {
- }
- inline void operator()() {
- trace_activity().on_completed_enter(*that);
- that->destination.on_completed();
- }
- const this_type* that;
- };
-
- subscriber();
-public:
- typedef typename composite_subscription::weak_subscription weak_subscription;
-
- subscriber(const this_type& o)
- : lifetime(o.lifetime)
- , destination(o.destination)
- , id(o.id)
- {
- }
- subscriber(this_type&& o)
- : lifetime(std::move(o.lifetime))
- , destination(std::move(o.destination))
- , id(std::move(o.id))
- {
- }
-
- template<class U, class O>
- friend class subscriber;
-
- template<class O>
- subscriber(
- const subscriber<T, O>& o,
- typename std::enable_if<
- !std::is_same<O, observer<T>>::value &&
- std::is_same<Observer, observer<T>>::value, void**>::type = nullptr)
- : lifetime(o.lifetime)
- , destination(o.destination.as_dynamic())
- , id(o.id)
- {
- }
-
- template<class U>
- subscriber(trace_id id, composite_subscription cs, U&& o)
- : lifetime(std::move(cs))
- , destination(std::forward<U>(o))
- , id(std::move(id))
- {
- static_assert(!is_subscriber<U>::value, "cannot nest subscribers");
- static_assert(is_observer<U>::value, "must pass observer to subscriber");
- trace_activity().create_subscriber(*this);
- }
-
- this_type& operator=(this_type o) {
- lifetime = std::move(o.lifetime);
- destination = std::move(o.destination);
- id = std::move(o.id);
- return *this;
- }
-
- const observer_type& get_observer() const {
- return destination;
- }
- observer_type& get_observer() {
- return destination;
- }
- const composite_subscription& get_subscription() const {
- return lifetime;
- }
- composite_subscription& get_subscription() {
- return lifetime;
- }
- trace_id get_id() const {
- return id;
- }
-
- subscriber<T> as_dynamic() const {
- return subscriber<T>(id, lifetime, destination.as_dynamic());
- }
-
- // observer
- //
- template<class V>
- void on_next(V&& v) const {
- if (!is_subscribed()) {
- return;
- }
- nextdetacher protect(this);
- protect(std::forward<V>(v));
- }
- void on_error(rxu::error_ptr e) const {
- if (!is_subscribed()) {
- return;
- }
- errordetacher protect(this);
- protect(std::move(e));
- }
- void on_completed() const {
- if (!is_subscribed()) {
- return;
- }
- completeddetacher protect(this);
- protect();
- }
-
- // composite_subscription
- //
- bool is_subscribed() const {
- return lifetime.is_subscribed();
- }
- weak_subscription add(subscription s) const {
- return lifetime.add(std::move(s));
- }
- template<class F>
- auto add(F f) const
- -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
- return lifetime.add(make_subscription(std::move(f)));
- }
- void remove(weak_subscription w) const {
- return lifetime.remove(std::move(w));
- }
- void clear() const {
- return lifetime.clear();
- }
- void unsubscribe() const {
- return lifetime.unsubscribe();
- }
-
-};
-
-template<class T, class Observer>
-auto make_subscriber(
- subscriber<T, Observer> o)
- -> subscriber<T, Observer> {
- return subscriber<T, Observer>(std::move(o));
-}
-
-// observer
-//
-
-template<class T>
-auto make_subscriber()
- -> typename std::enable_if<
- detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), composite_subscription(),
- observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
-}
-
-template<class T, class I>
-auto make_subscriber(
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
-}
-template<class T, class Observer>
-auto make_subscriber(const Observer& o)
- -> typename std::enable_if<
- is_observer<Observer>::value &&
- !is_subscriber<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
-}
-template<class T, class Observer>
-auto make_subscriber(const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o);
-}
-template<class T, class OnNext>
-auto make_subscriber(const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext>(on));
-}
-template<class T, class OnNext, class OnError>
-auto make_subscriber(const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
-}
-template<class T, class OnNext, class OnCompleted>
-auto make_subscriber(const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
-}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
-}
-
-// explicit lifetime
-//
-
-template<class T>
-auto make_subscriber(const composite_subscription& cs)
- -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
- return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
-}
-
-template<class T, class I>
-auto make_subscriber(const composite_subscription& cs,
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
-}
-template<class T, class I>
-auto make_subscriber(const composite_subscription& cs,
- const subscriber<T, I>& s)
- -> subscriber<T, I> {
- return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer());
-}
-template<class T, class Observer>
-auto make_subscriber(const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- !is_subscriber<Observer>::value &&
- is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
-}
-template<class T, class Observer>
-auto make_subscriber(const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o));
-}
-template<class T, class OnNext>
-auto make_subscriber(const composite_subscription& cs, const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext>(on));
-}
-template<class T, class OnNext, class OnError>
-auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
-}
-template<class T, class OnNext, class OnCompleted>
-auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
-}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
-}
-
-// explicit id
-//
-
-template<class T>
-auto make_subscriber(trace_id id)
- -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
- return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), composite_subscription(),
- observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
-}
-
-template<class T>
-auto make_subscriber(trace_id id, const composite_subscription& cs)
- -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> {
- return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>()));
-}
-
-template<class T, class I>
-auto make_subscriber(trace_id id,
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o);
-}
-template<class T, class I>
-auto make_subscriber(trace_id id, const composite_subscription& cs,
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(std::move(id), cs, o);
-}
-template<class T, class Observer>
-auto make_subscriber(trace_id id, const Observer& o)
- -> typename std::enable_if<
- is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(std::move(id), composite_subscription(), o);
-}
-template<class T, class Observer>
-auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- return subscriber<T, Observer>(std::move(id), cs, o);
-}
-template<class T, class Observer>
-auto make_subscriber(trace_id id, const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o);
-}
-template<class T, class Observer>
-auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- return subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
-}
-template<class T, class OnNext>
-auto make_subscriber(trace_id id, const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext>(on));
-}
-template<class T, class OnNext>
-auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext>(on));
-}
-template<class T, class OnNext, class OnError>
-auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
-}
-template<class T, class OnNext, class OnError>
-auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
-}
-template<class T, class OnNext, class OnCompleted>
-auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
-}
-template<class T, class OnNext, class OnCompleted>
-auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
-}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), composite_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
-}
-template<class T, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
-}
-
-// chain defaults from subscriber
-//
-
-template<class T, class OtherT, class OtherObserver, class I>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr,
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class I>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id,
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
- -> typename std::enable_if<
- is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
- -> typename std::enable_if<
- !is_subscription<Observer>::value &&
- is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext>(on));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext>(on));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), scbr.get_subscription(),
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-
-template<class T, class OtherT, class OtherObserver, class I>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& , const composite_subscription& cs,
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o);
-}
-template<class T, class OtherT, class OtherObserver, class I>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>&, trace_id id, const composite_subscription& cs,
- const observer<T, I>& o)
- -> subscriber<T, observer<T, I>> {
- return subscriber<T, observer<T, I>>(std::move(id), cs, o);
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- is_observer<Observer>::value,
- subscriber<T, Observer>>::type {
- auto r = subscriber<T, Observer>(std::move(id), cs, o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class Observer>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o)
- -> typename std::enable_if<
- !detail::is_on_next_of<T, Observer>::value &&
- !is_subscriber<Observer>::value &&
- !is_subscription<Observer>::value &&
- !is_observer<Observer>::value,
- subscriber<T, observer<T, Observer>>>::type {
- auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o);
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext>(on));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext>(on));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted>
-auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc)
- -> typename std::enable_if<
- detail::is_on_next_of<T, OnNext>::value &&
- detail::is_on_error<OnError>::value &&
- detail::is_on_completed<OnCompleted>::value,
- subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type {
- auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs,
- observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc));
- trace_activity().connect(r, scbr);
- return r;
-}
-
-template<class T, class Observer>
-auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs)
- -> subscriber<T, Observer> {
- auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer());
- trace_activity().connect(r, scbr);
- return r;
-}
-template<class T, class Observer>
-auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs)
- -> subscriber<T, Observer> {
- auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer());
- trace_activity().connect(r, scbr);
- return r;
-}
-
-template<class T, class Observer>
-auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id)
- -> subscriber<T, Observer> {
- auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer());
- trace_activity().connect(r, scbr);
- return r;
-}
-
-}
-
-#endif