Skip to content

Commit 52a0ad7

Browse files
committed
Implement observable subject
1 parent 1bc054b commit 52a0ad7

File tree

4 files changed

+131
-0
lines changed

4 files changed

+131
-0
lines changed

observable.d.ts

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./dist/observable"

observable.js

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
module.exports = require("./dist/observable")

src/observable.ts

+46
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import Observable from "zen-observable"
2+
3+
const $observers = Symbol("observers")
4+
5+
/**
6+
* Observable subject. Implements the Observable interface, but also exposes
7+
* the `next()`, `error()`, `complete()` methods to initiate observable
8+
* updates "from the outside".
9+
*
10+
* Use `Observable.from(subject)` to derive an observable that proxies all
11+
* values, errors and the completion raised on this subject, but does not
12+
* expose the `next()`, `error()`, `complete()` methods.
13+
*/
14+
class Subject<T> extends Observable<T> implements ZenObservable.ObservableLike<T> {
15+
private [$observers]: Array<ZenObservable.SubscriptionObserver<T>>
16+
17+
constructor() {
18+
super(observer => {
19+
this[$observers] = [
20+
...(this[$observers] || []),
21+
observer
22+
]
23+
const unsubscribe = () => {
24+
this[$observers] = this[$observers].filter(someObserver => someObserver !== observer)
25+
}
26+
return unsubscribe
27+
})
28+
}
29+
30+
public complete() {
31+
this[$observers].forEach(observer => observer.complete())
32+
}
33+
34+
public error(error: any) {
35+
this[$observers].forEach(observer => observer.error(error))
36+
}
37+
38+
public next(value: T) {
39+
this[$observers].forEach(observer => observer.next(value))
40+
}
41+
}
42+
43+
export {
44+
Observable,
45+
Subject
46+
}

test/observable.test.ts

+83
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
import test from "ava"
2+
import { Observable, Subject } from "../src/observable"
3+
4+
test("Observable subject emits values and completion event", async t => {
5+
let completed1 = false
6+
const values1: number[] = []
7+
let completed2 = false
8+
const values2: number[] = []
9+
let completed3 = false
10+
const values3: number[] = []
11+
12+
const subject = new Subject<number>()
13+
const observable = Observable.from(subject)
14+
15+
const subscription1 = subject.subscribe(
16+
value => values1.push(value),
17+
undefined,
18+
() => completed1 = true
19+
)
20+
subject.subscribe(
21+
value => values2.push(value),
22+
undefined,
23+
() => completed2 = true
24+
)
25+
observable.subscribe(
26+
value => values3.push(value),
27+
undefined,
28+
() => completed3 = true
29+
)
30+
31+
subject.next(1)
32+
subscription1.unsubscribe()
33+
34+
subject.next(2)
35+
subject.complete()
36+
37+
t.deepEqual(values1, [1])
38+
t.deepEqual(values2, [1, 2])
39+
t.deepEqual(values3, [1, 2])
40+
t.is(completed1, false)
41+
t.is(completed2, true)
42+
t.is(completed3, true)
43+
})
44+
45+
test("Observable subject propagates errors", async t => {
46+
let completed1 = false
47+
let error1: Error | undefined
48+
let completed2 = false
49+
let error2: Error | undefined
50+
let completed3 = false
51+
let error3: Error | undefined
52+
53+
const subject = new Subject<number>()
54+
const observable = Observable.from(subject)
55+
56+
const subscription1 = subject.subscribe(
57+
() => undefined,
58+
error => error1 = error,
59+
() => completed1 = true
60+
)
61+
subject.subscribe(
62+
() => undefined,
63+
error => error2 = error,
64+
() => completed2 = true
65+
)
66+
observable.subscribe(
67+
() => undefined,
68+
error => error3 = error,
69+
() => completed3 = true
70+
)
71+
72+
const testingError = Error("Test, test!")
73+
74+
subscription1.unsubscribe()
75+
subject.error(testingError)
76+
77+
t.is(completed1, false)
78+
t.is(error1, undefined)
79+
t.is(completed2, false)
80+
t.is(error2, testingError)
81+
t.is(completed3, false)
82+
t.is(error3, testingError)
83+
})

0 commit comments

Comments
 (0)