Skip to content

Added singleInstance operator #185

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions demo/share/singleInstance.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

require_once __DIR__ . '/../bootstrap.php';

$interval = Rx\Observable::interval(1000);

$source = $interval
->take(2)
->do(function () {
echo 'Side effect', PHP_EOL;
});

$single = $source->singleInstance();

// two simultaneous subscriptions, lasting 2 seconds
$single->subscribe($createStdoutObserver('SourceA '));
$single->subscribe($createStdoutObserver('SourceB '));

\Rx\Observable::timer(5000)->subscribe(function () use ($single, &$createStdoutObserver) {
// resubscribe two times again, more than 5 seconds later,
// long after the original two subscriptions have ended
$single->subscribe($createStdoutObserver('SourceC '));
$single->subscribe($createStdoutObserver('SourceD '));
});
16 changes: 16 additions & 0 deletions demo/share/singleInstance.php.expect
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
Side effect
SourceA Next value: 0
SourceB Next value: 0
Side effect
SourceA Next value: 1
SourceB Next value: 1
SourceA Complete!
SourceB Complete!
Side effect
SourceC Next value: 0
SourceD Next value: 0
Side effect
SourceC Next value: 1
SourceD Next value: 1
SourceC Complete!
SourceD Complete!
38 changes: 37 additions & 1 deletion src/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
use Rx\Observable\ConnectableObservable;
use Rx\Observable\EmptyObservable;
use Rx\Observable\ErrorObservable;
use Rx\Observable\FromPromiseObservable;
use Rx\Observable\ForkJoinObservable;
use Rx\Observable\IntervalObservable;
use Rx\Observable\IteratorObservable;
Expand Down Expand Up @@ -1312,6 +1311,43 @@ public function share(): RefCountObservable
return $this->publish()->refCount();
}

/**
* Returns an observable sequence that shares a single subscription to the underlying sequence. This observable sequence
* can be resubscribed to, even if all prior subscriptions have ended.
*
* This operator behaves like share() in RxJS 5
*
* @return \Rx\Observable An observable sequence that contains the elements of a sequence
* produced by multicasting the source sequence.
*
* @demo share/singleInstance.php
* @operator
* @reactivex refcount
*/
public function singleInstance(): Observable
{
$hasObservable = false;
$observable = null;
$source = $this;

$getObservable = function () use (&$hasObservable, &$observable, $source): Observable {
if (!$hasObservable) {
$hasObservable = true;
$observable = $source
->finally(function () use (&$hasObservable) {
$hasObservable = false;
})
->publish()
->refCount();
}
return $observable;
};

return new Observable\AnonymousObservable(function (ObserverInterface $o) use ($getObservable) {
return $getObservable()->subscribe($o);
});
}

/**
* Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an
* initialValue.
Expand Down
120 changes: 120 additions & 0 deletions test/Rx/Functional/Operator/SingleInstanceTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
<?php

declare(strict_types=1);

namespace Rx\Functional\Operator;

use Rx\Disposable\CompositeDisposable;
use Rx\Disposable\SerialDisposable;
use Rx\Functional\FunctionalTestCase;

class SingleInstanceTest extends FunctionalTestCase
{
/**
* @test
*/
public function singleInstance_basic()
{
$xs = $this->createColdObservable([
onNext(100, 1),
onNext(150, 2),
onNext(200, 3),
onCompleted(250)
]);

$ys = null;
$results1 = $this->scheduler->createObserver();
$results2 = $this->scheduler->createObserver();
$disposable = null;

$this->scheduler->scheduleAbsolute($this->scheduler::CREATED, function () use (&$ys, $xs) {
$ys = $xs->singleInstance();
});

$this->scheduler->scheduleAbsolute($this->scheduler::SUBSCRIBED, function () use (&$ys, &$disposable, $results1, $results2) {
$disposable = new CompositeDisposable([
$ys->subscribe($results1),
$ys->subscribe($results2)
]);
});

$this->scheduler->scheduleAbsolute($this->scheduler::DISPOSED, function () use (&$disposable) {
$disposable->dispose();
});

$this->scheduler->start();

$this->assertMessages([
onNext(300, 1),
onNext(350, 2),
onNext(400, 3),
onCompleted(450)
], $results1->getMessages());

$this->assertMessages([
onNext(300, 1),
onNext(350, 2),
onNext(400, 3),
onCompleted(450)
], $results2->getMessages());

$this->assertSubscriptions([
subscribe(200, 450)
], $xs->getSubscriptions());
}

/**
* @test
*/
public function singleInstance_subscribe_after_stopped()
{
$xs = $this->createColdObservable([
onNext(100, 1),
onNext(150, 2),
onNext(200, 3),
onCompleted(250)
]);

$ys = null;
$results1 = $this->scheduler->createObserver();
$results2 = $this->scheduler->createObserver();
$disposable = new SerialDisposable();

$this->scheduler->scheduleAbsolute(100, function () use (&$ys, $xs) {
$ys = $xs->singleInstance();
});

$this->scheduler->scheduleAbsolute(200, function () use (&$ys, $disposable, $results1) {
$disposable->setDisposable($ys->subscribe($results1));
});

$this->scheduler->scheduleAbsolute(600, function () use (&$ys, $disposable, $results2) {
$disposable->setDisposable($ys->subscribe($results2));
});

$this->scheduler->scheduleAbsolute(900, function () use (&$disposable) {
$disposable->dispose();
});

$this->scheduler->start();

$this->assertMessages([
onNext(300, 1),
onNext(350, 2),
onNext(400, 3),
onCompleted(450)
], $results1->getMessages());

$this->assertMessages([
onNext(700, 1),
onNext(750, 2),
onNext(800, 3),
onCompleted(850)
], $results2->getMessages());

$this->assertSubscriptions([
subscribe(200, 450),
subscribe(600, 850)
], $xs->getSubscriptions());
}
}