-
Notifications
You must be signed in to change notification settings - Fork 7.6k
/
Copy pathSafeSubscriber.java
206 lines (194 loc) · 8.27 KB
/
SafeSubscriber.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.observers;
import java.util.Arrays;
import rx.Subscriber;
import rx.exceptions.*;
import rx.plugins.*;
/**
* {@code SafeSubscriber} is a wrapper around {@code Subscriber} that ensures that the {@code Subscriber}
* complies with <a href="http://reactivex.io/documentation/contract.html">the Observable contract</a>.
* <p>
* The following is taken from <a href="http://go.microsoft.com/fwlink/?LinkID=205219">the Rx Design Guidelines
* document</a>:
* <blockquote><p>
* Messages sent to instances of the {@code IObserver} interface follow the following grammar:
* </p><blockquote><p> {@code OnNext* (OnCompleted | OnError)?} </p></blockquote><p>
* This grammar allows observable sequences to send any amount (0 or more) of {@code OnNext} messages to the
* subscriber, optionally followed by a single success ({@code OnCompleted}) or failure ({@code OnError})
* message.
* </p><p>
* The single message indicating that an observable sequence has finished ensures that consumers of the
* observable sequence can deterministically establish that it is safe to perform cleanup operations.
* </p><p>
* A single failure further ensures that abort semantics can be maintained for operators that work on
* multiple observable sequences (see paragraph 6.6).
* </p></blockquote>
* <p>
* This wrapper does the following:
* <ul>
* <li>Allows only single execution of either {@code onError} or {@code onCompleted}.</li>
* <li>Ensures that once an {@code onCompleted} or {@code onError} is performed, no further calls can be executed</li>
* <li>If {@code unsubscribe} is called, the upstream {@code Observable} is notified and the event delivery will be stopped in a
* best effort manner (i.e., further onXXX calls may still slip through).</li>
* <li>When {@code onError} or {@code onCompleted} occur, unsubscribes from the {@code Observable} (if executing asynchronously).</li>
* </ul>
* {@code SafeSubscriber} will not synchronize {@code onNext} execution. Use {@link SerializedSubscriber} to do
* that.
*
* @param <T>
* the type of item expected by the {@link Subscriber}
*/
public class SafeSubscriber<T> extends Subscriber<T> {
private final Subscriber<? super T> actual;
boolean done;
public SafeSubscriber(Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
}
/**
* Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications.
* <p>
* The {@code Observable} will not call this method if it calls {@link #onError}.
*/
@Override
public void onCompleted() {
if (!done) {
done = true;
try {
actual.onCompleted();
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
RxJavaHooks.onError(e);
throw new OnCompletedFailedException(e.getMessage(), e);
} finally { // NOPMD
try {
// Similarly to onError if failure occurs in unsubscribe then Rx contract is broken
// and we throw an UnsubscribeFailureException.
unsubscribe();
} catch (Throwable e) {
RxJavaHooks.onError(e);
throw new UnsubscribeFailedException(e.getMessage(), e);
}
}
}
}
/**
* Notifies the Subscriber that the {@code Observable} has experienced an error condition.
* <p>
* If the {@code Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onCompleted}.
*
* @param e
* the exception encountered by the Observable
*/
@Override
public void onError(Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwIfFatal(e);
if (!done) {
done = true;
_onError(e);
}
}
/**
* Provides the Subscriber with a new item to observe.
* <p>
* The {@code Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onCompleted} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
/**
* The logic for {@code onError} without the {@code isFinished} check so it can be called from within
* {@code onCompleted}.
*
* @see <a href="https://github.com/ReactiveX/RxJava/issues/630">the report of this bug</a>
*/
@SuppressWarnings("deprecation")
protected void _onError(Throwable e) { // NOPMD
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
try {
actual.onError(e);
} catch (OnErrorNotImplementedException e2) { // NOPMD
/*
* onError isn't implemented so throw
*
* https://github.com/ReactiveX/RxJava/issues/198
*
* Rx Design Guidelines 5.2
*
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior
* will be to rethrow the exception on the thread that the message comes out from the observable
* sequence. The OnCompleted behavior in this case is to do nothing."
*/
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorNotImplementedException("Observer.onError not implemented and error while unsubscribing.", new CompositeException(Arrays.asList(e, unsubscribeException))); // NOPMD
}
throw e2;
} catch (Throwable e2) {
/*
* throw since the Rx contract is broken if onError failed
*
* https://github.com/ReactiveX/RxJava/issues/198
*/
RxJavaHooks.onError(e2);
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError and during unsubscription.", new CompositeException(Arrays.asList(e, e2, unsubscribeException)));
}
throw new OnErrorFailedException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
}
// if we did not throw above we will unsubscribe here, if onError failed then unsubscribe happens in the catch
try {
unsubscribe();
} catch (Throwable unsubscribeException) {
RxJavaHooks.onError(unsubscribeException);
throw new OnErrorFailedException(unsubscribeException);
}
}
/**
* Returns the {@link Subscriber} underlying this {@code SafeSubscriber}.
*
* @return the {@link Subscriber} that was used to create this {@code SafeSubscriber}
*/
public Subscriber<? super T> getActual() {
return actual;
}
}