Skip to content

Commit ab6c4b3

Browse files
authored
3.x: Fix truncation bugs in replay() and ReplaySubject/Processor (ReactiveX#6582)
* 3.x: fix truncation bugs in replay() and ReplaySubject/Processor * Remove impossible condition now that size is correctly checked/computed * Undo the last commit as it somehow hangs the CI, offline is okay.
1 parent 028d33e commit ab6c4b3

File tree

9 files changed

+265
-22
lines changed

9 files changed

+265
-22
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,11 @@ final void removeFirst() {
767767
}
768768

769769
setFirst(head);
770+
// correct the tail if all items have been removed
771+
head = get();
772+
if (head.get() == null) {
773+
tail = head;
774+
}
770775
}
771776
/**
772777
* Arranges the given node is the new head from now on.
@@ -775,35 +780,31 @@ final void removeFirst() {
775780
final void setFirst(Node n) {
776781
if (eagerTruncate) {
777782
Node m = new Node(null, n.index);
778-
Node nextNode = n.get();
779-
if (nextNode == null) {
780-
tail = m;
781-
}
782-
m.lazySet(nextNode);
783+
m.lazySet(n.get());
783784
n = m;
784785
}
785786
set(n);
786787
}
787788

788789
@Override
789790
public final void next(T value) {
790-
Object o = enterTransform(NotificationLite.next(value));
791+
Object o = enterTransform(NotificationLite.next(value), false);
791792
Node n = new Node(o, ++index);
792793
addLast(n);
793794
truncate();
794795
}
795796

796797
@Override
797798
public final void error(Throwable e) {
798-
Object o = enterTransform(NotificationLite.error(e));
799+
Object o = enterTransform(NotificationLite.error(e), true);
799800
Node n = new Node(o, ++index);
800801
addLast(n);
801802
truncateFinal();
802803
}
803804

804805
@Override
805806
public final void complete() {
806-
Object o = enterTransform(NotificationLite.complete());
807+
Object o = enterTransform(NotificationLite.complete(), true);
807808
Node n = new Node(o, ++index);
808809
addLast(n);
809810
truncateFinal();
@@ -897,9 +898,10 @@ public final void replay(InnerSubscription<T> output) {
897898
* Override this to wrap the NotificationLite object into a
898899
* container to be used later by truncate.
899900
* @param value the value to transform into the internal representation
901+
* @param terminal is this a terminal value?
900902
* @return the transformed value
901903
*/
902-
Object enterTransform(Object value) {
904+
Object enterTransform(Object value, boolean terminal) {
903905
return value;
904906
}
905907
/**
@@ -1001,8 +1003,8 @@ static final class SizeAndTimeBoundReplayBuffer<T> extends BoundedReplayBuffer<T
10011003
}
10021004

10031005
@Override
1004-
Object enterTransform(Object value) {
1005-
return new Timed<Object>(value, scheduler.now(unit), unit);
1006+
Object enterTransform(Object value, boolean terminal) {
1007+
return new Timed<Object>(value, terminal ? Long.MAX_VALUE : scheduler.now(unit), unit);
10061008
}
10071009

10081010
@Override
@@ -1019,7 +1021,7 @@ void truncate() {
10191021

10201022
int e = 0;
10211023
for (;;) {
1022-
if (next != null) {
1024+
if (next != null && size > 1) { // never truncate the very last item just added
10231025
if (size > limit) {
10241026
e++;
10251027
size--;

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,11 @@ final void trimHead() {
633633
}
634634

635635
setFirst(head);
636+
// correct the tail if all items have been removed
637+
head = get();
638+
if (head.get() == null) {
639+
tail = head;
640+
}
636641
}
637642
/**
638643
* Arranges the given node is the new head from now on.
@@ -641,11 +646,7 @@ final void trimHead() {
641646
final void setFirst(Node n) {
642647
if (eagerTruncate) {
643648
Node m = new Node(null);
644-
Node nextNode = n.get();
645-
if (nextNode == null) {
646-
tail = m;
647-
}
648-
m.lazySet(nextNode);
649+
m.lazySet(n.get());
649650
n = m;
650651
}
651652
set(n);
@@ -845,7 +846,7 @@ void truncate() {
845846

846847
int e = 0;
847848
for (;;) {
848-
if (next != null) {
849+
if (next != null && size > 1) { // never truncate the very last item just added
849850
if (size > limit) {
850851
e++;
851852
size--;

src/main/java/io/reactivex/processors/ReplayProcessor.java

+5
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,10 @@ void trim() {
10701070
TimedNode<T> h = head;
10711071

10721072
for (;;) {
1073+
if (size <= 1) {
1074+
head = h;
1075+
break;
1076+
}
10731077
TimedNode<T> next = h.get();
10741078
if (next == null) {
10751079
head = h;
@@ -1082,6 +1086,7 @@ void trim() {
10821086
}
10831087

10841088
h = next;
1089+
size--;
10851090
}
10861091

10871092
}

src/main/java/io/reactivex/subjects/ReplaySubject.java

+5
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,10 @@ void trim() {
10711071
TimedNode<Object> h = head;
10721072

10731073
for (;;) {
1074+
if (size <= 1) {
1075+
head = h;
1076+
break;
1077+
}
10741078
TimedNode<Object> next = h.get();
10751079
if (next == null) {
10761080
head = h;
@@ -1083,6 +1087,7 @@ void trim() {
10831087
}
10841088

10851089
h = next;
1090+
size--;
10861091
}
10871092

10881093
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java

+10-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import io.reactivex.*;
2929
import io.reactivex.Scheduler.Worker;
3030
import io.reactivex.annotations.NonNull;
31-
import io.reactivex.disposables.Disposable;
31+
import io.reactivex.disposables.*;
3232
import io.reactivex.exceptions.TestException;
3333
import io.reactivex.flowables.ConnectableFlowable;
3434
import io.reactivex.functions.*;
@@ -2258,4 +2258,13 @@ public void timeAndSizeBoundSelectorEagerTruncate() throws Exception {
22582258
+ " -> " + after / 1024.0 / 1024.0);
22592259
}
22602260
}
2261+
2262+
@Test
2263+
public void timeAndSizeNoTerminalTruncationOnTimechange() {
2264+
Flowable.just(1).replay(1, 1, TimeUnit.SECONDS, new TimesteppingScheduler(), true)
2265+
.autoConnect()
2266+
.test()
2267+
.assertComplete()
2268+
.assertNoErrors();
2269+
}
22612270
}

src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java

+9
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.internal.operators.observable;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.lang.management.*;
@@ -1997,4 +1998,12 @@ public void timeAndSizeSelectorBoundEagerTruncate() throws Exception {
19971998
}
19981999
}
19992000

2001+
@Test
2002+
public void timeAndSizeNoTerminalTruncationOnTimechange() {
2003+
Observable.just(1).replay(1, 1, TimeUnit.SECONDS, new TimesteppingScheduler(), true)
2004+
.autoConnect()
2005+
.test()
2006+
.assertComplete()
2007+
.assertNoErrors();
2008+
}
20002009
}

src/test/java/io/reactivex/processors/ReplayProcessorTest.java

+78-2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
package io.reactivex.processors;
1515

1616
import static org.junit.Assert.*;
17+
import static org.mockito.ArgumentMatchers.*;
1718
import static org.mockito.Mockito.*;
1819

1920
import java.lang.management.*;
@@ -25,15 +26,15 @@
2526
import org.mockito.*;
2627
import org.reactivestreams.*;
2728

28-
import io.reactivex.*;
29+
import io.reactivex.Flowable;
2930
import io.reactivex.disposables.Disposable;
3031
import io.reactivex.exceptions.TestException;
3132
import io.reactivex.functions.*;
3233
import io.reactivex.internal.subscriptions.BooleanSubscription;
3334
import io.reactivex.processors.ReplayProcessor.*;
3435
import io.reactivex.schedulers.*;
3536
import io.reactivex.subscribers.*;
36-
import io.reactivex.testsupport.TestHelper;
37+
import io.reactivex.testsupport.*;
3738

3839
public class ReplayProcessorTest extends FlowableProcessorTest<Object> {
3940

@@ -1751,4 +1752,79 @@ public void accept(byte[] v) throws Exception {
17511752
+ " -> " + after.get() / 1024.0 / 1024.0);
17521753
}
17531754
}
1755+
1756+
@Test
1757+
public void timeAndSizeNoTerminalTruncationOnTimechange() {
1758+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1759+
1760+
TestSubscriber<Integer> ts = rp.test();
1761+
1762+
rp.onNext(1);
1763+
rp.cleanupBuffer();
1764+
rp.onComplete();
1765+
1766+
ts.assertNoErrors()
1767+
.assertComplete();
1768+
}
1769+
1770+
@Test
1771+
public void timeAndSizeNoTerminalTruncationOnTimechange2() {
1772+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1773+
1774+
TestSubscriber<Integer> ts = rp.test();
1775+
1776+
rp.onNext(1);
1777+
rp.cleanupBuffer();
1778+
rp.onNext(2);
1779+
rp.cleanupBuffer();
1780+
rp.onComplete();
1781+
1782+
ts.assertNoErrors()
1783+
.assertComplete();
1784+
}
1785+
1786+
@Test
1787+
public void timeAndSizeNoTerminalTruncationOnTimechange3() {
1788+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1789+
1790+
TestSubscriber<Integer> ts = rp.test();
1791+
1792+
rp.onNext(1);
1793+
rp.onNext(2);
1794+
rp.onComplete();
1795+
1796+
ts.assertNoErrors()
1797+
.assertComplete();
1798+
}
1799+
1800+
@Test
1801+
public void timeAndSizeNoTerminalTruncationOnTimechange4() {
1802+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 10);
1803+
1804+
TestSubscriber<Integer> ts = rp.test();
1805+
1806+
rp.onNext(1);
1807+
rp.onNext(2);
1808+
rp.onComplete();
1809+
1810+
ts.assertNoErrors()
1811+
.assertComplete();
1812+
}
1813+
1814+
@Test
1815+
public void timeAndSizeRemoveCorrectNumberOfOld() {
1816+
TestScheduler scheduler = new TestScheduler();
1817+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2);
1818+
1819+
rp.onNext(1);
1820+
rp.onNext(2);
1821+
rp.onNext(3);
1822+
1823+
scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
1824+
1825+
rp.onNext(4);
1826+
rp.onNext(5);
1827+
1828+
rp.test().assertValuesOnly(4, 5);
1829+
}
17541830
}

0 commit comments

Comments
 (0)