summaryrefslogtreecommitdiff
path: root/Rx/v2/src/rxcpp/operators/rx-skip.hpp
blob: b77c4da96fed5e42fc7b8de8999f53b85546c207 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

/*! \file rx-skip.hpp

    \brief Make new observable with skipped first count items from this observable.

    \tparam Count the type of the items counter

    \param t the number of items to skip

    \return An observable that is identical to the source observable except that it does not emit the first t items that the source observable emits.

    \sample
    \snippet skip.cpp skip sample
    \snippet output.txt skip sample
*/

#if !defined(RXCPP_OPERATORS_RX_SKIP_HPP)
#define RXCPP_OPERATORS_RX_SKIP_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace operators {

namespace detail {

template<class... AN>
struct skip_invalid_arguments {};

template<class... AN>
struct skip_invalid : public rxo::operator_base<skip_invalid_arguments<AN...>> {
    using type = observable<skip_invalid_arguments<AN...>, skip_invalid<AN...>>;
};

template<class... AN>
using skip_invalid_t = typename skip_invalid<AN...>::type;

template<class T, class Observable, class Count>
struct skip : public operator_base<T>
{
    typedef rxu::decay_t<Observable> source_type;
    typedef rxu::decay_t<Count> count_type;
    struct values
    {
        values(source_type s, count_type t)
            : source(std::move(s))
            , count(std::move(t))
        {
        }
        source_type source;
        count_type count;
    };
    values initial;

    skip(source_type s, count_type t)
        : initial(std::move(s), std::move(t))
    {
    }

    struct mode
    {
        enum type {
            skipping,  // ignore messages
            triggered, // capture messages
            errored,   // error occured
            stopped    // observable completed
        };
    };

    template<class Subscriber>
    void on_subscribe(const Subscriber& s) const {

        typedef Subscriber output_type;
        struct state_type
            : public std::enable_shared_from_this<state_type>
            , public values
        {
            state_type(const values& i, const output_type& oarg)
                : values(i)
                , mode_value(i.count > 0 ? mode::skipping : mode::triggered)
                , out(oarg)
            {
            }
            typename mode::type mode_value;
            output_type out;
        };
        // take a copy of the values for each subscription
        auto state = std::make_shared<state_type>(initial, s);

        composite_subscription source_lifetime;

        s.add(source_lifetime);

        state->source.subscribe(
        // split subscription lifetime
            source_lifetime,
        // on_next
            [state](T t) {
                if (state->mode_value == mode::skipping) {
                    if (--state->count == 0) {
                        state->mode_value = mode::triggered;
                    }
                } else {
                    state->out.on_next(t);
                }
            },
        // on_error
            [state](rxu::error_ptr e) {
                state->mode_value = mode::errored;
                state->out.on_error(e);
            },
        // on_completed
            [state]() {
                state->mode_value = mode::stopped;
                state->out.on_completed();
            }
        );
    }
};

}

/*! @copydoc rx-skip.hpp
*/
template<class... AN>
auto skip(AN&&... an)
->     operator_factory<skip_tag, AN...> {
    return operator_factory<skip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
}

}

template<>
struct member_overload<skip_tag>
{
    template<class Observable,
            class Count,
            class Enabled = rxu::enable_if_all_true_type_t<
            is_observable<Observable>>,
            class SourceValue = rxu::value_type_t<Observable>,
            class Skip = rxo::detail::skip<SourceValue, rxu::decay_t<Observable>, rxu::decay_t<Count>>,
            class Value = rxu::value_type_t<Skip>,
            class Result = observable<Value, Skip>>
    static Result member(Observable&& o, Count&& c) {
        return Result(Skip(std::forward<Observable>(o), std::forward<Count>(c)));
    }

    template<class... AN>
    static operators::detail::skip_invalid_t<AN...> member(AN...) {
        std::terminate();
        return {};
        static_assert(sizeof...(AN) == 10000, "skip takes (optional Count)");
    }
};

}

#endif