Skip to content

Commit 6030d83

Browse files
authored
3.x: Fix Flowable.flatMap not canceling the inner sources on outer error (#6826)
1 parent 84677c5 commit 6030d83

File tree

3 files changed

+74
-1
lines changed

3 files changed

+74
-1
lines changed

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java

+5
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,11 @@ public void onError(Throwable t) {
321321
}
322322
if (errors.tryAddThrowableOrReport(t)) {
323323
done = true;
324+
if (!delayErrors) {
325+
for (InnerSubscriber<?, ?> a : subscribers.getAndSet(CANCELLED)) {
326+
a.dispose();
327+
}
328+
}
324329
drain();
325330
}
326331
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMapTest.java

+34
Original file line numberDiff line numberDiff line change
@@ -1116,4 +1116,38 @@ public Publisher<Integer> apply(Integer v) throws Throwable {
11161116
}
11171117
});
11181118
}
1119+
1120+
@Test
1121+
public void mainErrorsInnerCancelled() {
1122+
PublishProcessor<Integer> pp1 = PublishProcessor.create();
1123+
PublishProcessor<Integer> pp2 = PublishProcessor.create();
1124+
1125+
pp1
1126+
.flatMap(v -> pp2)
1127+
.test();
1128+
1129+
pp1.onNext(1);
1130+
assertTrue("No subscribers?", pp2.hasSubscribers());
1131+
1132+
pp1.onError(new TestException());
1133+
1134+
assertFalse("Has subscribers?", pp2.hasSubscribers());
1135+
}
1136+
1137+
@Test
1138+
public void innerErrorsMainCancelled() {
1139+
PublishProcessor<Integer> pp1 = PublishProcessor.create();
1140+
PublishProcessor<Integer> pp2 = PublishProcessor.create();
1141+
1142+
pp1
1143+
.flatMap(v -> pp2)
1144+
.test();
1145+
1146+
pp1.onNext(1);
1147+
assertTrue("No subscribers?", pp2.hasSubscribers());
1148+
1149+
pp2.onError(new TestException());
1150+
1151+
assertFalse("Has subscribers?", pp1.hasSubscribers());
1152+
}
11191153
}

Diff for: src/test/java/io/reactivex/rxjava3/internal/operators/observable/ObservableFlatMapTest.java

+35-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import io.reactivex.rxjava3.core.*;
2727
import io.reactivex.rxjava3.core.Observable;
2828
import io.reactivex.rxjava3.core.Observer;
29-
import io.reactivex.rxjava3.disposables.*;
29+
import io.reactivex.rxjava3.disposables.Disposable;
3030
import io.reactivex.rxjava3.exceptions.*;
3131
import io.reactivex.rxjava3.functions.*;
3232
import io.reactivex.rxjava3.internal.functions.Functions;
@@ -1079,4 +1079,38 @@ public Observable<Integer> apply(Integer v) throws Throwable {
10791079
}
10801080
});
10811081
}
1082+
1083+
@Test
1084+
public void mainErrorsInnerCancelled() {
1085+
PublishSubject<Integer> ps1 = PublishSubject.create();
1086+
PublishSubject<Integer> ps2 = PublishSubject.create();
1087+
1088+
ps1
1089+
.flatMap(v -> ps2)
1090+
.test();
1091+
1092+
ps1.onNext(1);
1093+
assertTrue("No subscribers?", ps2.hasObservers());
1094+
1095+
ps1.onError(new TestException());
1096+
1097+
assertFalse("Has subscribers?", ps2.hasObservers());
1098+
}
1099+
1100+
@Test
1101+
public void innerErrorsMainCancelled() {
1102+
PublishSubject<Integer> ps1 = PublishSubject.create();
1103+
PublishSubject<Integer> ps2 = PublishSubject.create();
1104+
1105+
ps1
1106+
.flatMap(v -> ps2)
1107+
.test();
1108+
1109+
ps1.onNext(1);
1110+
assertTrue("No subscribers?", ps2.hasObservers());
1111+
1112+
ps2.onError(new TestException());
1113+
1114+
assertFalse("Has subscribers?", ps1.hasObservers());
1115+
}
10821116
}

0 commit comments

Comments
 (0)