diff options
Diffstat (limited to 'Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp')
-rw-r--r-- | Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp | 186 |
1 files changed, 0 insertions, 186 deletions
diff --git a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp b/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp deleted file mode 100644 index 020b0f8..0000000 --- a/Rx/v2/src/rxcpp/subjects/rx-replaysubject.hpp +++ /dev/null @@ -1,186 +0,0 @@ -// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. - -#pragma once - -#if !defined(RXCPP_RX_REPLAYSUBJECT_HPP) -#define RXCPP_RX_REPLAYSUBJECT_HPP - -#include "../rx-includes.hpp" - -namespace rxcpp { - -namespace subjects { - -namespace detail { - -template<class Coordination> -struct replay_traits -{ - typedef rxu::maybe<std::size_t> count_type; - typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type; - typedef rxsc::scheduler::clock_type::time_point time_point_type; - typedef rxu::decay_t<Coordination> coordination_type; - typedef typename coordination_type::coordinator_type coordinator_type; -}; - -template<class T, class Coordination> -class replay_observer : public detail::multicast_observer<T> -{ - typedef replay_observer<T, Coordination> this_type; - typedef detail::multicast_observer<T> base_type; - - typedef replay_traits<Coordination> traits; - typedef typename traits::count_type count_type; - typedef typename traits::period_type period_type; - typedef typename traits::time_point_type time_point_type; - typedef typename traits::coordination_type coordination_type; - typedef typename traits::coordinator_type coordinator_type; - - class replay_observer_state : public std::enable_shared_from_this<replay_observer_state> - { - mutable std::mutex lock; - mutable std::list<T> values; - mutable std::list<time_point_type> time_points; - mutable count_type count; - mutable period_type period; - mutable composite_subscription replayLifetime; - public: - mutable coordination_type coordination; - mutable coordinator_type coordinator; - - private: - void remove_oldest() const { - values.pop_front(); - if (!period.empty()) { - time_points.pop_front(); - } - } - - public: - ~replay_observer_state(){ - replayLifetime.unsubscribe(); - } - explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime) - : count(_count) - , period(_period) - , replayLifetime(_replayLifetime) - , coordination(std::move(_coordination)) - , coordinator(std::move(_coordinator)) - { - } - - void add(T v) const { - std::unique_lock<std::mutex> guard(lock); - - if (!count.empty()) { - if (values.size() == count.get()) - remove_oldest(); - } - - if (!period.empty()) { - auto now = coordination.now(); - while (!time_points.empty() && (now - time_points.front() > period.get())) - remove_oldest(); - time_points.push_back(now); - } - - values.push_back(std::move(v)); - } - std::list<T> get() const { - std::unique_lock<std::mutex> guard(lock); - return values; - } - }; - - std::shared_ptr<replay_observer_state> state; - -public: - replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime) - : base_type(subscriberLifetime) - { - replayLifetime.add(subscriberLifetime); - auto coordinator = coordination.create_coordinator(replayLifetime); - state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime)); - } - - subscriber<T> get_subscriber() const { - return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).as_dynamic(); - } - - std::list<T> get_values() const { - return state->get(); - } - - coordinator_type& get_coordinator() const { - return state->coordinator; - } - - template<class V> - void on_next(V v) const { - state->add(v); - base_type::on_next(std::move(v)); - } -}; - -} - -template<class T, class Coordination> -class replay -{ - typedef detail::replay_traits<Coordination> traits; - typedef typename traits::count_type count_type; - typedef typename traits::period_type period_type; - typedef typename traits::time_point_type time_point_type; - - detail::replay_observer<T, Coordination> s; - -public: - explicit replay(Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(), cn, cs, composite_subscription{}) - { - } - - replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{}) - { - } - - replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(), period_type(period), cn, cs, composite_subscription{}) - { - } - - replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription()) - : s(count_type(count), period_type(period), cn, cs, composite_subscription{}) - { - } - - bool has_observers() const { - return s.has_observers(); - } - - std::list<T> get_values() const { - return s.get_values(); - } - - subscriber<T> get_subscriber() const { - return s.get_subscriber(); - } - - observable<T> get_observable() const { - auto keepAlive = s; - auto observable = make_observable_dynamic<T>([=](subscriber<T> o){ - for (auto&& value: get_values()) { - o.on_next(value); - } - keepAlive.add(keepAlive.get_subscriber(), std::move(o)); - }); - return s.get_coordinator().in(observable); - } -}; - -} - -} - -#endif |