summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/rx-scheduler.hpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/src/rxcpp/rx-scheduler.hpp')
-rw-r--r--Rx/v2/src/rxcpp/rx-scheduler.hpp961
1 files changed, 0 insertions, 961 deletions
diff --git a/Rx/v2/src/rxcpp/rx-scheduler.hpp b/Rx/v2/src/rxcpp/rx-scheduler.hpp
deleted file mode 100644
index fc68979..0000000
--- a/Rx/v2/src/rxcpp/rx-scheduler.hpp
+++ /dev/null
@@ -1,961 +0,0 @@
-// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
-
-#pragma once
-
-#if !defined(RXCPP_RX_SCHEDULER_HPP)
-#define RXCPP_RX_SCHEDULER_HPP
-
-#include "rx-includes.hpp"
-
-namespace rxcpp {
-
-namespace schedulers {
-
-class worker_interface;
-class scheduler_interface;
-
-namespace detail {
-
-class action_type;
-typedef std::shared_ptr<action_type> action_ptr;
-
-typedef std::shared_ptr<worker_interface> worker_interface_ptr;
-typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;
-
-typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
-typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;
-
-typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
-typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;
-
-inline action_ptr shared_empty() {
- static action_ptr shared_empty = std::make_shared<detail::action_type>();
- return shared_empty;
-}
-
-}
-
-// It is essential to keep virtual function calls out of an inner loop.
-// To make tail-recursion work efficiently the recursion objects create
-// a space on the stack inside the virtual function call in the actor that
-// allows the callback and the scheduler to share stack space that records
-// the request and the allowance without any virtual calls in the loop.
-
-/// recursed is set on a schedulable by the action to allow the called
-/// function to request to be rescheduled.
-class recursed
-{
- bool& isrequested;
- recursed operator=(const recursed&);
-public:
- explicit recursed(bool& r)
- : isrequested(r)
- {
- }
- /// request to be rescheduled
- inline void operator()() const {
- isrequested = true;
- }
-};
-
-/// recurse is passed to the action by the scheduler.
-/// the action uses recurse to coordinate the scheduler and the function.
-class recurse
-{
- bool& isallowed;
- mutable bool isrequested;
- recursed requestor;
- recurse operator=(const recurse&);
-public:
- explicit recurse(bool& a)
- : isallowed(a)
- , isrequested(true)
- , requestor(isrequested)
- {
- }
- /// does the scheduler allow tail-recursion now?
- inline bool is_allowed() const {
- return isallowed;
- }
- /// did the function request to be recursed?
- inline bool is_requested() const {
- return isrequested;
- }
- /// reset the function request. call before each call to the function.
- inline void reset() const {
- isrequested = false;
- }
- /// get the recursed to set into the schedulable for the function to use to request recursion
- inline const recursed& get_recursed() const {
- return requestor;
- }
-};
-
-/// recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
-class recursion
-{
- mutable bool isallowed;
- recurse recursor;
- recursion operator=(const recursion&);
-public:
- recursion()
- : isallowed(true)
- , recursor(isallowed)
- {
- }
- explicit recursion(bool b)
- : isallowed(b)
- , recursor(isallowed)
- {
- }
- /// set whether tail-recursion is allowed
- inline void reset(bool b = true) const {
- isallowed = b;
- }
- /// get the recurse to pass into each action being called
- inline const recurse& get_recurse() const {
- return recursor;
- }
-};
-
-
-struct action_base
-{
- typedef tag_action action_tag;
-};
-
-class schedulable;
-
-/// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable
-class action : public action_base
-{
- typedef action this_type;
- detail::action_ptr inner;
-public:
- action()
- {
- }
- explicit action(detail::action_ptr i)
- : inner(std::move(i))
- {
- }
-
- /// return the empty action
- inline static action empty() {
- return action(detail::shared_empty());
- }
-
- /// call the function
- inline void operator()(const schedulable& s, const recurse& r) const;
-};
-
-struct scheduler_base
-{
- typedef std::chrono::steady_clock clock_type;
- typedef tag_scheduler scheduler_tag;
-};
-
-struct worker_base : public subscription_base
-{
- typedef tag_worker worker_tag;
-};
-
-class worker_interface
- : public std::enable_shared_from_this<worker_interface>
-{
- typedef worker_interface this_type;
-
-public:
- typedef scheduler_base::clock_type clock_type;
-
- virtual ~worker_interface() {}
-
- virtual clock_type::time_point now() const = 0;
-
- virtual void schedule(const schedulable& scbl) const = 0;
- virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
-};
-
-namespace detail {
-
-template<class F>
-struct is_action_function
-{
- struct not_void {};
- template<class CF>
- static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
- template<class CF>
- static not_void check(...);
-
- static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
-};
-
-}
-
-class weak_worker;
-
-/// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
-/// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
-/// some inner implementations will impose additional constraints on the execution of items.
-class worker : public worker_base
-{
- typedef worker this_type;
- detail::worker_interface_ptr inner;
- composite_subscription lifetime;
- friend bool operator==(const worker&, const worker&);
- friend class weak_worker;
-public:
- typedef scheduler_base::clock_type clock_type;
- typedef composite_subscription::weak_subscription weak_subscription;
-
- worker()
- {
- }
- worker(composite_subscription cs, detail::const_worker_interface_ptr i)
- : inner(std::const_pointer_cast<worker_interface>(i))
- , lifetime(std::move(cs))
- {
- }
- worker(composite_subscription cs, worker o)
- : inner(o.inner)
- , lifetime(std::move(cs))
- {
- }
-
- inline const composite_subscription& get_subscription() const {
- return lifetime;
- }
- inline composite_subscription& get_subscription() {
- return lifetime;
- }
-
- // composite_subscription
- //
- inline bool is_subscribed() const {
- return lifetime.is_subscribed();
- }
- inline weak_subscription add(subscription s) const {
- return lifetime.add(std::move(s));
- }
- inline void remove(weak_subscription w) const {
- return lifetime.remove(std::move(w));
- }
- inline void clear() const {
- return lifetime.clear();
- }
- inline void unsubscribe() const {
- return lifetime.unsubscribe();
- }
-
- // worker_interface
- //
- /// return the current time for this worker
- inline clock_type::time_point now() const {
- return inner->now();
- }
-
- /// insert the supplied schedulable to be run as soon as possible
- inline void schedule(const schedulable& scbl) const {
- // force rebinding scbl to this worker
- schedule_rebind(scbl);
- }
-
- /// insert the supplied schedulable to be run at the time specified
- inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
- // force rebinding scbl to this worker
- schedule_rebind(when, scbl);
- }
-
- // helpers
- //
-
- /// insert the supplied schedulable to be run at now() + the delay specified
- inline void schedule(clock_type::duration when, const schedulable& scbl) const {
- // force rebinding scbl to this worker
- schedule_rebind(now() + when, scbl);
- }
-
- /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period)
- /// this will continue until the worker or schedulable is unsubscribed.
- inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
- // force rebinding scbl to this worker
- schedule_periodically_rebind(initial, period, scbl);
- }
-
- /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period)
- /// this will continue until the worker or schedulable is unsubscribed.
- inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
- // force rebinding scbl to this worker
- schedule_periodically_rebind(now() + initial, period, scbl);
- }
-
- /// use the supplied arguments to make a schedulable and then insert it to be run
- template<class Arg0, class... ArgN>
- auto schedule(Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type;
- template<class... ArgN>
- /// use the supplied arguments to make a schedulable and then insert it to be run
- void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;
-
- /// use the supplied arguments to make a schedulable and then insert it to be run
- template<class Arg0, class... ArgN>
- auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type;
- /// use the supplied arguments to make a schedulable and then insert it to be run
- template<class... ArgN>
- void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;
-
- /// use the supplied arguments to make a schedulable and then insert it to be run
- template<class Arg0, class... ArgN>
- auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type;
- /// use the supplied arguments to make a schedulable and then insert it to be run
- template<class... ArgN>
- void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
-};
-
-inline bool operator==(const worker& lhs, const worker& rhs) {
- return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
-}
-inline bool operator!=(const worker& lhs, const worker& rhs) {
- return !(lhs == rhs);
-}
-
-class weak_worker
-{
- detail::worker_interface_weak_ptr inner;
- composite_subscription lifetime;
-
-public:
- weak_worker()
- {
- }
- explicit weak_worker(worker& owner)
- : inner(owner.inner)
- , lifetime(owner.lifetime)
- {
- }
-
- worker lock() const {
- return worker(lifetime, inner.lock());
- }
-};
-
-class scheduler_interface
- : public std::enable_shared_from_this<scheduler_interface>
-{
- typedef scheduler_interface this_type;
-
-public:
- typedef scheduler_base::clock_type clock_type;
-
- virtual ~scheduler_interface() {}
-
- virtual clock_type::time_point now() const = 0;
-
- virtual worker create_worker(composite_subscription cs) const = 0;
-};
-
-
-struct schedulable_base :
- // public subscription_base, <- already in worker base
- public worker_base,
- public action_base
-{
- typedef tag_schedulable schedulable_tag;
-};
-
-/*!
- \brief allows functions to be called at specified times and possibly in other contexts.
-
- \ingroup group-core
-
-*/
-class scheduler : public scheduler_base
-{
- typedef scheduler this_type;
- detail::scheduler_interface_ptr inner;
- friend bool operator==(const scheduler&, const scheduler&);
-public:
- typedef scheduler_base::clock_type clock_type;
-
- scheduler()
- {
- }
- explicit scheduler(detail::scheduler_interface_ptr i)
- : inner(std::move(i))
- {
- }
- explicit scheduler(detail::const_scheduler_interface_ptr i)
- : inner(std::const_pointer_cast<scheduler_interface>(i))
- {
- }
-
- /// return the current time for this scheduler
- inline clock_type::time_point now() const {
- return inner->now();
- }
- /// create a worker with a lifetime.
- /// when the worker is unsubscribed all scheduled items will be unsubscribed.
- /// items scheduled to a worker will be run one at a time.
- /// scheduling order is preserved: when more than one item is scheduled for
- /// time T then at time T they will be run in the order that they were scheduled.
- inline worker create_worker(composite_subscription cs = composite_subscription()) const {
- return inner->create_worker(cs);
- }
-};
-
-template<class Scheduler, class... ArgN>
-inline scheduler make_scheduler(ArgN&&... an) {
- return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
-}
-
-inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
- return scheduler(si);
-}
-
-class schedulable : public schedulable_base
-{
- typedef schedulable this_type;
-
- composite_subscription lifetime;
- weak_worker controller;
- action activity;
- bool scoped;
- composite_subscription::weak_subscription action_scope;
-
- struct detacher
- {
- ~detacher()
- {
- if (that) {
- that->unsubscribe();
- }
- }
- detacher(const this_type* that)
- : that(that)
- {
- }
- const this_type* that;
- };
-
- class recursed_scope_type
- {
- mutable const recursed* requestor;
-
- class exit_recursed_scope_type
- {
- const recursed_scope_type* that;
- public:
- ~exit_recursed_scope_type()
- {
- if (that != nullptr) {
- that->requestor = nullptr;
- }
- }
- exit_recursed_scope_type(const recursed_scope_type* that)
- : that(that)
- {
- }
- exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
- : that(other.that)
- {
- other.that = nullptr;
- }
- };
- public:
- recursed_scope_type()
- : requestor(nullptr)
- {
- }
- recursed_scope_type(const recursed_scope_type&)
- : requestor(nullptr)
- {
- // does not aquire recursion scope
- }
- recursed_scope_type& operator=(const recursed_scope_type& )
- {
- // no change in recursion scope
- return *this;
- }
- exit_recursed_scope_type reset(const recurse& r) const {
- requestor = std::addressof(r.get_recursed());
- return exit_recursed_scope_type(this);
- }
- bool is_recursed() const {
- return !!requestor;
- }
- void operator()() const {
- (*requestor)();
- }
- };
- recursed_scope_type recursed_scope;
-
-public:
- typedef composite_subscription::weak_subscription weak_subscription;
- typedef scheduler_base::clock_type clock_type;
-
- ~schedulable()
- {
- if (scoped) {
- controller.lock().remove(action_scope);
- }
- }
- schedulable()
- : scoped(false)
- {
- }
-
- /// action and worker share lifetime
- schedulable(worker q, action a)
- : lifetime(q.get_subscription())
- , controller(q)
- , activity(std::move(a))
- , scoped(false)
- {
- }
- /// action and worker have independent lifetimes
- schedulable(composite_subscription cs, worker q, action a)
- : lifetime(std::move(cs))
- , controller(q)
- , activity(std::move(a))
- , scoped(true)
- , action_scope(controller.lock().add(lifetime))
- {
- }
- /// inherit lifetimes
- schedulable(schedulable scbl, worker q, action a)
- : lifetime(scbl.get_subscription())
- , controller(q)
- , activity(std::move(a))
- , scoped(scbl.scoped)
- , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
- {
- }
-
- inline const composite_subscription& get_subscription() const {
- return lifetime;
- }
- inline composite_subscription& get_subscription() {
- return lifetime;
- }
- inline const worker get_worker() const {
- return controller.lock();
- }
- inline worker get_worker() {
- return controller.lock();
- }
- inline const action& get_action() const {
- return activity;
- }
- inline action& get_action() {
- return activity;
- }
-
- inline static schedulable empty(worker sc) {
- return schedulable(composite_subscription::empty(), sc, action::empty());
- }
-
- inline auto set_recursed(const recurse& r) const
- -> decltype(recursed_scope.reset(r)) {
- return recursed_scope.reset(r);
- }
-
- // recursed
- //
- bool is_recursed() const {
- return recursed_scope.is_recursed();
- }
- /// requests tail-recursion of the same action
- /// this will exit the process if called when
- /// is_recursed() is false.
- /// Note: to improve perf it is not required
- /// to call is_recursed() before calling this
- /// operator. Context is sufficient. The schedulable
- /// passed to the action by the scheduler will return
- /// true from is_recursed()
- inline void operator()() const {
- recursed_scope();
- }
-
- // composite_subscription
- //
- inline bool is_subscribed() const {
- return lifetime.is_subscribed();
- }
- inline weak_subscription add(subscription s) const {
- return lifetime.add(std::move(s));
- }
- template<class F>
- auto add(F f) const
- -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
- return lifetime.add(make_subscription(std::move(f)));
- }
- inline void remove(weak_subscription w) const {
- return lifetime.remove(std::move(w));
- }
- inline void clear() const {
- return lifetime.clear();
- }
- inline void unsubscribe() const {
- return lifetime.unsubscribe();
- }
-
- // scheduler
- //
- inline clock_type::time_point now() const {
- return controller.lock().now();
- }
- /// put this on the queue of the stored scheduler to run asap
- inline void schedule() const {
- if (is_subscribed()) {
- get_worker().schedule(*this);
- }
- }
- /// put this on the queue of the stored scheduler to run at the specified time
- inline void schedule(clock_type::time_point when) const {
- if (is_subscribed()) {
- get_worker().schedule(when, *this);
- }
- }
- /// put this on the queue of the stored scheduler to run after a delay from now
- inline void schedule(clock_type::duration when) const {
- if (is_subscribed()) {
- get_worker().schedule(when, *this);
- }
- }
-
- // action
- //
- /// invokes the action
- inline void operator()(const recurse& r) const {
- if (!is_subscribed()) {
- return;
- }
- detacher protect(this);
- activity(*this, r);
- protect.that = nullptr;
- }
-};
-
-struct current_thread;
-
-namespace detail {
-
-class action_type
- : public std::enable_shared_from_this<action_type>
-{
- typedef action_type this_type;
-
-public:
- typedef std::function<void(const schedulable&, const recurse&)> function_type;
-
-private:
- function_type f;
-
-public:
- action_type()
- {
- }
-
- action_type(function_type f)
- : f(std::move(f))
- {
- }
-
- inline void operator()(const schedulable& s, const recurse& r) {
- if (!f) {
- std::terminate();
- }
- f(s, r);
- }
-};
-
-class action_tailrecurser
- : public std::enable_shared_from_this<action_type>
-{
- typedef action_type this_type;
-
-public:
- typedef std::function<void(const schedulable&)> function_type;
-
-private:
- function_type f;
-
-public:
- action_tailrecurser()
- {
- }
-
- action_tailrecurser(function_type f)
- : f(std::move(f))
- {
- }
-
- inline void operator()(const schedulable& s, const recurse& r) {
- if (!f) {
- std::terminate();
- }
- trace_activity().action_enter(s);
- auto scope = s.set_recursed(r);
- while (s.is_subscribed()) {
- r.reset();
- f(s);
- if (!r.is_allowed() || !r.is_requested()) {
- if (r.is_requested()) {
- s.schedule();
- }
- break;
- }
- trace_activity().action_recurse(s);
- }
- trace_activity().action_return(s);
- }
-};
-}
-
-inline void action::operator()(const schedulable& s, const recurse& r) const {
- (*inner)(s, r);
-}
-
-inline action make_action_empty() {
- return action::empty();
-}
-
-template<class F>
-inline action make_action(F&& f) {
- static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
- auto fn = std::forward<F>(f);
- return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
-}
-
-// copy
-inline auto make_schedulable(
- const schedulable& scbl)
- -> schedulable {
- return schedulable(scbl);
-}
-// move
-inline auto make_schedulable(
- schedulable&& scbl)
- -> schedulable {
- return schedulable(std::move(scbl));
-}
-
-inline schedulable make_schedulable(worker sc, action a) {
- return schedulable(sc, a);
-}
-inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) {
- return schedulable(cs, sc, a);
-}
-
-template<class F>
-auto make_schedulable(worker sc, F&& f)
- -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
- return schedulable(sc, make_action(std::forward<F>(f)));
-}
-template<class F>
-auto make_schedulable(worker sc, composite_subscription cs, F&& f)
- -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
- return schedulable(cs, sc, make_action(std::forward<F>(f)));
-}
-template<class F>
-auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f)
- -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
- return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
-}
-template<class F>
-auto make_schedulable(schedulable scbl, worker sc, F&& f)
- -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
- return schedulable(scbl, sc, make_action(std::forward<F>(f)));
-}
-template<class F>
-auto make_schedulable(schedulable scbl, F&& f)
- -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
- return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
-}
-
-inline auto make_schedulable(schedulable scbl, composite_subscription cs)
- -> schedulable {
- return schedulable(cs, scbl.get_worker(), scbl.get_action());
-}
-inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs)
- -> schedulable {
- return schedulable(cs, sc, scbl.get_action());
-}
-inline auto make_schedulable(schedulable scbl, worker sc)
- -> schedulable {
- return schedulable(scbl, sc, scbl.get_action());
-}
-
-template<class Arg0, class... ArgN>
-auto worker::schedule(Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type {
- auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
- trace_activity().schedule_enter(*inner.get(), scbl);
- inner->schedule(std::move(scbl));
- trace_activity().schedule_return(*inner.get());
-}
-template<class... ArgN>
-void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
- auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
- trace_activity().schedule_enter(*inner.get(), rescbl);
- inner->schedule(std::move(rescbl));
- trace_activity().schedule_return(*inner.get());
-}
-
-template<class Arg0, class... ArgN>
-auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type {
- auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
- trace_activity().schedule_when_enter(*inner.get(), when, scbl);
- inner->schedule(when, std::move(scbl));
- trace_activity().schedule_when_return(*inner.get());
-}
-template<class... ArgN>
-void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
- auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
- trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
- inner->schedule(when, std::move(rescbl));
- trace_activity().schedule_when_return(*inner.get());
-}
-
-template<class Arg0, class... ArgN>
-auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
- -> typename std::enable_if<
- (detail::is_action_function<Arg0>::value ||
- is_subscription<Arg0>::value) &&
- !is_schedulable<Arg0>::value>::type {
- schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
-}
-template<class... ArgN>
-void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
- auto keepAlive = *this;
- auto target = std::make_shared<clock_type::time_point>(initial);
- auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
- auto periodic = make_schedulable(
- activity,
- [keepAlive, target, period, activity](schedulable self) {
- // any recursion requests will be pushed to the scheduler queue
- recursion r(false);
- // call action
- activity(r.get_recurse());
-
- // schedule next occurance (if the action took longer than 'period' target will be in the past)
- *target += period;
- self.schedule(*target);
- });
- trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
- inner->schedule(*target, periodic);
- trace_activity().schedule_when_return(*inner.get());
-}
-
-namespace detail {
-
-template<class TimePoint>
-struct time_schedulable
-{
- typedef TimePoint time_point_type;
-
- time_schedulable(TimePoint when, schedulable a)
- : when(when)
- , what(std::move(a))
- {
- }
- TimePoint when;
- schedulable what;
-};
-
-
-// Sorts time_schedulable items in priority order sorted
-// on value of time_schedulable.when. Items with equal
-// values for when are sorted in fifo order.
-template<class TimePoint>
-class schedulable_queue {
-public:
- typedef time_schedulable<TimePoint> item_type;
- typedef std::pair<item_type, int64_t> elem_type;
- typedef std::vector<elem_type> container_type;
- typedef const item_type& const_reference;
-
-private:
- struct compare_elem
- {
- bool operator()(const elem_type& lhs, const elem_type& rhs) const {
- if (lhs.first.when == rhs.first.when) {
- return lhs.second > rhs.second;
- }
- else {
- return lhs.first.when > rhs.first.when;
- }
- }
- };
-
- typedef std::priority_queue<
- elem_type,
- container_type,
- compare_elem
- > queue_type;
-
- queue_type q;
-
- int64_t ordinal;
-public:
-
- schedulable_queue()
- : ordinal(0)
- {
- }
-
- const_reference top() const {
- return q.top().first;
- }
-
- void pop() {
- q.pop();
- }
-
- bool empty() const {
- return q.empty();
- }
-
- void push(const item_type& value) {
- q.push(elem_type(value, ordinal++));
- }
-
- void push(item_type&& value) {
- q.push(elem_type(std::move(value), ordinal++));
- }
-};
-
-}
-
-}
-namespace rxsc=schedulers;
-
-}
-
-#include "schedulers/rx-currentthread.hpp"
-#include "schedulers/rx-runloop.hpp"
-#include "schedulers/rx-newthread.hpp"
-#include "schedulers/rx-eventloop.hpp"
-#include "schedulers/rx-immediate.hpp"
-#include "schedulers/rx-virtualtime.hpp"
-#include "schedulers/rx-sameworker.hpp"
-
-#endif