summaryrefslogtreecommitdiff
path: root/Rx/v2/test/operators/replay.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'Rx/v2/test/operators/replay.cpp')
-rw-r--r--Rx/v2/test/operators/replay.cpp698
1 files changed, 0 insertions, 698 deletions
diff --git a/Rx/v2/test/operators/replay.cpp b/Rx/v2/test/operators/replay.cpp
deleted file mode 100644
index 62ab372..0000000
--- a/Rx/v2/test/operators/replay.cpp
+++ /dev/null
@@ -1,698 +0,0 @@
-#include "../test.h"
-#include <rxcpp/operators/rx-replay.hpp>
-
-SCENARIO("replay basic", "[replay][multicast][subject][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 0),
- on.next(220, 1),
- on.next(280, 2),
- on.next(290, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(600)
- });
-
- auto res = w.make_subscriber<int>();
-
- rx::connectable_observable<int> ys;
-
- WHEN("subscribed and then connected"){
-
- w.schedule_absolute(rxsc::test::created_time,
- [&ys, &xs](const rxsc::schedulable&){
- ys = xs.replay().as_dynamic();
- });
-
- w.schedule_absolute(rxsc::test::subscribed_time,
- [&ys, &res](const rxsc::schedulable&){
- ys.subscribe(res);
- });
-
- w.schedule_absolute(rxsc::test::unsubscribed_time,
- [&res](const rxsc::schedulable&){
- res.unsubscribe();
- });
-
- {
- rx::composite_subscription connection;
-
- w.schedule_absolute(300,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(400,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
- }
-
- {
- rx::composite_subscription connection;
-
- w.schedule_absolute(500,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(550,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
- }
-
- {
- rx::composite_subscription connection;
-
- w.schedule_absolute(650,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(800,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
- }
-
- w.start();
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(520, 11)
- });
- auto actual = res.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there were 3 subscription/unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(300, 400),
- on.subscribe(500, 550),
- on.subscribe(650, 800)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-SCENARIO("replay error", "[replay][error][multicast][subject][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- std::runtime_error ex("publish on_error");
-
- auto xs = sc.make_hot_observable({
- on.next(110, 0),
- on.next(220, 1),
- on.next(280, 2),
- on.next(290, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.error(600, ex)
- });
-
- auto res = w.make_subscriber<int>();
-
- rx::connectable_observable<int> ys;
-
- WHEN("subscribed and then connected"){
-
- w.schedule_absolute(rxsc::test::created_time,
- [&ys, &xs](const rxsc::schedulable&){
- ys = xs.replay().as_dynamic();
- });
-
- w.schedule_absolute(rxsc::test::subscribed_time,
- [&ys, &res](const rxsc::schedulable&){
- ys.subscribe(res);
- });
-
- w.schedule_absolute(rxsc::test::unsubscribed_time,
- [&res](const rxsc::schedulable&){
- res.unsubscribe();
- });
-
- {
- rx::composite_subscription connection;
-
- w.schedule_absolute(300,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(400,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
- }
-
- {
- rx::composite_subscription connection;
-
- w.schedule_absolute(500,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(800,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
- }
-
- w.start();
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(520, 11),
- on.next(560, 12),
- on.error(600, ex)
- });
- auto actual = res.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there were 2 subscription/unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(300, 400),
- on.subscribe(500, 600)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-SCENARIO("replay multiple subscriptions", "[replay][multicast][subject][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 0),
- on.next(220, 1),
- on.next(280, 2),
- on.next(290, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
-
- rx::connectable_observable<int> ys;
-
- WHEN("subscribed and then connected"){
-
- // Create connectable observable
- w.schedule_absolute(rxsc::test::created_time,
- [&ys, &xs](const rxsc::schedulable&){
- ys = xs.replay().as_dynamic();
- });
-
- // Manage connection
- rx::composite_subscription connection;
- w.schedule_absolute(rxsc::test::subscribed_time,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(rxsc::test::unsubscribed_time,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
-
- // Subscribe before the first item emitted
- auto res1 = w.make_subscriber<int>();
- w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
-
- // Subscribe in the middle of emitting
- auto res2 = w.make_subscriber<int>();
- w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
-
- // Subscribe after the last item emitted
- auto res3 = w.make_subscriber<int>();
- w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
-
- w.start();
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(220, 1),
- on.next(280, 2),
- on.next(290, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res1.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(400, 1),
- on.next(400, 2),
- on.next(400, 3),
- on.next(400, 4),
- on.next(400, 5),
- on.next(400, 6),
- on.next(400, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res2.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(600, 1),
- on.next(600, 2),
- on.next(600, 3),
- on.next(600, 4),
- on.next(600, 5),
- on.next(600, 6),
- on.next(600, 7),
- on.next(600, 8),
- on.next(600, 9),
- on.next(600, 10),
- on.next(600, 11),
- on.next(600, 12),
- on.completed(650)
- });
- auto actual = res3.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was 1 subscription/unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(200, 650)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-SCENARIO("replay multiple subscriptions with count", "[replay][multicast][subject][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 0),
- on.next(220, 1),
- on.next(280, 2),
- on.next(290, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
-
- rx::connectable_observable<int> ys;
-
- WHEN("subscribed and then connected"){
-
- // Create connectable observable
- w.schedule_absolute(rxsc::test::created_time,
- [&ys, &xs](const rxsc::schedulable&){
- ys = xs.replay(3).as_dynamic();
- });
-
- // Manage connection
- rx::composite_subscription connection;
- w.schedule_absolute(rxsc::test::subscribed_time,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(rxsc::test::unsubscribed_time,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
-
- // Subscribe before the first item emitted
- auto res1 = w.make_subscriber<int>();
- w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
-
- // Subscribe in the middle of emitting
- auto res2 = w.make_subscriber<int>();
- w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
-
- // Subscribe after the last item emitted
- auto res3 = w.make_subscriber<int>();
- w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
-
- w.start();
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(220, 1),
- on.next(280, 2),
- on.next(290, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res1.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(400, 5),
- on.next(400, 6),
- on.next(400, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res2.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(600, 10),
- on.next(600, 11),
- on.next(600, 12),
- on.completed(650)
- });
- auto actual = res3.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was 1 subscription/unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(200, 650)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-SCENARIO("replay multiple subscriptions with time", "[replay][multicast][subject][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- auto so = rx::identity_one_worker(sc);
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 0),
- on.next(220, 1),
- on.next(240, 2),
- on.next(260, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
-
- rx::connectable_observable<int> ys;
-
- WHEN("subscribed and then connected"){
- using namespace std::chrono;
-
- // Create connectable observable
- w.schedule_absolute(rxsc::test::created_time,
- [&](const rxsc::schedulable&){
- ys = xs.replay(milliseconds(100), so).as_dynamic();
- });
-
- // Manage connection
- rx::composite_subscription connection;
- w.schedule_absolute(rxsc::test::subscribed_time,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(rxsc::test::unsubscribed_time,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
-
- // Subscribe before the first item emitted
- auto res1 = w.make_subscriber<int>();
- w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
-
- // Subscribe in the middle of emitting
- auto res2 = w.make_subscriber<int>();
- w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
-
- // Subscribe after the last item emitted
- auto res3 = w.make_subscriber<int>();
- w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
-
- w.start();
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(220, 1),
- on.next(240, 2),
- on.next(260, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res1.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(400, 4),
- on.next(400, 5),
- on.next(400, 6),
- on.next(400, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res2.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(600, 11),
- on.next(600, 12),
- on.completed(650)
- });
- auto actual = res3.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was 1 subscription/unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(200, 650)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}
-
-SCENARIO("replay multiple subscriptions with count and time", "[replay][multicast][subject][operators]"){
- GIVEN("a test hot observable of ints"){
- auto sc = rxsc::make_test();
- auto w = sc.create_worker();
- auto so = rx::identity_one_worker(sc);
- const rxsc::test::messages<int> on;
-
- auto xs = sc.make_hot_observable({
- on.next(110, 0),
- on.next(220, 1),
- on.next(240, 2),
- on.next(260, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
-
- rx::connectable_observable<int> ys;
-
- WHEN("subscribed and then connected"){
- using namespace std::chrono;
-
- // Create connectable observable
- w.schedule_absolute(rxsc::test::created_time,
- [&](const rxsc::schedulable&){
- ys = xs.replay(3, milliseconds(100), so).as_dynamic();
- });
-
- // Manage connection
- rx::composite_subscription connection;
- w.schedule_absolute(rxsc::test::subscribed_time,
- [connection, &ys](const rxsc::schedulable&){
- ys.connect(connection);
- });
- w.schedule_absolute(rxsc::test::unsubscribed_time,
- [connection](const rxsc::schedulable&){
- connection.unsubscribe();
- });
-
- // Subscribe before the first item emitted
- auto res1 = w.make_subscriber<int>();
- w.schedule_absolute(200, [&ys, &res1](const rxsc::schedulable&){ys.subscribe(res1);});
-
- // Subscribe in the middle of emitting
- auto res2 = w.make_subscriber<int>();
- w.schedule_absolute(400, [&ys, &res2](const rxsc::schedulable&){ys.subscribe(res2);});
-
- // Subscribe after the last item emitted
- auto res3 = w.make_subscriber<int>();
- w.schedule_absolute(600, [&ys, &res3](const rxsc::schedulable&){ys.subscribe(res3);});
-
- w.start();
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(220, 1),
- on.next(240, 2),
- on.next(260, 3),
- on.next(340, 4),
- on.next(360, 5),
- on.next(370, 6),
- on.next(390, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res1.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(400, 5),
- on.next(400, 6),
- on.next(400, 7),
- on.next(410, 8),
- on.next(430, 9),
- on.next(450, 10),
- on.next(520, 11),
- on.next(560, 12),
- on.completed(650)
- });
- auto actual = res2.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("the output only contains items sent while subscribed"){
- auto required = rxu::to_vector({
- on.next(600, 11),
- on.next(600, 12),
- on.completed(650)
- });
- auto actual = res3.get_observer().messages();
- REQUIRE(required == actual);
- }
-
- THEN("there was 1 subscription/unsubscription"){
- auto required = rxu::to_vector({
- on.subscribe(200, 650)
- });
- auto actual = xs.subscriptions();
- REQUIRE(required == actual);
- }
-
- }
- }
-}