diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/operators/rx-observe_on.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/operators/rx-observe_on.hpp | 335 |
1 files changed, 0 insertions, 335 deletions
diff --git a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp b/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp deleted file mode 100644 index b50b773..0000000 --- a/Rx/v2/src/rxcpp/operators/rx-observe_on.hpp +++ /dev/null @@ -1,335 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -/*! \file rx-observe_on.hpp - - \brief All values are queued and delivered using the scheduler from the supplied coordination. - - \tparam Coordination the type of the scheduler. - - \param cn the scheduler to notify observers on. - - \return The source observable modified so that its observers are notified on the specified scheduler. - - \sample - \snippet observe_on.cpp observe_on sample - \snippet output.txt observe_on sample - - Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results: - \snippet output.txt subscribe_on sample -*/ - -#if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP) -#define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP - -#include "../rx-includes.hpp" - -namespace rxcpp { - -namespace operators { - -namespace detail { - -template<class... AN> -struct observe_on_invalid_arguments {}; - -template<class... AN> -struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> { - using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>; -}; -template<class... AN> -using observe_on_invalid_t = typename observe_on_invalid<AN...>::type; - -template<class T, class Coordination> -struct observe_on -{ - typedef rxu::decay_t<T> source_value_type; - - typedef rxu::decay_t<Coordination> coordination_type; - typedef typename coordination_type::coordinator_type coordinator_type; - - coordination_type coordination; - - observe_on(coordination_type cn) - : coordination(std::move(cn)) - { - } - - template<class Subscriber> - struct observe_on_observer - { - typedef observe_on_observer<Subscriber> this_type; - typedef source_value_type value_type; - typedef rxu::decay_t<Subscriber> dest_type; - typedef observer<value_type, this_type> observer_type; - - typedef rxn::notification<T> notification_type; - typedef typename notification_type::type base_notification_type; - typedef std::deque<base_notification_type> queue_type; - - struct mode - { - enum type { - Invalid = 0, - Processing, - Empty, - Disposed, - Errored - }; - }; - struct observe_on_state : std::enable_shared_from_this<observe_on_state> - { - mutable std::mutex lock; - mutable queue_type fill_queue; - mutable queue_type drain_queue; - composite_subscription lifetime; - mutable typename mode::type current; - coordinator_type coordinator; - dest_type destination; - - observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs) - : lifetime(std::move(cs)) - , current(mode::Empty) - , coordinator(std::move(coor)) - , destination(std::move(d)) - { - } - - void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const { - if (!guard.owns_lock()) { - std::terminate(); - } - if (current == mode::Errored || current == mode::Disposed) {return;} - current = end; - queue_type fill_expired; - swap(fill_expired, fill_queue); - queue_type drain_expired; - swap(drain_expired, drain_queue); - RXCPP_UNWIND_AUTO([&](){guard.lock();}); - guard.unlock(); - lifetime.unsubscribe(); - destination.unsubscribe(); - } - - void ensure_processing(std::unique_lock<std::mutex>& guard) const { - if (!guard.owns_lock()) { - std::terminate(); - } - if (current == mode::Empty) { - current = mode::Processing; - - if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) { - finish(guard, mode::Disposed); - } - - auto keepAlive = this->shared_from_this(); - - auto drain = [keepAlive, this](const rxsc::schedulable& self){ - using std::swap; - RXCPP_TRY { - for (;;) { - if (drain_queue.empty() || !destination.is_subscribed()) { - std::unique_lock<std::mutex> guard(lock); - if (!destination.is_subscribed() || - (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) { - finish(guard, mode::Disposed); - return; - } - if (drain_queue.empty()) { - if (fill_queue.empty()) { - current = mode::Empty; - return; - } - swap(fill_queue, drain_queue); - } - } - auto notification = std::move(drain_queue.front()); - drain_queue.pop_front(); - notification->accept(destination); - std::unique_lock<std::mutex> guard(lock); - self(); - if (lifetime.is_subscribed()) break; - } - } - RXCPP_CATCH(...) { - destination.on_error(rxu::current_exception()); - std::unique_lock<std::mutex> guard(lock); - finish(guard, mode::Errored); - } - }; - - auto selectedDrain = on_exception( - [&](){return coordinator.act(drain);}, - destination); - if (selectedDrain.empty()) { - finish(guard, mode::Errored); - return; - } - - auto processor = coordinator.get_worker(); - - RXCPP_UNWIND_AUTO([&](){guard.lock();}); - guard.unlock(); - - processor.schedule(selectedDrain.get()); - } - } - }; - std::shared_ptr<observe_on_state> state; - - observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs) - : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs))) - { - } - - void on_next(source_value_type v) const { - std::unique_lock<std::mutex> guard(state->lock); - if (state->current == mode::Errored || state->current == mode::Disposed) { return; } - state->fill_queue.push_back(notification_type::on_next(std::move(v))); - state->ensure_processing(guard); - } - void on_error(rxu::error_ptr e) const { - std::unique_lock<std::mutex> guard(state->lock); - if (state->current == mode::Errored || state->current == mode::Disposed) { return; } - state->fill_queue.push_back(notification_type::on_error(e)); - state->ensure_processing(guard); - } - void on_completed() const { - std::unique_lock<std::mutex> guard(state->lock); - if (state->current == mode::Errored || state->current == mode::Disposed) { return; } - state->fill_queue.push_back(notification_type::on_completed()); - state->ensure_processing(guard); - } - - static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) { - auto coor = cn.create_coordinator(d.get_subscription()); - d.add(cs); - - this_type o(d, std::move(coor), cs); - auto keepAlive = o.state; - cs.add([=](){ - std::unique_lock<std::mutex> guard(keepAlive->lock); - keepAlive->ensure_processing(guard); - }); - - return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o))); - } - }; - - template<class Subscriber> - auto operator()(Subscriber dest) const - -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) { - return observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination); - } -}; - -} - -/*! @copydoc rx-observe_on.hpp -*/ -template<class... AN> -auto observe_on(AN&&... an) - -> operator_factory<observe_on_tag, AN...> { - return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); -} - -} - -template<> -struct member_overload<observe_on_tag> -{ - template<class Observable, class Coordination, - class Enabled = rxu::enable_if_all_true_type_t< - is_observable<Observable>, - is_coordination<Coordination>>, - class SourceValue = rxu::value_type_t<Observable>, - class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>> - static auto member(Observable&& o, Coordination&& cn) - -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) { - return o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn))); - } - - template<class... AN> - static operators::detail::observe_on_invalid_t<AN...> member(AN...) { - std::terminate(); - return {}; - static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)"); - } -}; - -class observe_on_one_worker : public coordination_base -{ - rxsc::scheduler factory; - - class input_type - { - rxsc::worker controller; - rxsc::scheduler factory; - identity_one_worker coordination; - public: - explicit input_type(rxsc::worker w) - : controller(w) - , factory(rxsc::make_same_worker(w)) - , coordination(factory) - { - } - inline rxsc::worker get_worker() const { - return controller; - } - inline rxsc::scheduler get_scheduler() const { - return factory; - } - inline rxsc::scheduler::clock_type::time_point now() const { - return factory.now(); - } - template<class Observable> - auto in(Observable o) const - -> decltype(o.observe_on(coordination)) { - return o.observe_on(coordination); - } - template<class Subscriber> - auto out(Subscriber s) const - -> Subscriber { - return s; - } - template<class F> - auto act(F f) const - -> F { - return f; - } - }; - -public: - - explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {} - - typedef coordinator<input_type> coordinator_type; - - inline rxsc::scheduler::clock_type::time_point now() const { - return factory.now(); - } - - inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const { - auto w = factory.create_worker(std::move(cs)); - return coordinator_type(input_type(std::move(w))); - } -}; - -inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) { - return observe_on_one_worker(rxsc::make_run_loop(rl)); -} - -inline observe_on_one_worker observe_on_event_loop() { - static observe_on_one_worker r(rxsc::make_event_loop()); - return r; -} - -inline observe_on_one_worker observe_on_new_thread() { - static observe_on_one_worker r(rxsc::make_new_thread()); - return r; -} - -} - -#endif |