Skip to content

Commit bbaea42

Browse files
authored
2.x: Fix window(time) possible interrupts while terminating (#6684)
1 parent 90ae2a3 commit bbaea42

File tree

4 files changed

+503
-59
lines changed

4 files changed

+503
-59
lines changed

Diff for: src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java

+11-29
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,6 @@ public void onError(Throwable t) {
160160
}
161161

162162
downstream.onError(t);
163-
dispose();
164163
}
165164

166165
@Override
@@ -171,7 +170,6 @@ public void onComplete() {
171170
}
172171

173172
downstream.onComplete();
174-
dispose();
175173
}
176174

177175
@Override
@@ -184,22 +182,15 @@ public void cancel() {
184182
cancelled = true;
185183
}
186184

187-
public void dispose() {
188-
DisposableHelper.dispose(timer);
189-
}
190-
191185
@Override
192186
public void run() {
193-
194187
if (cancelled) {
195188
terminated = true;
196-
dispose();
197189
}
198190
queue.offer(NEXT);
199191
if (enter()) {
200192
drainLoop();
201193
}
202-
203194
}
204195

205196
void drainLoop() {
@@ -221,13 +212,13 @@ void drainLoop() {
221212
if (d && (o == null || o == NEXT)) {
222213
window = null;
223214
q.clear();
224-
dispose();
225215
Throwable err = error;
226216
if (err != null) {
227217
w.onError(err);
228218
} else {
229219
w.onComplete();
230220
}
221+
timer.dispose();
231222
return;
232223
}
233224

@@ -251,8 +242,8 @@ void drainLoop() {
251242
window = null;
252243
queue.clear();
253244
upstream.cancel();
254-
dispose();
255245
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
246+
timer.dispose();
256247
return;
257248
}
258249
} else {
@@ -396,7 +387,7 @@ public void onNext(T t) {
396387
window = null;
397388
upstream.cancel();
398389
downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
399-
dispose();
390+
disposeTimer();
400391
return;
401392
}
402393
} else {
@@ -424,7 +415,6 @@ public void onError(Throwable t) {
424415
}
425416

426417
downstream.onError(t);
427-
dispose();
428418
}
429419

430420
@Override
@@ -435,7 +425,6 @@ public void onComplete() {
435425
}
436426

437427
downstream.onComplete();
438-
dispose();
439428
}
440429

441430
@Override
@@ -448,8 +437,8 @@ public void cancel() {
448437
cancelled = true;
449438
}
450439

451-
public void dispose() {
452-
DisposableHelper.dispose(timer);
440+
public void disposeTimer() {
441+
timer.dispose();
453442
Worker w = worker;
454443
if (w != null) {
455444
w.dispose();
@@ -468,7 +457,7 @@ void drainLoop() {
468457
if (terminated) {
469458
upstream.cancel();
470459
q.clear();
471-
dispose();
460+
disposeTimer();
472461
return;
473462
}
474463

@@ -488,7 +477,7 @@ void drainLoop() {
488477
} else {
489478
w.onComplete();
490479
}
491-
dispose();
480+
disposeTimer();
492481
return;
493482
}
494483

@@ -515,7 +504,7 @@ void drainLoop() {
515504
queue.clear();
516505
upstream.cancel();
517506
a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests."));
518-
dispose();
507+
disposeTimer();
519508
return;
520509
}
521510
}
@@ -554,7 +543,7 @@ void drainLoop() {
554543
window = null;
555544
upstream.cancel();
556545
downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests"));
557-
dispose();
546+
disposeTimer();
558547
return;
559548
}
560549
} else {
@@ -585,7 +574,6 @@ public void run() {
585574
p.queue.offer(this);
586575
} else {
587576
p.terminated = true;
588-
p.dispose();
589577
}
590578
if (p.enter()) {
591579
p.drainLoop();
@@ -682,7 +670,6 @@ public void onError(Throwable t) {
682670
}
683671

684672
downstream.onError(t);
685-
dispose();
686673
}
687674

688675
@Override
@@ -693,7 +680,6 @@ public void onComplete() {
693680
}
694681

695682
downstream.onComplete();
696-
dispose();
697683
}
698684

699685
@Override
@@ -706,10 +692,6 @@ public void cancel() {
706692
cancelled = true;
707693
}
708694

709-
public void dispose() {
710-
worker.dispose();
711-
}
712-
713695
void complete(UnicastProcessor<T> w) {
714696
queue.offer(new SubjectWork<T>(w, false));
715697
if (enter()) {
@@ -730,9 +712,9 @@ void drainLoop() {
730712
for (;;) {
731713
if (terminated) {
732714
upstream.cancel();
733-
dispose();
734715
q.clear();
735716
ws.clear();
717+
worker.dispose();
736718
return;
737719
}
738720

@@ -756,7 +738,7 @@ void drainLoop() {
756738
}
757739
}
758740
ws.clear();
759-
dispose();
741+
worker.dispose();
760742
return;
761743
}
762744

Diff for: src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java

+9-26
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,13 @@
1515

1616
import java.util.*;
1717
import java.util.concurrent.TimeUnit;
18-
import java.util.concurrent.atomic.AtomicReference;
1918

2019
import io.reactivex.*;
2120
import io.reactivex.Observable;
2221
import io.reactivex.Observer;
2322
import io.reactivex.Scheduler.Worker;
2423
import io.reactivex.disposables.Disposable;
25-
import io.reactivex.internal.disposables.DisposableHelper;
24+
import io.reactivex.internal.disposables.*;
2625
import io.reactivex.internal.observers.QueueDrainObserver;
2726
import io.reactivex.internal.queue.MpscLinkedQueue;
2827
import io.reactivex.internal.util.NotificationLite;
@@ -85,7 +84,7 @@ static final class WindowExactUnboundedObserver<T>
8584

8685
UnicastSubject<T> window;
8786

88-
final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
87+
final SequentialDisposable timer = new SequentialDisposable();
8988

9089
static final Object NEXT = new Object();
9190

@@ -114,7 +113,7 @@ public void onSubscribe(Disposable d) {
114113

115114
if (!cancelled) {
116115
Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit);
117-
DisposableHelper.replace(timer, task);
116+
timer.replace(task);
118117
}
119118
}
120119
}
@@ -146,7 +145,6 @@ public void onError(Throwable t) {
146145
drainLoop();
147146
}
148147

149-
disposeTimer();
150148
downstream.onError(t);
151149
}
152150

@@ -157,7 +155,6 @@ public void onComplete() {
157155
drainLoop();
158156
}
159157

160-
disposeTimer();
161158
downstream.onComplete();
162159
}
163160

@@ -171,15 +168,10 @@ public boolean isDisposed() {
171168
return cancelled;
172169
}
173170

174-
void disposeTimer() {
175-
DisposableHelper.dispose(timer);
176-
}
177-
178171
@Override
179172
public void run() {
180173
if (cancelled) {
181174
terminated = true;
182-
disposeTimer();
183175
}
184176
queue.offer(NEXT);
185177
if (enter()) {
@@ -206,13 +198,13 @@ void drainLoop() {
206198
if (d && (o == null || o == NEXT)) {
207199
window = null;
208200
q.clear();
209-
disposeTimer();
210201
Throwable err = error;
211202
if (err != null) {
212203
w.onError(err);
213204
} else {
214205
w.onComplete();
215206
}
207+
timer.dispose();
216208
return;
217209
}
218210

@@ -266,7 +258,7 @@ static final class WindowExactBoundedObserver<T>
266258

267259
volatile boolean terminated;
268260

269-
final AtomicReference<Disposable> timer = new AtomicReference<Disposable>();
261+
final SequentialDisposable timer = new SequentialDisposable();
270262

271263
WindowExactBoundedObserver(
272264
Observer<? super Observable<T>> actual,
@@ -312,7 +304,7 @@ public void onSubscribe(Disposable d) {
312304
task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit);
313305
}
314306

315-
DisposableHelper.replace(timer, task);
307+
timer.replace(task);
316308
}
317309
}
318310

@@ -370,7 +362,6 @@ public void onError(Throwable t) {
370362
}
371363

372364
downstream.onError(t);
373-
disposeTimer();
374365
}
375366

376367
@Override
@@ -381,7 +372,6 @@ public void onComplete() {
381372
}
382373

383374
downstream.onComplete();
384-
disposeTimer();
385375
}
386376

387377
@Override
@@ -428,13 +418,13 @@ void drainLoop() {
428418
if (d && (empty || isHolder)) {
429419
window = null;
430420
q.clear();
431-
disposeTimer();
432421
Throwable err = error;
433422
if (err != null) {
434423
w.onError(err);
435424
} else {
436425
w.onComplete();
437426
}
427+
disposeTimer();
438428
return;
439429
}
440430

@@ -507,7 +497,6 @@ public void run() {
507497
p.queue.offer(this);
508498
} else {
509499
p.terminated = true;
510-
p.disposeTimer();
511500
}
512501
if (p.enter()) {
513502
p.drainLoop();
@@ -592,7 +581,6 @@ public void onError(Throwable t) {
592581
}
593582

594583
downstream.onError(t);
595-
disposeWorker();
596584
}
597585

598586
@Override
@@ -603,7 +591,6 @@ public void onComplete() {
603591
}
604592

605593
downstream.onComplete();
606-
disposeWorker();
607594
}
608595

609596
@Override
@@ -616,10 +603,6 @@ public boolean isDisposed() {
616603
return cancelled;
617604
}
618605

619-
void disposeWorker() {
620-
worker.dispose();
621-
}
622-
623606
void complete(UnicastSubject<T> w) {
624607
queue.offer(new SubjectWork<T>(w, false));
625608
if (enter()) {
@@ -640,9 +623,9 @@ void drainLoop() {
640623
for (;;) {
641624
if (terminated) {
642625
upstream.dispose();
643-
disposeWorker();
644626
q.clear();
645627
ws.clear();
628+
worker.dispose();
646629
return;
647630
}
648631

@@ -665,8 +648,8 @@ void drainLoop() {
665648
w.onComplete();
666649
}
667650
}
668-
disposeWorker();
669651
ws.clear();
652+
worker.dispose();
670653
return;
671654
}
672655

0 commit comments

Comments
 (0)