diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-subscriber.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/rx-subscriber.hpp | 835 |
1 files changed, 0 insertions, 835 deletions
diff --git a/Rx/v2/src/rxcpp/rx-subscriber.hpp b/Rx/v2/src/rxcpp/rx-subscriber.hpp deleted file mode 100644 index 3d6c515..0000000 --- a/Rx/v2/src/rxcpp/rx-subscriber.hpp +++ /dev/null @@ -1,835 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -#if !defined(RXCPP_RX_SUBSCRIBER_HPP) -#define RXCPP_RX_SUBSCRIBER_HPP - -#include "rx-includes.hpp" - -namespace rxcpp { - -template<class T> -struct subscriber_base : public observer_base<T>, public subscription_base -{ - typedef tag_subscriber subscriber_tag; -}; - -/*! - \brief binds an observer that consumes values with a composite_subscription that controls lifetime. - - \ingroup group-core - -*/ -template<class T, class Observer = observer<T>> -class subscriber : public subscriber_base<T> -{ - static_assert(!is_subscriber<Observer>::value, "not allowed to nest subscribers"); - static_assert(is_observer<Observer>::value, "subscriber must contain an observer<T, ...>"); - typedef subscriber<T, Observer> this_type; - typedef rxu::decay_t<Observer> observer_type; - - composite_subscription lifetime; - observer_type destination; - trace_id id; - - struct nextdetacher - { - ~nextdetacher() - { - trace_activity().on_next_return(*that); - if (do_unsubscribe) { - that->unsubscribe(); - } - } - nextdetacher(const this_type* that) - : that(that) - , do_unsubscribe(true) - { - } - template<class U> - void operator()(U u) { - trace_activity().on_next_enter(*that, u); - RXCPP_TRY { - that->destination.on_next(std::move(u)); - do_unsubscribe = false; - } RXCPP_CATCH(...) { - auto ex = rxu::current_exception(); - trace_activity().on_error_enter(*that, ex); - that->destination.on_error(std::move(ex)); - trace_activity().on_error_return(*that); - } - } - const this_type* that; - volatile bool do_unsubscribe; - }; - - struct errordetacher - { - ~errordetacher() - { - trace_activity().on_error_return(*that); - that->unsubscribe(); - } - errordetacher(const this_type* that) - : that(that) - { - } - inline void operator()(rxu::error_ptr ex) { - trace_activity().on_error_enter(*that, ex); - that->destination.on_error(std::move(ex)); - } - const this_type* that; - }; - - struct completeddetacher - { - ~completeddetacher() - { - trace_activity().on_completed_return(*that); - that->unsubscribe(); - } - completeddetacher(const this_type* that) - : that(that) - { - } - inline void operator()() { - trace_activity().on_completed_enter(*that); - that->destination.on_completed(); - } - const this_type* that; - }; - - subscriber(); -public: - typedef typename composite_subscription::weak_subscription weak_subscription; - - subscriber(const this_type& o) - : lifetime(o.lifetime) - , destination(o.destination) - , id(o.id) - { - } - subscriber(this_type&& o) - : lifetime(std::move(o.lifetime)) - , destination(std::move(o.destination)) - , id(std::move(o.id)) - { - } - - template<class U, class O> - friend class subscriber; - - template<class O> - subscriber( - const subscriber<T, O>& o, - typename std::enable_if< - !std::is_same<O, observer<T>>::value && - std::is_same<Observer, observer<T>>::value, void**>::type = nullptr) - : lifetime(o.lifetime) - , destination(o.destination.as_dynamic()) - , id(o.id) - { - } - - template<class U> - subscriber(trace_id id, composite_subscription cs, U&& o) - : lifetime(std::move(cs)) - , destination(std::forward<U>(o)) - , id(std::move(id)) - { - static_assert(!is_subscriber<U>::value, "cannot nest subscribers"); - static_assert(is_observer<U>::value, "must pass observer to subscriber"); - trace_activity().create_subscriber(*this); - } - - this_type& operator=(this_type o) { - lifetime = std::move(o.lifetime); - destination = std::move(o.destination); - id = std::move(o.id); - return *this; - } - - const observer_type& get_observer() const { - return destination; - } - observer_type& get_observer() { - return destination; - } - const composite_subscription& get_subscription() const { - return lifetime; - } - composite_subscription& get_subscription() { - return lifetime; - } - trace_id get_id() const { - return id; - } - - subscriber<T> as_dynamic() const { - return subscriber<T>(id, lifetime, destination.as_dynamic()); - } - - // observer - // - template<class V> - void on_next(V&& v) const { - if (!is_subscribed()) { - return; - } - nextdetacher protect(this); - protect(std::forward<V>(v)); - } - void on_error(rxu::error_ptr e) const { - if (!is_subscribed()) { - return; - } - errordetacher protect(this); - protect(std::move(e)); - } - void on_completed() const { - if (!is_subscribed()) { - return; - } - completeddetacher protect(this); - protect(); - } - - // composite_subscription - // - bool is_subscribed() const { - return lifetime.is_subscribed(); - } - weak_subscription add(subscription s) const { - return lifetime.add(std::move(s)); - } - template<class F> - auto add(F f) const - -> typename std::enable_if<detail::is_unsubscribe_function<F>::value, weak_subscription>::type { - return lifetime.add(make_subscription(std::move(f))); - } - void remove(weak_subscription w) const { - return lifetime.remove(std::move(w)); - } - void clear() const { - return lifetime.clear(); - } - void unsubscribe() const { - return lifetime.unsubscribe(); - } - -}; - -template<class T, class Observer> -auto make_subscriber( - subscriber<T, Observer> o) - -> subscriber<T, Observer> { - return subscriber<T, Observer>(std::move(o)); -} - -// observer -// - -template<class T> -auto make_subscriber() - -> typename std::enable_if< - detail::is_on_next_of<T, detail::OnNextEmpty<T>>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), composite_subscription(), - observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); -} - -template<class T, class I> -auto make_subscriber( - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); -} -template<class T, class Observer> -auto make_subscriber(const Observer& o) - -> typename std::enable_if< - is_observer<Observer>::value && - !is_subscriber<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), composite_subscription(), o); -} -template<class T, class Observer> -auto make_subscriber(const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), composite_subscription(), o); -} -template<class T, class OnNext> -auto make_subscriber(const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext>(on)); -} -template<class T, class OnNext, class OnError> -auto make_subscriber(const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); -} -template<class T, class OnNext, class OnCompleted> -auto make_subscriber(const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); -} -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); -} - -// explicit lifetime -// - -template<class T> -auto make_subscriber(const composite_subscription& cs) - -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> { - return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); -} - -template<class T, class I> -auto make_subscriber(const composite_subscription& cs, - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); -} -template<class T, class I> -auto make_subscriber(const composite_subscription& cs, - const subscriber<T, I>& s) - -> subscriber<T, I> { - return subscriber<T, I>(trace_id::make_next_id_subscriber(), cs, s.get_observer()); -} -template<class T, class Observer> -auto make_subscriber(const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - !is_subscriber<Observer>::value && - is_observer<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); -} -template<class T, class Observer> -auto make_subscriber(const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, make_observer<T>(o)); -} -template<class T, class OnNext> -auto make_subscriber(const composite_subscription& cs, const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext>(on)); -} -template<class T, class OnNext, class OnError> -auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); -} -template<class T, class OnNext, class OnCompleted> -auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); -} -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); -} - -// explicit id -// - -template<class T> -auto make_subscriber(trace_id id) - -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> { - return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), composite_subscription(), - observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); -} - -template<class T> -auto make_subscriber(trace_id id, const composite_subscription& cs) - -> subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>> { - return subscriber<T, observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>>(detail::OnNextEmpty<T>())); -} - -template<class T, class I> -auto make_subscriber(trace_id id, - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(std::move(id), composite_subscription(), o); -} -template<class T, class I> -auto make_subscriber(trace_id id, const composite_subscription& cs, - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(std::move(id), cs, o); -} -template<class T, class Observer> -auto make_subscriber(trace_id id, const Observer& o) - -> typename std::enable_if< - is_observer<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(std::move(id), composite_subscription(), o); -} -template<class T, class Observer> -auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - is_observer<Observer>::value, - subscriber<T, Observer>>::type { - return subscriber<T, Observer>(std::move(id), cs, o); -} -template<class T, class Observer> -auto make_subscriber(trace_id id, const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(std::move(id), composite_subscription(), o); -} -template<class T, class Observer> -auto make_subscriber(trace_id id, const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - return subscriber<T, observer<T, Observer>>(std::move(id), cs, o); -} -template<class T, class OnNext> -auto make_subscriber(trace_id id, const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext>(on)); -} -template<class T, class OnNext> -auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext>(on)); -} -template<class T, class OnNext, class OnError> -auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); -} -template<class T, class OnNext, class OnError> -auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); -} -template<class T, class OnNext, class OnCompleted> -auto make_subscriber(trace_id id, const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); -} -template<class T, class OnNext, class OnCompleted> -auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); -} -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), composite_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); -} -template<class T, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - return subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); -} - -// chain defaults from subscriber -// - -template<class T, class OtherT, class OtherObserver, class I> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - auto r = subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class I> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - auto r = subscriber<T, observer<T, I>>(std::move(id), scbr.get_subscription(), o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) - -> typename std::enable_if< - is_observer<Observer>::value, - subscriber<T, Observer>>::type { - auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) - -> typename std::enable_if< - !is_subscription<Observer>::value && - is_observer<Observer>::value, - subscriber<T, Observer>>::type { - auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), make_observer<T>(o)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - auto r = subscriber<T, observer<T, Observer>>(std::move(id), scbr.get_subscription(), o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext>(on)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext>(on)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), scbr.get_subscription(), - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); - trace_activity().connect(r, scbr); - return r; -} - -template<class T, class OtherT, class OtherObserver, class I> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& , const composite_subscription& cs, - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(trace_id::make_next_id_subscriber(), cs, o); -} -template<class T, class OtherT, class OtherObserver, class I> -auto make_subscriber(const subscriber<OtherT, OtherObserver>&, trace_id id, const composite_subscription& cs, - const observer<T, I>& o) - -> subscriber<T, observer<T, I>> { - return subscriber<T, observer<T, I>>(std::move(id), cs, o); -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - is_observer<Observer>::value, - subscriber<T, Observer>>::type { - auto r = subscriber<T, Observer>(trace_id::make_next_id_subscriber(), cs, o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - is_observer<Observer>::value, - subscriber<T, Observer>>::type { - auto r = subscriber<T, Observer>(std::move(id), cs, o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - auto r = subscriber<T, observer<T, Observer>>(trace_id::make_next_id_subscriber(), cs, o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class Observer> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const Observer& o) - -> typename std::enable_if< - !detail::is_on_next_of<T, Observer>::value && - !is_subscriber<Observer>::value && - !is_subscription<Observer>::value && - !is_observer<Observer>::value, - subscriber<T, observer<T, Observer>>>::type { - auto r = subscriber<T, observer<T, Observer>>(std::move(id), cs, o); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext>(on)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext>(on)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError>(on, oe)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext, detail::OnErrorEmpty, OnCompleted>(on, detail::OnErrorEmpty(), oc)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(trace_id::make_next_id_subscriber(), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class OtherT, class OtherObserver, class OnNext, class OnError, class OnCompleted> -auto make_subscriber(const subscriber<OtherT, OtherObserver>& scbr, trace_id id, const composite_subscription& cs, const OnNext& on, const OnError& oe, const OnCompleted& oc) - -> typename std::enable_if< - detail::is_on_next_of<T, OnNext>::value && - detail::is_on_error<OnError>::value && - detail::is_on_completed<OnCompleted>::value, - subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>>::type { - auto r = subscriber<T, observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>(std::move(id), cs, - observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(on, oe, oc)); - trace_activity().connect(r, scbr); - return r; -} - -template<class T, class Observer> -auto make_subscriber(const subscriber<T, Observer>& scbr, const composite_subscription& cs) - -> subscriber<T, Observer> { - auto r = subscriber<T, Observer>(scbr.get_id(), cs, scbr.get_observer()); - trace_activity().connect(r, scbr); - return r; -} -template<class T, class Observer> -auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id, const composite_subscription& cs) - -> subscriber<T, Observer> { - auto r = subscriber<T, Observer>(std::move(id), cs, scbr.get_observer()); - trace_activity().connect(r, scbr); - return r; -} - -template<class T, class Observer> -auto make_subscriber(const subscriber<T, Observer>& scbr, trace_id id) - -> subscriber<T, Observer> { - auto r = subscriber<T, Observer>(std::move(id), scbr.get_subscription(), scbr.get_observer()); - trace_activity().connect(r, scbr); - return r; -} - -} - -#endif |