Skip to content

Commit 302febb

Browse files
committed
Implemented connectable observables
1 parent d48534f commit 302febb

15 files changed

+4155
-0
lines changed

demo/multicast/multicast.php

+21
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
require_once __DIR__.'/../bootstrap.php';
4+
5+
$subject = new \Rx\Subject\Subject();
6+
$source = (new Rx\Observable\ArrayObservable(range(0, 2)))->multicast($subject);
7+
8+
$subscription = $source->subscribe($stdoutObserver);
9+
$subject->subscribe($stdoutObserver);
10+
11+
$connected = $source->connect();
12+
13+
$subscription->dispose();
14+
15+
//Next value: 0
16+
//Next value: 0
17+
//Next value: 1
18+
//Next value: 1
19+
//Next value: 2
20+
//Next value: 2
21+
//Complete!

demo/publish/publish.php

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
<?php
2+
3+
require_once __DIR__.'/../bootstrap.php';
4+
5+
/* Without publish */
6+
$interval = new Rx\Observable\ArrayObservable(range(0, 10));
7+
8+
9+
$source = $interval
10+
->take(2)
11+
->doOnNext(function ($x) {
12+
echo "Side effect\n";
13+
});
14+
15+
$source->subscribe($createStdoutObserver('SourceA '));
16+
$source->subscribe($createStdoutObserver('SourceB '));
17+
18+
//Side effect
19+
//SourceA Next value: 0
20+
//Side effect
21+
//SourceA Next value: 1
22+
//SourceA Complete!
23+
24+
25+
/* With publish */
26+
$interval = new Rx\Observable\ArrayObservable(range(0, 10));
27+
28+
29+
$source = $interval
30+
->take(2)
31+
->doOnNext(function ($x) {
32+
echo "Side effect\n";
33+
});
34+
35+
$published = $source->publish();
36+
37+
$published->subscribe($createStdoutObserver('SourceC '));
38+
$published->subscribe($createStdoutObserver('SourceD '));
39+
40+
41+
$published->connect();
42+
43+
//Side effect
44+
//SourceC Next value: 0
45+
//SourceD Next value: 0
46+
//Side effect
47+
//SourceC Next value: 1
48+
//SourceD Next value: 1
49+
//SourceC Complete!
50+
//SourceD Complete!

demo/refcount/refcount.php

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
require_once __DIR__.'/../bootstrap.php';
4+
5+
6+
//todo
7+
+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace Rx\Disposable;
4+
5+
use Rx\DisposableInterface;
6+
7+
/**
8+
* Class BinaryDisposable
9+
* @package Rx\Disposable
10+
*/
11+
class BinaryDisposable implements DisposableInterface
12+
{
13+
/** @var \Rx\DisposableInterface */
14+
private $first;
15+
16+
/** @var \Rx\DisposableInterface */
17+
private $second;
18+
19+
/** @var bool */
20+
protected $isDisposed = false;
21+
22+
/**
23+
* BinaryDisposable constructor.
24+
* @param $first
25+
* @param $second
26+
*/
27+
public function __construct(DisposableInterface $first, DisposableInterface $second)
28+
{
29+
$this->first = $first;
30+
$this->second = $second;
31+
}
32+
33+
/**
34+
*
35+
*/
36+
public function dispose()
37+
{
38+
if ($this->isDisposed) {
39+
return;
40+
}
41+
42+
$this->isDisposed = true;
43+
44+
$old1 = $this->first;
45+
$this->first = null;
46+
if ($old1) {
47+
$old1->dispose();
48+
}
49+
50+
$old2 = $this->second;
51+
$this->second = null;
52+
if ($old2) {
53+
$old2->dispose();
54+
}
55+
56+
}
57+
58+
/**
59+
* @return bool
60+
*/
61+
public function isDisposed()
62+
{
63+
return $this->isDisposed;
64+
}
65+
}

lib/Rx/Observable/BaseObservable.php

+123
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
use Rx\Disposable\SingleAssignmentDisposable;
2727
use Rx\SchedulerInterface;
2828
use Rx\Subject\AsyncSubject;
29+
use Rx\Subject\BehaviorSubject;
30+
use Rx\Subject\ReplaySubject;
2931
use Rx\Subject\Subject;
3032
use Rx\Disposable\RefCountDisposable;
3133
use Rx\Disposable\EmptyDisposable;
@@ -636,4 +638,125 @@ public static function defer($factory){
636638
return (new EmptyObservable())->lift(new DeferOperator($factory));
637639
}
638640

641+
/**
642+
* Multicasts the source sequence notifications through an instantiated subject into all uses of the sequence within a selector function. Each
643+
* subscription to the resulting sequence causes a separate multicast invocation, exposing the sequence resulting from the selector function's
644+
* invocation. For specializations with fixed subject types, see Publish, PublishLast, and Replay.
645+
*
646+
* @param $subjectOrSubjectSelector
647+
* @param null $selector
648+
* @return \Rx\Observable\ConnectableObservable|\Rx\Observable\MulticastObservable
649+
*/
650+
public function multicast($subjectOrSubjectSelector, $selector = null)
651+
{
652+
return is_callable($subjectOrSubjectSelector) ?
653+
new MulticastObservable($this, $subjectOrSubjectSelector, $selector) :
654+
new ConnectableObservable($this, $subjectOrSubjectSelector);
655+
}
656+
657+
/**
658+
* Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence.
659+
* This operator is a specialization of Multicast using a regular Subject.
660+
*
661+
* @param callable|null $selector
662+
* @return \Rx\Observable\ConnectableObservable|\Rx\Observable\MulticastObservable
663+
*/
664+
public function publish(callable $selector = null)
665+
{
666+
return $selector ?
667+
new MulticastObservable($this, function () {
668+
return new Subject();
669+
}, $selector) :
670+
$this->multicast(new Subject());
671+
}
672+
673+
/**
674+
* Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification.
675+
* This operator is a specialization of Multicast using a AsyncSubject.
676+
*
677+
* @param callable|null $selector
678+
* @return \Rx\Observable\ConnectableObservable|\Rx\Observable\MulticastObservable
679+
*/
680+
public function publishLast(callable $selector = null)
681+
{
682+
return $selector ?
683+
new MulticastObservable($this, function () {
684+
return new AsyncSubject();
685+
}, $selector) :
686+
$this->multicast(new AsyncSubject());
687+
}
688+
689+
/**
690+
* Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence and starts with initialValue.
691+
* This operator is a specialization of Multicast using a BehaviorSubject.
692+
*
693+
* @param $initialValueOrSelector
694+
* @param null $initialValue
695+
* @return \Rx\Observable\ConnectableObservable|\Rx\Observable\MulticastObservable
696+
*/
697+
public function publishValue($initialValueOrSelector, $initialValue = null)
698+
{
699+
return $initialValue ?
700+
$this->multicast(function () use ($initialValue) {
701+
return new BehaviorSubject($initialValue);
702+
}, $initialValueOrSelector) :
703+
$this->multicast(new BehaviorSubject($initialValueOrSelector));
704+
}
705+
706+
/**
707+
* Returns an observable sequence that shares a single subscription to the underlying sequence.
708+
* This operator is a specialization of publish which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
709+
*
710+
* @return \Rx\Observable\RefCountObservable An observable sequence that contains the elements of a sequence produced by multicasting the source sequence.,mk
711+
*/
712+
public function share()
713+
{
714+
return $this->publish()->refCount();
715+
}
716+
717+
/**
718+
* Returns an observable sequence that shares a single subscription to the underlying sequence and starts with an initialValue.
719+
* This operator is a specialization of publishValue which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
720+
*
721+
* @param $initialValue
722+
* @return \Rx\Observable\RefCountObservable
723+
*/
724+
public function shareValue($initialValue)
725+
{
726+
return $this->publish($initialValue)->refCount();
727+
}
728+
729+
/**
730+
* Returns an observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.
731+
* This operator is a specialization of replay which creates a subscription when the number of observers goes from zero to one, then shares that subscription with all subsequent observers until the number of observers returns to zero, at which point the subscription is disposed.
732+
*
733+
* @param $bufferSize
734+
* @param $windowSize
735+
* @param $scheduler
736+
* @return \Rx\Observable\RefCountObservable
737+
*/
738+
public function shareReplay($bufferSize, $windowSize, $scheduler)
739+
{
740+
return $this->replay(null, $bufferSize, $windowSize, $scheduler)->refCount();
741+
}
742+
743+
/**
744+
* Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence replaying notifications subject to a maximum time length for the replay buffer.
745+
* This operator is a specialization of Multicast using a ReplaySubject.
746+
*
747+
* @param callable|null $selector
748+
* @param null $bufferSize
749+
* @param null $windowSize
750+
* @param \Rx\SchedulerInterface|null $scheduler
751+
* @return \Rx\Observable\ConnectableObservable|\Rx\Observable\MulticastObservable
752+
*/
753+
public function replay(callable $selector = null, $bufferSize = null, $windowSize = null, SchedulerInterface $scheduler = null)
754+
{
755+
return $selector ?
756+
$this->multicast(function () use ($bufferSize, $windowSize, $scheduler) {
757+
return new ReplaySubject($bufferSize, $windowSize, $scheduler);
758+
}, $selector) :
759+
$this->multicast(new ReplaySubject($bufferSize, $windowSize, $scheduler));
760+
}
761+
639762
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
<?php
2+
3+
namespace Rx\Observable;
4+
5+
use Rx\Disposable\BinaryDisposable;
6+
use Rx\Disposable\CallbackDisposable;
7+
use Rx\ObservableInterface;
8+
use Rx\ObserverInterface;
9+
use Rx\Subject\Subject;
10+
11+
/**
12+
* Class ConnectableObservable
13+
* @package Rx\Observable
14+
*/
15+
class ConnectableObservable extends BaseObservable
16+
{
17+
/** @var \Rx\Subject\Subject */
18+
protected $subject;
19+
20+
/** @var BinaryDisposable */
21+
protected $subscription;
22+
23+
/** @var ObserverInterface */
24+
protected $sourceObservable;
25+
26+
/** @var bool */
27+
public $hasSubscription;
28+
29+
/**
30+
* ConnectableObservable constructor.
31+
* @param \Rx\ObservableInterface $source
32+
* @param \Rx\Subject\Subject $subject
33+
*/
34+
public function __construct(ObservableInterface $source, Subject $subject = null)
35+
{
36+
$this->sourceObservable = $source->asObservable();
37+
$this->subject = $subject ?: new Subject();
38+
$this->hasSubscription = false;
39+
}
40+
41+
/**
42+
* @param \Rx\ObserverInterface $observer
43+
* @param null $scheduler
44+
* @return \Rx\Disposable\CallbackDisposable|\Rx\Disposable\EmptyDisposable|\Rx\DisposableInterface|\Rx\Subject\InnerSubscriptionDisposable
45+
*/
46+
public function subscribe(ObserverInterface $observer, $scheduler = null)
47+
{
48+
return $this->subject->subscribe($observer, $scheduler);
49+
}
50+
51+
/**
52+
* @return \Rx\Disposable\BinaryDisposable
53+
*/
54+
public function connect()
55+
{
56+
57+
if ($this->hasSubscription) {
58+
return $this->subscription;
59+
}
60+
61+
$this->hasSubscription = true;
62+
63+
$isDisposed = false;
64+
65+
$connectableDisposable = new CallbackDisposable(function () use (&$isDisposed) {
66+
if ($isDisposed) {
67+
return;
68+
}
69+
$isDisposed = true;
70+
$this->hasSubscription = false;
71+
});
72+
73+
$this->subscription = new BinaryDisposable($this->sourceObservable->subscribe($this->subject), $connectableDisposable);
74+
75+
return $this->subscription;
76+
}
77+
78+
/**
79+
* @return \Rx\Observable\RefCountObservable
80+
*/
81+
public function refCount()
82+
{
83+
return new RefCountObservable($this);
84+
}
85+
86+
/**
87+
* @param $scheduler
88+
*/
89+
protected function doStart($scheduler)
90+
{
91+
}
92+
}

0 commit comments

Comments
 (0)