Skip to content

Commit a0290b0

Browse files
authored
2.x: Fix truncation bugs in replay() and ReplaySubject/Processor (#6602)
1 parent 70f25df commit a0290b0

File tree

7 files changed

+232
-2
lines changed

7 files changed

+232
-2
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -773,6 +773,11 @@ final void removeFirst() {
773773
}
774774

775775
setFirst(head);
776+
// correct the tail if all items have been removed
777+
head = get();
778+
if (head.get() == null) {
779+
tail = head;
780+
}
776781
}
777782
/**
778783
* Arranges the given node is the new head from now on.
@@ -1015,7 +1020,7 @@ void truncate() {
10151020
int e = 0;
10161021
for (;;) {
10171022
if (next != null) {
1018-
if (size > limit) {
1023+
if (size > limit && size > 1) { // never truncate the very last item just added
10191024
e++;
10201025
size--;
10211026
prev = next;

src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -638,6 +638,11 @@ final void trimHead() {
638638
}
639639

640640
setFirst(head);
641+
// correct the tail if all items have been removed
642+
head = get();
643+
if (head.get() == null) {
644+
tail = head;
645+
}
641646
}
642647
/**
643648
* Arranges the given node is the new head from now on.
@@ -839,7 +844,7 @@ void truncate() {
839844
int e = 0;
840845
for (;;) {
841846
if (next != null) {
842-
if (size > limit) {
847+
if (size > limit && size > 1) { // never truncate the very last item just added
843848
e++;
844849
size--;
845850
prev = next;

src/main/java/io/reactivex/processors/ReplayProcessor.java

+5
Original file line numberDiff line numberDiff line change
@@ -1070,6 +1070,10 @@ void trim() {
10701070
TimedNode<T> h = head;
10711071

10721072
for (;;) {
1073+
if (size <= 1) {
1074+
head = h;
1075+
break;
1076+
}
10731077
TimedNode<T> next = h.get();
10741078
if (next == null) {
10751079
head = h;
@@ -1082,6 +1086,7 @@ void trim() {
10821086
}
10831087

10841088
h = next;
1089+
size--;
10851090
}
10861091

10871092
}

src/main/java/io/reactivex/subjects/ReplaySubject.java

+5
Original file line numberDiff line numberDiff line change
@@ -1071,6 +1071,10 @@ void trim() {
10711071
TimedNode<Object> h = head;
10721072

10731073
for (;;) {
1074+
if (size <= 1) {
1075+
head = h;
1076+
break;
1077+
}
10741078
TimedNode<Object> next = h.get();
10751079
if (next == null) {
10761080
head = h;
@@ -1083,6 +1087,7 @@ void trim() {
10831087
}
10841088

10851089
h = next;
1090+
size--;
10861091
}
10871092

10881093
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex;
15+
16+
import java.util.concurrent.TimeUnit;
17+
18+
import io.reactivex.Scheduler;
19+
import io.reactivex.disposables.*;
20+
21+
/**
22+
* Basic scheduler that produces an ever increasing {@link #now(TimeUnit)} value.
23+
* Use this scheduler only as a time source!
24+
*/
25+
public class TimesteppingScheduler extends Scheduler {
26+
27+
final class TimesteppingWorker extends Worker {
28+
@Override
29+
public void dispose() {
30+
}
31+
32+
@Override
33+
public boolean isDisposed() {
34+
return false;
35+
}
36+
37+
@Override
38+
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
39+
run.run();
40+
return Disposables.disposed();
41+
}
42+
43+
@Override
44+
public long now(TimeUnit unit) {
45+
return time++;
46+
}
47+
}
48+
49+
long time;
50+
51+
@Override
52+
public Worker createWorker() {
53+
return new TimesteppingWorker();
54+
}
55+
56+
@Override
57+
public long now(TimeUnit unit) {
58+
return time++;
59+
}
60+
}

src/test/java/io/reactivex/processors/ReplayProcessorTest.java

+75
Original file line numberDiff line numberDiff line change
@@ -1750,4 +1750,79 @@ public void accept(byte[] v) throws Exception {
17501750
+ " -> " + after.get() / 1024.0 / 1024.0);
17511751
}
17521752
}
1753+
1754+
@Test
1755+
public void timeAndSizeNoTerminalTruncationOnTimechange() {
1756+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1757+
1758+
TestSubscriber<Integer> ts = rp.test();
1759+
1760+
rp.onNext(1);
1761+
rp.cleanupBuffer();
1762+
rp.onComplete();
1763+
1764+
ts.assertNoErrors()
1765+
.assertComplete();
1766+
}
1767+
1768+
@Test
1769+
public void timeAndSizeNoTerminalTruncationOnTimechange2() {
1770+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1771+
1772+
TestSubscriber<Integer> ts = rp.test();
1773+
1774+
rp.onNext(1);
1775+
rp.cleanupBuffer();
1776+
rp.onNext(2);
1777+
rp.cleanupBuffer();
1778+
rp.onComplete();
1779+
1780+
ts.assertNoErrors()
1781+
.assertComplete();
1782+
}
1783+
1784+
@Test
1785+
public void timeAndSizeNoTerminalTruncationOnTimechange3() {
1786+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1787+
1788+
TestSubscriber<Integer> ts = rp.test();
1789+
1790+
rp.onNext(1);
1791+
rp.onNext(2);
1792+
rp.onComplete();
1793+
1794+
ts.assertNoErrors()
1795+
.assertComplete();
1796+
}
1797+
1798+
@Test
1799+
public void timeAndSizeNoTerminalTruncationOnTimechange4() {
1800+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 10);
1801+
1802+
TestSubscriber<Integer> ts = rp.test();
1803+
1804+
rp.onNext(1);
1805+
rp.onNext(2);
1806+
rp.onComplete();
1807+
1808+
ts.assertNoErrors()
1809+
.assertComplete();
1810+
}
1811+
1812+
@Test
1813+
public void timeAndSizeRemoveCorrectNumberOfOld() {
1814+
TestScheduler scheduler = new TestScheduler();
1815+
ReplayProcessor<Integer> rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2);
1816+
1817+
rp.onNext(1);
1818+
rp.onNext(2);
1819+
rp.onNext(3);
1820+
1821+
scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
1822+
1823+
rp.onNext(4);
1824+
rp.onNext(5);
1825+
1826+
rp.test().assertValuesOnly(4, 5);
1827+
}
17531828
}

src/test/java/io/reactivex/subjects/ReplaySubjectTest.java

+75
Original file line numberDiff line numberDiff line change
@@ -1342,4 +1342,79 @@ public void accept(byte[] v) throws Exception {
13421342
+ " -> " + after.get() / 1024.0 / 1024.0);
13431343
}
13441344
}
1345+
1346+
@Test
1347+
public void timeAndSizeNoTerminalTruncationOnTimechange() {
1348+
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1349+
1350+
TestObserver<Integer> to = rs.test();
1351+
1352+
rs.onNext(1);
1353+
rs.cleanupBuffer();
1354+
rs.onComplete();
1355+
1356+
to.assertNoErrors()
1357+
.assertComplete();
1358+
}
1359+
1360+
@Test
1361+
public void timeAndSizeNoTerminalTruncationOnTimechange2() {
1362+
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1363+
1364+
TestObserver<Integer> to = rs.test();
1365+
1366+
rs.onNext(1);
1367+
rs.cleanupBuffer();
1368+
rs.onNext(2);
1369+
rs.cleanupBuffer();
1370+
rs.onComplete();
1371+
1372+
to.assertNoErrors()
1373+
.assertComplete();
1374+
}
1375+
1376+
@Test
1377+
public void timeAndSizeNoTerminalTruncationOnTimechange3() {
1378+
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1);
1379+
1380+
TestObserver<Integer> to = rs.test();
1381+
1382+
rs.onNext(1);
1383+
rs.onNext(2);
1384+
rs.onComplete();
1385+
1386+
to.assertNoErrors()
1387+
.assertComplete();
1388+
}
1389+
1390+
@Test
1391+
public void timeAndSizeNoTerminalTruncationOnTimechange4() {
1392+
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 10);
1393+
1394+
TestObserver<Integer> to = rs.test();
1395+
1396+
rs.onNext(1);
1397+
rs.onNext(2);
1398+
rs.onComplete();
1399+
1400+
to.assertNoErrors()
1401+
.assertComplete();
1402+
}
1403+
1404+
@Test
1405+
public void timeAndSizeRemoveCorrectNumberOfOld() {
1406+
TestScheduler scheduler = new TestScheduler();
1407+
ReplaySubject<Integer> rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2);
1408+
1409+
rs.onNext(1);
1410+
rs.onNext(2);
1411+
rs.onNext(3); // remove 1 due to maxSize, size == 2
1412+
1413+
scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
1414+
1415+
rs.onNext(4); // remove 2 due to maxSize, remove 3 due to age, size == 1
1416+
rs.onNext(5); // size == 2
1417+
1418+
rs.test().assertValuesOnly(4, 5);
1419+
}
13451420
}

0 commit comments

Comments
 (0)