diff options
Diffstat (limited to 'Rx/v2/test/operators/replay.cpp')
-rw-r--r-- | Rx/v2/test/operators/replay.cpp | 698 |
1 files changed, 0 insertions, 698 deletions
diff --git a/Rx/v2/test/operators/replay.cpp b/Rx/v2/test/operators/replay.cpp deleted file mode 100644 index 62ab372..0000000 --- a/Rx/v2/test/operators/replay.cpp +++ /dev/null @@ -1,698 +0,0 @@ -#include "../test.h" -#include <rxcpp/operators/rx-replay.hpp> - -SCENARIO("replay basic", "[replay][multicast][subject][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(110, 0), - on.next(220, 1), - on.next(280, 2), - on.next(290, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(600) - }); - - auto res = w.make_subscriber<int>(); - - rx::connectable_observable<int> ys; - - WHEN("subscribed and then connected"){ - - w.schedule_absolute(rxsc::test::created_time, - [&ys, &xs](const rxsc::schedulable&){ - ys = xs.replay().as_dynamic(); - }); - - w.schedule_absolute(rxsc::test::subscribed_time, - [&ys, &res](const rxsc::schedulable&){ - ys.subscribe(res); - }); - - w.schedule_absolute(rxsc::test::unsubscribed_time, - [&res](const rxsc::schedulable&){ - res.unsubscribe(); - }); - - { - rx::composite_subscription connection; - - w.schedule_absolute(300, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(400, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - } - - { - rx::composite_subscription connection; - - w.schedule_absolute(500, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(550, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - } - - { - rx::composite_subscription connection; - - w.schedule_absolute(650, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(800, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - } - - w.start(); - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(520, 11) - }); - auto actual = res.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there were 3 subscription/unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(300, 400), - on.subscribe(500, 550), - on.subscribe(650, 800) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} - -SCENARIO("replay error", "[replay][error][multicast][subject][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - std::runtime_error ex("publish on_error"); - - auto xs = sc.make_hot_observable({ - on.next(110, 0), - on.next(220, 1), - on.next(280, 2), - on.next(290, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.error(600, ex) - }); - - auto res = w.make_subscriber<int>(); - - rx::connectable_observable<int> ys; - - WHEN("subscribed and then connected"){ - - w.schedule_absolute(rxsc::test::created_time, - [&ys, &xs](const rxsc::schedulable&){ - ys = xs.replay().as_dynamic(); - }); - - w.schedule_absolute(rxsc::test::subscribed_time, - [&ys, &res](const rxsc::schedulable&){ - ys.subscribe(res); - }); - - w.schedule_absolute(rxsc::test::unsubscribed_time, - [&res](const rxsc::schedulable&){ - res.unsubscribe(); - }); - - { - rx::composite_subscription connection; - - w.schedule_absolute(300, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(400, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - } - - { - rx::composite_subscription connection; - - w.schedule_absolute(500, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(800, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - } - - w.start(); - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(520, 11), - on.next(560, 12), - on.error(600, ex) - }); - auto actual = res.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there were 2 subscription/unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(300, 400), - on.subscribe(500, 600) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} - -SCENARIO("replay multiple subscriptions", "[replay][multicast][subject][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(110, 0), - on.next(220, 1), - on.next(280, 2), - on.next(290, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - - rx::connectable_observable<int> ys; - - WHEN("subscribed and then connected"){ - - // Create connectable observable - w.schedule_absolute(rxsc::test::created_time, - [&ys, &xs](const rxsc::schedulable&){ - ys = xs.replay().as_dynamic(); - }); - - // Manage connection - rx::composite_subscription connection; - w.schedule_absolute(rxsc::test::subscribed_time, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(rxsc::test::unsubscribed_time, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - - // Subscribe before the first item emitted - auto res1 = w.make_subscriber<int>(); - w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); - - // Subscribe in the middle of emitting - auto res2 = w.make_subscriber<int>(); - w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); - - // Subscribe after the last item emitted - auto res3 = w.make_subscriber<int>(); - w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); - - w.start(); - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(220, 1), - on.next(280, 2), - on.next(290, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res1.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(400, 1), - on.next(400, 2), - on.next(400, 3), - on.next(400, 4), - on.next(400, 5), - on.next(400, 6), - on.next(400, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res2.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(600, 1), - on.next(600, 2), - on.next(600, 3), - on.next(600, 4), - on.next(600, 5), - on.next(600, 6), - on.next(600, 7), - on.next(600, 8), - on.next(600, 9), - on.next(600, 10), - on.next(600, 11), - on.next(600, 12), - on.completed(650) - }); - auto actual = res3.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was 1 subscription/unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(200, 650) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} - -SCENARIO("replay multiple subscriptions with count", "[replay][multicast][subject][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(110, 0), - on.next(220, 1), - on.next(280, 2), - on.next(290, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - - rx::connectable_observable<int> ys; - - WHEN("subscribed and then connected"){ - - // Create connectable observable - w.schedule_absolute(rxsc::test::created_time, - [&ys, &xs](const rxsc::schedulable&){ - ys = xs.replay(3).as_dynamic(); - }); - - // Manage connection - rx::composite_subscription connection; - w.schedule_absolute(rxsc::test::subscribed_time, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(rxsc::test::unsubscribed_time, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - - // Subscribe before the first item emitted - auto res1 = w.make_subscriber<int>(); - w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); - - // Subscribe in the middle of emitting - auto res2 = w.make_subscriber<int>(); - w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); - - // Subscribe after the last item emitted - auto res3 = w.make_subscriber<int>(); - w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); - - w.start(); - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(220, 1), - on.next(280, 2), - on.next(290, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res1.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(400, 5), - on.next(400, 6), - on.next(400, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res2.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(600, 10), - on.next(600, 11), - on.next(600, 12), - on.completed(650) - }); - auto actual = res3.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was 1 subscription/unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(200, 650) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} - -SCENARIO("replay multiple subscriptions with time", "[replay][multicast][subject][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - auto so = rx::identity_one_worker(sc); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(110, 0), - on.next(220, 1), - on.next(240, 2), - on.next(260, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - - rx::connectable_observable<int> ys; - - WHEN("subscribed and then connected"){ - using namespace std::chrono; - - // Create connectable observable - w.schedule_absolute(rxsc::test::created_time, - [&](const rxsc::schedulable&){ - ys = xs.replay(milliseconds(100), so).as_dynamic(); - }); - - // Manage connection - rx::composite_subscription connection; - w.schedule_absolute(rxsc::test::subscribed_time, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(rxsc::test::unsubscribed_time, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - - // Subscribe before the first item emitted - auto res1 = w.make_subscriber<int>(); - w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); - - // Subscribe in the middle of emitting - auto res2 = w.make_subscriber<int>(); - w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); - - // Subscribe after the last item emitted - auto res3 = w.make_subscriber<int>(); - w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); - - w.start(); - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(220, 1), - on.next(240, 2), - on.next(260, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res1.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(400, 4), - on.next(400, 5), - on.next(400, 6), - on.next(400, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res2.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(600, 11), - on.next(600, 12), - on.completed(650) - }); - auto actual = res3.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was 1 subscription/unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(200, 650) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} - -SCENARIO("replay multiple subscriptions with count and time", "[replay][multicast][subject][operators]"){ - GIVEN("a test hot observable of ints"){ - auto sc = rxsc::make_test(); - auto w = sc.create_worker(); - auto so = rx::identity_one_worker(sc); - const rxsc::test::messages<int> on; - - auto xs = sc.make_hot_observable({ - on.next(110, 0), - on.next(220, 1), - on.next(240, 2), - on.next(260, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - - rx::connectable_observable<int> ys; - - WHEN("subscribed and then connected"){ - using namespace std::chrono; - - // Create connectable observable - w.schedule_absolute(rxsc::test::created_time, - [&](const rxsc::schedulable&){ - ys = xs.replay(3, milliseconds(100), so).as_dynamic(); - }); - - // Manage connection - rx::composite_subscription connection; - w.schedule_absolute(rxsc::test::subscribed_time, - [connection, &ys](const rxsc::schedulable&){ - ys.connect(connection); - }); - w.schedule_absolute(rxsc::test::unsubscribed_time, - [connection](const rxsc::schedulable&){ - connection.unsubscribe(); - }); - - // Subscribe before the first item emitted - auto res1 = w.make_subscriber<int>(); - w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);}); - - // Subscribe in the middle of emitting - auto res2 = w.make_subscriber<int>(); - w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);}); - - // Subscribe after the last item emitted - auto res3 = w.make_subscriber<int>(); - w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);}); - - w.start(); - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(220, 1), - on.next(240, 2), - on.next(260, 3), - on.next(340, 4), - on.next(360, 5), - on.next(370, 6), - on.next(390, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res1.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(400, 5), - on.next(400, 6), - on.next(400, 7), - on.next(410, 8), - on.next(430, 9), - on.next(450, 10), - on.next(520, 11), - on.next(560, 12), - on.completed(650) - }); - auto actual = res2.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("the output only contains items sent while subscribed"){ - auto required = rxu::to_vector({ - on.next(600, 11), - on.next(600, 12), - on.completed(650) - }); - auto actual = res3.get_observer().messages(); - REQUIRE(required == actual); - } - - THEN("there was 1 subscription/unsubscription"){ - auto required = rxu::to_vector({ - on.subscribe(200, 650) - }); - auto actual = xs.subscriptions(); - REQUIRE(required == actual); - } - - } - } -} |