Skip to content

Commit 8fdaad9

Browse files
committed
In ObservableConcatMapSingle, if dispose is called before innerError, forward errors to RxJavaPlugins.onError instead of swallowing them. Fixes ReactiveX#6587
1 parent ab6c4b3 commit 8fdaad9

File tree

2 files changed

+60
-14
lines changed

2 files changed

+60
-14
lines changed

src/main/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingle.java

+24-14
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void onNext(T t) {
123123

124124
@Override
125125
public void onError(Throwable t) {
126-
if (errors.addThrowable(t)) {
126+
if (errors.addThrowable(t) && !cancelled) {
127127
if (errorMode == ErrorMode.IMMEDIATE) {
128128
inner.dispose();
129129
}
@@ -148,6 +148,10 @@ public void dispose() {
148148
if (getAndIncrement() == 0) {
149149
queue.clear();
150150
item = null;
151+
152+
if (errors.get() != null) {
153+
RxJavaPlugins.onError(errors.terminate());
154+
}
151155
}
152156
}
153157

@@ -163,7 +167,7 @@ void innerSuccess(R item) {
163167
}
164168

165169
void innerError(Throwable ex) {
166-
if (errors.addThrowable(ex)) {
170+
if (errors.addThrowable(ex) && !cancelled) {
167171
if (errorMode != ErrorMode.END) {
168172
upstream.dispose();
169173
}
@@ -188,25 +192,31 @@ void drain() {
188192
for (;;) {
189193

190194
for (;;) {
191-
if (cancelled) {
192-
queue.clear();
193-
item = null;
194-
break;
195-
}
196-
195+
boolean isCancelled = cancelled;
197196
int s = state;
198197

199198
if (errors.get() != null) {
200-
if (errorMode == ErrorMode.IMMEDIATE
201-
|| (errorMode == ErrorMode.BOUNDARY && s == STATE_INACTIVE)) {
202-
queue.clear();
203-
item = null;
204-
Throwable ex = errors.terminate();
205-
downstream.onError(ex);
199+
if (isCancelled) {
200+
RxJavaPlugins.onError(errors.terminate());
206201
return;
202+
} else {
203+
if (errorMode == ErrorMode.IMMEDIATE
204+
|| (errorMode == ErrorMode.BOUNDARY && s == STATE_INACTIVE)) {
205+
queue.clear();
206+
item = null;
207+
Throwable ex = errors.terminate();
208+
downstream.onError(ex);
209+
return;
210+
}
207211
}
208212
}
209213

214+
if (isCancelled) {
215+
queue.clear();
216+
item = null;
217+
break;
218+
}
219+
210220
if (s == STATE_INACTIVE) {
211221
boolean d = done;
212222
T v = queue.poll();

src/test/java/io/reactivex/internal/operators/mixed/ObservableConcatMapSingleTest.java

+36
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,42 @@ protected void subscribeActual(
212212
}
213213
}
214214

215+
@Test
216+
public void innerErrorAfterMainDispose() {
217+
List<Throwable> errors = TestHelper.trackPluginErrors();
218+
try {
219+
final PublishSubject<Integer> ps = PublishSubject.create();
220+
221+
final AtomicReference<SingleObserver<? super Integer>> obs = new AtomicReference<SingleObserver<? super Integer>>();
222+
223+
TestObserverEx<Integer> to = ps.concatMapSingle(
224+
new Function<Integer, SingleSource<Integer>>() {
225+
@Override
226+
public SingleSource<Integer> apply(Integer v)
227+
throws Exception {
228+
return new Single<Integer>() {
229+
@Override
230+
protected void subscribeActual(
231+
SingleObserver<? super Integer> observer) {
232+
observer.onSubscribe(Disposables.empty());
233+
obs.set(observer);
234+
}
235+
};
236+
}
237+
}
238+
).to(TestHelper.<Integer>testConsumer());
239+
240+
ps.onNext(1);
241+
242+
to.dispose();
243+
obs.get().onError(new TestException("inner"));
244+
245+
TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner");
246+
} finally {
247+
RxJavaPlugins.reset();
248+
}
249+
}
250+
215251
@Test
216252
public void delayAllErrors() {
217253
TestObserverEx<Object> to = Observable.range(1, 5)

0 commit comments

Comments
 (0)