From df69937720ab1c649de31855f0bbd7ed527079e4 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 22 Jul 2019 16:05:05 +0200 Subject: [PATCH 1/3] 3.x: fix truncation bugs in replay() and ReplaySubject/Processor --- .../operators/flowable/FlowableReplay.java | 26 +++--- .../observable/ObservableReplay.java | 13 +-- .../reactivex/processors/ReplayProcessor.java | 5 ++ .../io/reactivex/subjects/ReplaySubject.java | 5 ++ .../FlowableReplayEagerTruncateTest.java | 11 ++- .../ObservableReplayEagerTruncateTest.java | 9 +++ .../processors/ReplayProcessorTest.java | 80 ++++++++++++++++++- .../reactivex/subjects/ReplaySubjectTest.java | 78 +++++++++++++++++- .../testsupport/TimesteppingScheduler.java | 60 ++++++++++++++ 9 files changed, 265 insertions(+), 22 deletions(-) create mode 100644 src/test/java/io/reactivex/testsupport/TimesteppingScheduler.java diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 6d57dffced..5bac8ffda9 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -767,6 +767,11 @@ final void removeFirst() { } setFirst(head); + // correct the tail if all items have been removed + head = get(); + if (head.get() == null) { + tail = head; + } } /** * Arranges the given node is the new head from now on. @@ -775,11 +780,7 @@ final void removeFirst() { final void setFirst(Node n) { if (eagerTruncate) { Node m = new Node(null, n.index); - Node nextNode = n.get(); - if (nextNode == null) { - tail = m; - } - m.lazySet(nextNode); + m.lazySet(n.get()); n = m; } set(n); @@ -787,7 +788,7 @@ final void setFirst(Node n) { @Override public final void next(T value) { - Object o = enterTransform(NotificationLite.next(value)); + Object o = enterTransform(NotificationLite.next(value), false); Node n = new Node(o, ++index); addLast(n); truncate(); @@ -795,7 +796,7 @@ public final void next(T value) { @Override public final void error(Throwable e) { - Object o = enterTransform(NotificationLite.error(e)); + Object o = enterTransform(NotificationLite.error(e), true); Node n = new Node(o, ++index); addLast(n); truncateFinal(); @@ -803,7 +804,7 @@ public final void error(Throwable e) { @Override public final void complete() { - Object o = enterTransform(NotificationLite.complete()); + Object o = enterTransform(NotificationLite.complete(), true); Node n = new Node(o, ++index); addLast(n); truncateFinal(); @@ -897,9 +898,10 @@ public final void replay(InnerSubscription output) { * Override this to wrap the NotificationLite object into a * container to be used later by truncate. * @param value the value to transform into the internal representation + * @param terminal is this a terminal value? * @return the transformed value */ - Object enterTransform(Object value) { + Object enterTransform(Object value, boolean terminal) { return value; } /** @@ -1001,8 +1003,8 @@ static final class SizeAndTimeBoundReplayBuffer extends BoundedReplayBuffer(value, scheduler.now(unit), unit); + Object enterTransform(Object value, boolean terminal) { + return new Timed(value, terminal ? Long.MAX_VALUE : scheduler.now(unit), unit); } @Override @@ -1019,7 +1021,7 @@ void truncate() { int e = 0; for (;;) { - if (next != null) { + if (next != null && size > 1) { // never truncate the very last item just added if (size > limit) { e++; size--; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 964727df6e..674ab2f5b4 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -633,6 +633,11 @@ final void trimHead() { } setFirst(head); + // correct the tail if all items have been removed + head = get(); + if (head.get() == null) { + tail = head; + } } /** * Arranges the given node is the new head from now on. @@ -641,11 +646,7 @@ final void trimHead() { final void setFirst(Node n) { if (eagerTruncate) { Node m = new Node(null); - Node nextNode = n.get(); - if (nextNode == null) { - tail = m; - } - m.lazySet(nextNode); + m.lazySet(n.get()); n = m; } set(n); @@ -845,7 +846,7 @@ void truncate() { int e = 0; for (;;) { - if (next != null) { + if (next != null && size > 1) { // never truncate the very last item just added if (size > limit) { e++; size--; diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index ff98ff25d9..b04a17d07e 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -1070,6 +1070,10 @@ void trim() { TimedNode h = head; for (;;) { + if (size <= 1) { + head = h; + break; + } TimedNode next = h.get(); if (next == null) { head = h; @@ -1082,6 +1086,7 @@ void trim() { } h = next; + size--; } } diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 703692bd57..622854b5ec 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -1071,6 +1071,10 @@ void trim() { TimedNode h = head; for (;;) { + if (size <= 1) { + head = h; + break; + } TimedNode next = h.get(); if (next == null) { head = h; @@ -1083,6 +1087,7 @@ void trim() { } h = next; + size--; } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java index 30abb35e9c..e09801a8e8 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayEagerTruncateTest.java @@ -28,7 +28,7 @@ import io.reactivex.*; import io.reactivex.Scheduler.Worker; import io.reactivex.annotations.NonNull; -import io.reactivex.disposables.Disposable; +import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; @@ -2258,4 +2258,13 @@ public void timeAndSizeBoundSelectorEagerTruncate() throws Exception { + " -> " + after / 1024.0 / 1024.0); } } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange() { + Flowable.just(1).replay(1, 1, TimeUnit.SECONDS, new TimesteppingScheduler(), true) + .autoConnect() + .test() + .assertComplete() + .assertNoErrors(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java index 4e6b8738c1..d41795c666 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayEagerTruncateTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.lang.management.*; @@ -1997,4 +1998,12 @@ public void timeAndSizeSelectorBoundEagerTruncate() throws Exception { } } + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange() { + Observable.just(1).replay(1, 1, TimeUnit.SECONDS, new TimesteppingScheduler(), true) + .autoConnect() + .test() + .assertComplete() + .assertNoErrors(); + } } diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index 247fecd02b..e7bc86a1e6 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -14,6 +14,7 @@ package io.reactivex.processors; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.lang.management.*; @@ -25,7 +26,7 @@ import org.mockito.*; import org.reactivestreams.*; -import io.reactivex.*; +import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; @@ -33,7 +34,7 @@ import io.reactivex.processors.ReplayProcessor.*; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; -import io.reactivex.testsupport.TestHelper; +import io.reactivex.testsupport.*; public class ReplayProcessorTest extends FlowableProcessorTest { @@ -1751,4 +1752,79 @@ public void accept(byte[] v) throws Exception { + " -> " + after.get() / 1024.0 / 1024.0); } } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange() { + ReplayProcessor rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1); + + TestSubscriber ts = rp.test(); + + rp.onNext(1); + rp.cleanupBuffer(); + rp.onComplete(); + + ts.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange2() { + ReplayProcessor rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1); + + TestSubscriber ts = rp.test(); + + rp.onNext(1); + rp.cleanupBuffer(); + rp.onNext(2); + rp.cleanupBuffer(); + rp.onComplete(); + + ts.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange3() { + ReplayProcessor rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1); + + TestSubscriber ts = rp.test(); + + rp.onNext(1); + rp.onNext(2); + rp.onComplete(); + + ts.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange4() { + ReplayProcessor rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 10); + + TestSubscriber ts = rp.test(); + + rp.onNext(1); + rp.onNext(2); + rp.onComplete(); + + ts.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeRemoveCorrectNumberOfOld() { + TestScheduler scheduler = new TestScheduler(); + ReplayProcessor rp = ReplayProcessor.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2); + + rp.onNext(1); + rp.onNext(2); + rp.onNext(3); + + scheduler.advanceTimeBy(2, TimeUnit.SECONDS); + + rp.onNext(4); + rp.onNext(5); + + rp.test().assertValuesOnly(4, 5); + } } diff --git a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java index 6a86e01e46..3afb52d171 100644 --- a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java @@ -14,6 +14,7 @@ package io.reactivex.subjects; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.lang.management.*; @@ -31,7 +32,7 @@ import io.reactivex.observers.*; import io.reactivex.schedulers.*; import io.reactivex.subjects.ReplaySubject.*; -import io.reactivex.testsupport.TestHelper; +import io.reactivex.testsupport.*; public class ReplaySubjectTest extends SubjectTest { @@ -1343,4 +1344,79 @@ public void accept(byte[] v) throws Exception { + " -> " + after.get() / 1024.0 / 1024.0); } } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange() { + ReplaySubject rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1); + + TestObserver to = rs.test(); + + rs.onNext(1); + rs.cleanupBuffer(); + rs.onComplete(); + + to.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange2() { + ReplaySubject rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1); + + TestObserver to = rs.test(); + + rs.onNext(1); + rs.cleanupBuffer(); + rs.onNext(2); + rs.cleanupBuffer(); + rs.onComplete(); + + to.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange3() { + ReplaySubject rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 1); + + TestObserver to = rs.test(); + + rs.onNext(1); + rs.onNext(2); + rs.onComplete(); + + to.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeNoTerminalTruncationOnTimechange4() { + ReplaySubject rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, new TimesteppingScheduler(), 10); + + TestObserver to = rs.test(); + + rs.onNext(1); + rs.onNext(2); + rs.onComplete(); + + to.assertNoErrors() + .assertComplete(); + } + + @Test + public void timeAndSizeRemoveCorrectNumberOfOld() { + TestScheduler scheduler = new TestScheduler(); + ReplaySubject rs = ReplaySubject.createWithTimeAndSize(1, TimeUnit.SECONDS, scheduler, 2); + + rs.onNext(1); + rs.onNext(2); + rs.onNext(3); // remove 1 due to maxSize, size == 2 + + scheduler.advanceTimeBy(2, TimeUnit.SECONDS); + + rs.onNext(4); // remove 2 due to maxSize, remove 3 due to age, size == 1 + rs.onNext(5); // size == 2 + + rs.test().assertValuesOnly(4, 5); + } } diff --git a/src/test/java/io/reactivex/testsupport/TimesteppingScheduler.java b/src/test/java/io/reactivex/testsupport/TimesteppingScheduler.java new file mode 100644 index 0000000000..31e467fd83 --- /dev/null +++ b/src/test/java/io/reactivex/testsupport/TimesteppingScheduler.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.testsupport; + +import java.util.concurrent.TimeUnit; + +import io.reactivex.Scheduler; +import io.reactivex.disposables.*; + +/** + * Basic scheduler that produces an ever increasing {@link #now(TimeUnit)} value. + * Use this scheduler only as a time source! + */ +public class TimesteppingScheduler extends Scheduler { + + final class TimesteppingWorker extends Worker { + @Override + public void dispose() { + } + + @Override + public boolean isDisposed() { + return false; + } + + @Override + public Disposable schedule(Runnable run, long delay, TimeUnit unit) { + run.run(); + return Disposables.disposed(); + } + + @Override + public long now(TimeUnit unit) { + return time++; + } + } + + long time; + + @Override + public Worker createWorker() { + return new TimesteppingWorker(); + } + + @Override + public long now(TimeUnit unit) { + return time++; + } +} From f070303854c705bc3a54f5b6c6ec4a1b6073886f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 22 Jul 2019 16:37:01 +0200 Subject: [PATCH 2/3] Remove impossible condition now that size is correctly checked/computed --- src/main/java/io/reactivex/processors/ReplayProcessor.java | 6 +----- src/main/java/io/reactivex/subjects/ReplaySubject.java | 6 +----- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index b04a17d07e..58df266e7a 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -1074,12 +1074,8 @@ void trim() { head = h; break; } - TimedNode next = h.get(); - if (next == null) { - head = h; - break; - } + TimedNode next = h.get(); if (next.time > limit) { head = h; break; diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 622854b5ec..164d50a710 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -1075,12 +1075,8 @@ void trim() { head = h; break; } - TimedNode next = h.get(); - if (next == null) { - head = h; - break; - } + TimedNode next = h.get(); if (next.time > limit) { head = h; break; From 07148ecc1a6a8c80a3a4614fa5531c6cdf8cbaf1 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 22 Jul 2019 17:13:59 +0200 Subject: [PATCH 3/3] Undo the last commit as it somehow hangs the CI, offline is okay. --- src/main/java/io/reactivex/processors/ReplayProcessor.java | 6 +++++- src/main/java/io/reactivex/subjects/ReplaySubject.java | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index 58df266e7a..b04a17d07e 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -1074,8 +1074,12 @@ void trim() { head = h; break; } - TimedNode next = h.get(); + if (next == null) { + head = h; + break; + } + if (next.time > limit) { head = h; break; diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 164d50a710..622854b5ec 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -1075,8 +1075,12 @@ void trim() { head = h; break; } - TimedNode next = h.get(); + if (next == null) { + head = h; + break; + } + if (next.time > limit) { head = h; break;