Skip to content

Commit 9a36930

Browse files
authored
3.x: Fix groupBy not canceling upstream due to group abandonment (#6642)
* 3.x: Fix groupBy not canceling upstream due to group abandonment * Add codecov retry on connection refused * Retry connrefused * Connrefused not supported?
1 parent c9204b5 commit 9a36930

File tree

8 files changed

+320
-45
lines changed

8 files changed

+320
-45
lines changed

Diff for: .travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ script: gradle/buildViaTravis.sh
1313

1414
# Code coverage
1515
after_success:
16-
- bash <(curl -s https://codecov.io/bash)
16+
- bash <(curl -s --retry 10 https://codecov.io/bash)
1717
- bash gradle/push_javadoc.sh
1818

1919
# cache between builds

Diff for: src/main/java/io/reactivex/rxjava3/core/Flowable.java

+31
Original file line numberDiff line numberDiff line change
@@ -10414,6 +10414,11 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, final Co
1041410414
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
1041510415
* value to be greater or equal to the expected number of groups, possibly using
1041610416
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
10417+
* <p>
10418+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
10419+
* so-called group abandonment where a group will only contain one element and the group will be
10420+
* re-created over and over as new upstream items trigger a new group. The behavior is
10421+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
1041710422
*
1041810423
* <dl>
1041910424
* <dt><b>Backpressure:</b></dt>
@@ -10462,6 +10467,12 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
1046210467
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
1046310468
* value to be greater or equal to the expected number of groups, possibly using
1046410469
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
10470+
* <p>
10471+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
10472+
* so-called group abandonment where a group will only contain one element and the group will be
10473+
* re-created over and over as new upstream items trigger a new group. The behavior is
10474+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
10475+
*
1046510476
* <dl>
1046610477
* <dt><b>Backpressure:</b></dt>
1046710478
* <dd>Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher}
@@ -10512,6 +10523,11 @@ public final <K> Flowable<GroupedFlowable<K, T>> groupBy(Function<? super T, ? e
1051210523
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
1051310524
* value to be greater or equal to the expected number of groups, possibly using
1051410525
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
10526+
* <p>
10527+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
10528+
* so-called group abandonment where a group will only contain one element and the group will be
10529+
* re-created over and over as new upstream items trigger a new group. The behavior is
10530+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
1051510531
*
1051610532
* <dl>
1051710533
* <dt><b>Backpressure:</b></dt>
@@ -10565,6 +10581,11 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1056510581
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
1056610582
* value to be greater or equal to the expected number of groups, possibly using
1056710583
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
10584+
* <p>
10585+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
10586+
* so-called group abandonment where a group will only contain one element and the group will be
10587+
* re-created over and over as new upstream items trigger a new group. The behavior is
10588+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
1056810589
*
1056910590
* <dl>
1057010591
* <dt><b>Backpressure:</b></dt>
@@ -10621,6 +10642,11 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1062110642
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
1062210643
* value to be greater or equal to the expected number of groups, possibly using
1062310644
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
10645+
* <p>
10646+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
10647+
* so-called group abandonment where a group will only contain one element and the group will be
10648+
* re-created over and over as new upstream items trigger a new group. The behavior is
10649+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
1062410650
*
1062510651
* <dl>
1062610652
* <dt><b>Backpressure:</b></dt>
@@ -10726,6 +10752,11 @@ public final <K, V> Flowable<GroupedFlowable<K, V>> groupBy(Function<? super T,
1072610752
* {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency
1072710753
* value to be greater or equal to the expected number of groups, possibly using
1072810754
* {@code Integer.MAX_VALUE} if the number of expected groups is unknown.
10755+
* <p>
10756+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
10757+
* so-called group abandonment where a group will only contain one element and the group will be
10758+
* re-created over and over as new upstream items trigger a new group. The behavior is
10759+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
1072910760
*
1073010761
* <dl>
1073110762
* <dt><b>Backpressure:</b></dt>

Diff for: src/main/java/io/reactivex/rxjava3/core/Observable.java

+30
Original file line numberDiff line numberDiff line change
@@ -9067,6 +9067,12 @@ public final Disposable forEachWhile(final Predicate<? super T> onNext, Consumer
90679067
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
90689068
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
90699069
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
9070+
* <p>
9071+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
9072+
* so-called group abandonment where a group will only contain one element and the group will be
9073+
* re-created over and over as new upstream items trigger a new group. The behavior is
9074+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
9075+
*
90709076
* <dl>
90719077
* <dt><b>Scheduler:</b></dt>
90729078
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9101,6 +9107,12 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T,
91019107
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
91029108
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
91039109
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
9110+
* <p>
9111+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
9112+
* so-called group abandonment where a group will only contain one element and the group will be
9113+
* re-created over and over as new upstream items trigger a new group. The behavior is
9114+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
9115+
*
91049116
* <dl>
91059117
* <dt><b>Scheduler:</b></dt>
91069118
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9138,6 +9150,12 @@ public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T,
91389150
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
91399151
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
91409152
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
9153+
* <p>
9154+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
9155+
* so-called group abandonment where a group will only contain one element and the group will be
9156+
* re-created over and over as new upstream items trigger a new group. The behavior is
9157+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
9158+
*
91419159
* <dl>
91429160
* <dt><b>Scheduler:</b></dt>
91439161
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9176,6 +9194,12 @@ public final <K, V> Observable<GroupedObservable<K, V>> groupBy(Function<? super
91769194
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
91779195
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
91789196
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
9197+
* <p>
9198+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
9199+
* so-called group abandonment where a group will only contain one element and the group will be
9200+
* re-created over and over as new upstream items trigger a new group. The behavior is
9201+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
9202+
*
91799203
* <dl>
91809204
* <dt><b>Scheduler:</b></dt>
91819205
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>
@@ -9217,6 +9241,12 @@ public final <K, V> Observable<GroupedObservable<K, V>> groupBy(Function<? super
92179241
* is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those
92189242
* {@code GroupedObservableSource}s that do not concern you. Instead, you can signal to them that they may
92199243
* discard their buffers by applying an operator like {@link #ignoreElements} to them.
9244+
* <p>
9245+
* Note also that ignoring groups or subscribing later (i.e., on another thread) will result in
9246+
* so-called group abandonment where a group will only contain one element and the group will be
9247+
* re-created over and over as new upstream items trigger a new group. The behavior is
9248+
* a tradeoff between no-dataloss, upstream cancellation and excessive group creation.
9249+
*
92209250
* <dl>
92219251
* <dt><b>Scheduler:</b></dt>
92229252
* <dd>{@code groupBy} does not operate by default on a particular {@link Scheduler}.</dd>

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

+49-12
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,13 @@ public void onNext(T t) {
179179
if (newGroup) {
180180
q.offer(group);
181181
drain();
182+
183+
if (group.state.tryAbandon()) {
184+
cancel(key);
185+
group.onComplete();
186+
187+
upstream.request(1);
188+
}
182189
}
183190
}
184191

@@ -489,12 +496,17 @@ static final class State<T, K> extends BasicIntQueueSubscription<T> implements P
489496

490497
final AtomicReference<Subscriber<? super T>> actual = new AtomicReference<Subscriber<? super T>>();
491498

492-
final AtomicBoolean once = new AtomicBoolean();
493-
494499
boolean outputFused;
495500

496501
int produced;
497502

503+
final AtomicInteger once = new AtomicInteger();
504+
505+
static final int FRESH = 0;
506+
static final int HAS_SUBSCRIBER = 1;
507+
static final int ABANDONED = 2;
508+
static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER;
509+
498510
State(int bufferSize, GroupBySubscriber<?, K, T> parent, K key, boolean delayError) {
499511
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
500512
this.parent = parent;
@@ -513,19 +525,30 @@ public void request(long n) {
513525
@Override
514526
public void cancel() {
515527
if (cancelled.compareAndSet(false, true)) {
516-
parent.cancel(key);
528+
cancelParent();
517529
}
518530
}
519531

520532
@Override
521-
public void subscribe(Subscriber<? super T> s) {
522-
if (once.compareAndSet(false, true)) {
523-
s.onSubscribe(this);
524-
actual.lazySet(s);
525-
drain();
526-
} else {
527-
EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), s);
533+
public void subscribe(Subscriber<? super T> subscriber) {
534+
for (;;) {
535+
int s = once.get();
536+
if ((s & HAS_SUBSCRIBER) != 0) {
537+
break;
538+
}
539+
int u = s | HAS_SUBSCRIBER;
540+
if (once.compareAndSet(s, u)) {
541+
subscriber.onSubscribe(this);
542+
actual.lazySet(subscriber);
543+
if (cancelled.get()) {
544+
actual.lazySet(null);
545+
} else {
546+
drain();
547+
}
548+
return;
549+
}
528550
}
551+
EmptySubscription.error(new IllegalStateException("Only one Subscriber allowed!"), subscriber);
529552
}
530553

531554
public void onNext(T t) {
@@ -544,6 +567,16 @@ public void onComplete() {
544567
drain();
545568
}
546569

570+
void cancelParent() {
571+
if ((once.get() & ABANDONED) == 0) {
572+
parent.cancel(key);
573+
}
574+
}
575+
576+
boolean tryAbandon() {
577+
return once.get() == FRESH && once.compareAndSet(FRESH, ABANDONED);
578+
}
579+
547580
void drain() {
548581
if (getAndIncrement() != 0) {
549582
return;
@@ -640,7 +673,9 @@ void drainNormal() {
640673
if (r != Long.MAX_VALUE) {
641674
requested.addAndGet(-e);
642675
}
643-
parent.upstream.request(e);
676+
if ((once.get() & ABANDONED) == 0) {
677+
parent.upstream.request(e);
678+
}
644679
}
645680
}
646681

@@ -708,7 +743,9 @@ public T poll() {
708743
int p = produced;
709744
if (p != 0) {
710745
produced = 0;
711-
parent.upstream.request(p);
746+
if ((once.get() & ABANDONED) == 0) {
747+
parent.upstream.request(p);
748+
}
712749
}
713750
return null;
714751
}

Diff for: src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableGroupBy.java

+41-14
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,11 @@ public void onNext(T t) {
110110
getAndIncrement();
111111

112112
downstream.onNext(group);
113+
114+
if (group.state.tryAbandon()) {
115+
cancel(key);
116+
group.onComplete();
117+
}
113118
}
114119

115120
V v;
@@ -151,7 +156,7 @@ public void onComplete() {
151156

152157
@Override
153158
public void dispose() {
154-
// cancelling the main source means we don't want any more groups
159+
// canceling the main source means we don't want any more groups
155160
// but running groups still require new values
156161
if (cancelled.compareAndSet(false, true)) {
157162
if (decrementAndGet() == 0) {
@@ -220,10 +225,15 @@ static final class State<T, K> extends AtomicInteger implements Disposable, Obse
220225

221226
final AtomicBoolean cancelled = new AtomicBoolean();
222227

223-
final AtomicBoolean once = new AtomicBoolean();
224-
225228
final AtomicReference<Observer<? super T>> actual = new AtomicReference<Observer<? super T>>();
226229

230+
final AtomicInteger once = new AtomicInteger();
231+
232+
static final int FRESH = 0;
233+
static final int HAS_SUBSCRIBER = 1;
234+
static final int ABANDONED = 2;
235+
static final int ABANDONED_HAS_SUBSCRIBER = ABANDONED | HAS_SUBSCRIBER;
236+
227237
State(int bufferSize, GroupByObserver<?, K, T> parent, K key, boolean delayError) {
228238
this.queue = new SpscLinkedArrayQueue<T>(bufferSize);
229239
this.parent = parent;
@@ -236,7 +246,7 @@ public void dispose() {
236246
if (cancelled.compareAndSet(false, true)) {
237247
if (getAndIncrement() == 0) {
238248
actual.lazySet(null);
239-
parent.cancel(key);
249+
cancelParent();
240250
}
241251
}
242252
}
@@ -248,17 +258,24 @@ public boolean isDisposed() {
248258

249259
@Override
250260
public void subscribe(Observer<? super T> observer) {
251-
if (once.compareAndSet(false, true)) {
252-
observer.onSubscribe(this);
253-
actual.lazySet(observer);
254-
if (cancelled.get()) {
255-
actual.lazySet(null);
256-
} else {
257-
drain();
261+
for (;;) {
262+
int s = once.get();
263+
if ((s & HAS_SUBSCRIBER) != 0) {
264+
break;
265+
}
266+
int u = s | HAS_SUBSCRIBER;
267+
if (once.compareAndSet(s, u)) {
268+
observer.onSubscribe(this);
269+
actual.lazySet(observer);
270+
if (cancelled.get()) {
271+
actual.lazySet(null);
272+
} else {
273+
drain();
274+
}
275+
return;
258276
}
259-
} else {
260-
EmptyDisposable.error(new IllegalStateException("Only one Observer allowed!"), observer);
261277
}
278+
EmptyDisposable.error(new IllegalStateException("Only one Observer allowed!"), observer);
262279
}
263280

264281
public void onNext(T t) {
@@ -315,11 +332,21 @@ void drain() {
315332
}
316333
}
317334

335+
void cancelParent() {
336+
if ((once.get() & ABANDONED) == 0) {
337+
parent.cancel(key);
338+
}
339+
}
340+
341+
boolean tryAbandon() {
342+
return once.get() == FRESH && once.compareAndSet(FRESH, ABANDONED);
343+
}
344+
318345
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a, boolean delayError) {
319346
if (cancelled.get()) {
320347
queue.clear();
321-
parent.cancel(key);
322348
actual.lazySet(null);
349+
cancelParent();
323350
return true;
324351
}
325352

Diff for: src/test/java/io/reactivex/rxjava3/flowable/FlowableGroupByTests.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,13 @@ public Flowable<Integer> apply(GroupedFlowable<Integer, Integer> v) {
102102
}
103103
}).subscribe(ts);
104104

105-
ts.assertValues(0, 5, 10, 15, 1, 6, 11, 16, 2, 7, 12, 17, 3, 8, 13, 18, 4, 9, 14, 19);
105+
// Behavior change: this now counts as group abandonment because concatMap
106+
// doesn't subscribe to the 2nd+ emitted groups immediately
107+
ts.assertValues(
108+
0, 5, 10, 15, // First group is okay
109+
// any other group gets abandoned so we get 16 one-element group
110+
1, 2, 3, 4, 6, 7, 8, 9, 11, 12, 13, 14, 16, 17, 18, 19
111+
);
106112
ts.assertComplete();
107113
ts.assertNoErrors();
108114
}

0 commit comments

Comments
 (0)