diff --git a/demo/share/singleInstance.php b/demo/share/singleInstance.php new file mode 100644 index 00000000..5ab9bf0e --- /dev/null +++ b/demo/share/singleInstance.php @@ -0,0 +1,24 @@ +take(2) + ->do(function () { + echo 'Side effect', PHP_EOL; + }); + +$single = $source->singleInstance(); + +// two simultaneous subscriptions, lasting 2 seconds +$single->subscribe($createStdoutObserver('SourceA ')); +$single->subscribe($createStdoutObserver('SourceB ')); + +\Rx\Observable::timer(5000)->subscribe(function () use ($single, &$createStdoutObserver) { + // resubscribe two times again, more than 5 seconds later, + // long after the original two subscriptions have ended + $single->subscribe($createStdoutObserver('SourceC ')); + $single->subscribe($createStdoutObserver('SourceD ')); +}); diff --git a/demo/share/singleInstance.php.expect b/demo/share/singleInstance.php.expect new file mode 100644 index 00000000..ef9f94ff --- /dev/null +++ b/demo/share/singleInstance.php.expect @@ -0,0 +1,16 @@ +Side effect +SourceA Next value: 0 +SourceB Next value: 0 +Side effect +SourceA Next value: 1 +SourceB Next value: 1 +SourceA Complete! +SourceB Complete! +Side effect +SourceC Next value: 0 +SourceD Next value: 0 +Side effect +SourceC Next value: 1 +SourceD Next value: 1 +SourceC Complete! +SourceD Complete! diff --git a/src/Observable.php b/src/Observable.php index 1130df88..5b2ab91c 100644 --- a/src/Observable.php +++ b/src/Observable.php @@ -11,7 +11,6 @@ use Rx\Observable\ConnectableObservable; use Rx\Observable\EmptyObservable; use Rx\Observable\ErrorObservable; -use Rx\Observable\FromPromiseObservable; use Rx\Observable\ForkJoinObservable; use Rx\Observable\IntervalObservable; use Rx\Observable\IteratorObservable; @@ -1312,6 +1311,43 @@ public function share(): RefCountObservable return $this->publish()->refCount(); } + /** + * Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence + * can be resubscribed to, even if all prior subscriptions have ended. + * + * This operator behaves like share() in RxJS 5 + * + * @return \Rx\Observable An observable sequence that contains the elements of a sequence + * produced by multicasting the source sequence. + * + * @demo share/singleInstance.php + * @operator + * @reactivex refcount + */ + public function singleInstance(): Observable + { + $hasObservable = false; + $observable = null; + $source = $this; + + $getObservable = function () use (&$hasObservable, &$observable, $source): Observable { + if (!$hasObservable) { + $hasObservable = true; + $observable = $source + ->finally(function () use (&$hasObservable) { + $hasObservable = false; + }) + ->publish() + ->refCount(); + } + return $observable; + }; + + return new Observable\AnonymousObservable(function (ObserverInterface $o) use ($getObservable) { + return $getObservable()->subscribe($o); + }); + } + /** * Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an * initialValue. diff --git a/test/Rx/Functional/Operator/SingleInstanceTest.php b/test/Rx/Functional/Operator/SingleInstanceTest.php new file mode 100644 index 00000000..49c16224 --- /dev/null +++ b/test/Rx/Functional/Operator/SingleInstanceTest.php @@ -0,0 +1,120 @@ +createColdObservable([ + onNext(100, 1), + onNext(150, 2), + onNext(200, 3), + onCompleted(250) + ]); + + $ys = null; + $results1 = $this->scheduler->createObserver(); + $results2 = $this->scheduler->createObserver(); + $disposable = null; + + $this->scheduler->scheduleAbsolute($this->scheduler::CREATED, function () use (&$ys, $xs) { + $ys = $xs->singleInstance(); + }); + + $this->scheduler->scheduleAbsolute($this->scheduler::SUBSCRIBED, function () use (&$ys, &$disposable, $results1, $results2) { + $disposable = new CompositeDisposable([ + $ys->subscribe($results1), + $ys->subscribe($results2) + ]); + }); + + $this->scheduler->scheduleAbsolute($this->scheduler::DISPOSED, function () use (&$disposable) { + $disposable->dispose(); + }); + + $this->scheduler->start(); + + $this->assertMessages([ + onNext(300, 1), + onNext(350, 2), + onNext(400, 3), + onCompleted(450) + ], $results1->getMessages()); + + $this->assertMessages([ + onNext(300, 1), + onNext(350, 2), + onNext(400, 3), + onCompleted(450) + ], $results2->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 450) + ], $xs->getSubscriptions()); + } + + /** + * @test + */ + public function singleInstance_subscribe_after_stopped() + { + $xs = $this->createColdObservable([ + onNext(100, 1), + onNext(150, 2), + onNext(200, 3), + onCompleted(250) + ]); + + $ys = null; + $results1 = $this->scheduler->createObserver(); + $results2 = $this->scheduler->createObserver(); + $disposable = new SerialDisposable(); + + $this->scheduler->scheduleAbsolute(100, function () use (&$ys, $xs) { + $ys = $xs->singleInstance(); + }); + + $this->scheduler->scheduleAbsolute(200, function () use (&$ys, $disposable, $results1) { + $disposable->setDisposable($ys->subscribe($results1)); + }); + + $this->scheduler->scheduleAbsolute(600, function () use (&$ys, $disposable, $results2) { + $disposable->setDisposable($ys->subscribe($results2)); + }); + + $this->scheduler->scheduleAbsolute(900, function () use (&$disposable) { + $disposable->dispose(); + }); + + $this->scheduler->start(); + + $this->assertMessages([ + onNext(300, 1), + onNext(350, 2), + onNext(400, 3), + onCompleted(450) + ], $results1->getMessages()); + + $this->assertMessages([ + onNext(700, 1), + onNext(750, 2), + onNext(800, 3), + onCompleted(850) + ], $results2->getMessages()); + + $this->assertSubscriptions([ + subscribe(200, 450), + subscribe(600, 850) + ], $xs->getSubscriptions()); + } +}