Skip to content

Commit 7fbd565

Browse files
committed
+tck #284 support "demand when all downstreams demand" Processor in TCK
1 parent 4264e1d commit 7fbd565

File tree

4 files changed

+186
-23
lines changed

4 files changed

+186
-23
lines changed

tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java

+20-13
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.reactivestreams.tck.support.Function;
1212
import org.reactivestreams.tck.support.SubscriberWhiteboxVerificationRules;
1313
import org.reactivestreams.tck.support.PublisherVerificationRules;
14+
import org.reactivestreams.tck.support.TestException;
1415
import org.testng.annotations.BeforeMethod;
1516
import org.testng.annotations.Test;
1617

@@ -387,23 +388,29 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
387388
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
388389
@Override
389390
public TestSetup apply(Long aLong) throws Throwable {
390-
return new TestSetup(env, processorBufferSize) {{
391+
return new TestSetup(env, processorBufferSize, false) {{
391392
final ManualSubscriberWithErrorCollection<T> sub1 = new ManualSubscriberWithErrorCollection<T>(env);
392-
env.subscribe(processor, sub1);
393-
394393
final ManualSubscriberWithErrorCollection<T> sub2 = new ManualSubscriberWithErrorCollection<T>(env);
394+
395+
// connect upstream
396+
env.subscribe(this, processor);
397+
// connect downstreams
398+
env.subscribe(processor, sub1);
399+
sub1.request(2);
395400
env.subscribe(processor, sub2);
401+
sub2.request(1);
396402

397-
sub1.request(1);
403+
// request bubbles up to upstream publisher:
398404
expectRequest();
399405
final T x = sendNextTFromUpstream();
400406
expectNextElement(sub1, x);
401407
sub1.request(1);
402408

403409
// sub1 has received one element, and has one demand pending
404-
// sub2 has not yet requested anything
410+
// sub2 has received one element, and no more pending demand
405411

406-
final Exception ex = new RuntimeException("Test exception");
412+
// if upstream fails, both should get the error signal
413+
final Exception ex = new TestException();
407414
sendError(ex);
408415
sub1.expectError(ex);
409416
sub2.expectError(ex);
@@ -472,11 +479,11 @@ public void onError(Throwable cause) {
472479
// must immediately pass on `onError` events received from its upstream to its downstream
473480
@Test
474481
public void mustImmediatelyPassOnOnErrorEventsReceivedFromItsUpstreamToItsDownstream() throws Exception {
475-
new TestSetup(env, processorBufferSize) {{
482+
new TestSetup(env, processorBufferSize, true) {{
476483
final ManualSubscriberWithErrorCollection<T> sub = new ManualSubscriberWithErrorCollection<T>(env);
477484
env.subscribe(processor, sub);
478485

479-
final Exception ex = new RuntimeException("Test exception");
486+
final Exception ex = new TestException();
480487
sendError(ex);
481488
sub.expectError(ex); // "immediately", i.e. without a preceding request
482489

@@ -629,13 +636,13 @@ public void required_mustRequestFromUpstreamForElementsThatHaveBeenRequestedLong
629636
optionalMultipleSubscribersTest(2, new Function<Long,TestSetup>() {
630637
@Override
631638
public TestSetup apply(Long subscribers) throws Throwable {
632-
return new TestSetup(env, processorBufferSize) {{
633-
ManualSubscriber<T> sub1 = newSubscriber();
639+
return new TestSetup(env, processorBufferSize, false) {{
640+
final ManualSubscriber<T> sub1 = newSubscriber();
634641
sub1.request(20);
635642

636643
long totalRequests = expectRequest();
637644
final T x = sendNextTFromUpstream();
638-
expectNextElement(sub1, x);
645+
expectNextElement(sub1, x); // correct, this is not valid in case of "wait for slowest"
639646

640647
if (totalRequests == 1) {
641648
totalRequests += expectRequest();
@@ -700,11 +707,11 @@ public abstract class TestSetup extends ManualPublisher<T> {
700707

701708
final Processor<T, T> processor;
702709

703-
public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException {
710+
public TestSetup(TestEnvironment env, int testBufferSize, boolean subscribeProcessorToEnvPublisher) throws InterruptedException {
704711
super(env);
705712
tees = env.newManualSubscriber(createHelperPublisher(Long.MAX_VALUE));
706713
processor = createIdentityProcessor(testBufferSize);
707-
subscribe(processor);
714+
if (subscribeProcessorToEnvPublisher) subscribe(processor);
708715
}
709716

710717
public ManualSubscriber<T> newSubscriber() throws InterruptedException {

tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java

+4
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,10 @@ public <T> T flopAndFail(String msg) {
183183
}
184184

185185

186+
public <T> void subscribe(Publisher<T> pub, Subscriber<T> sub) throws InterruptedException {
187+
pub.subscribe(sub);
188+
verifyNoAsyncErrorsNoDelay();
189+
}
186190

187191
public <T> void subscribe(Publisher<T> pub, TestSubscriber<T> sub) throws InterruptedException {
188192
subscribe(pub, sub, defaultTimeoutMillis);

tck/src/test/java/org/reactivestreams/tck/IdentityProcessorVerificationTest.java

+161-10
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@
44
import org.reactivestreams.Publisher;
55
import org.reactivestreams.Subscriber;
66
import org.reactivestreams.Subscription;
7+
import org.reactivestreams.tck.support.NonFatal;
78
import org.reactivestreams.tck.support.TCKVerificationSupport;
89
import org.testng.annotations.AfterClass;
910
import org.testng.annotations.BeforeClass;
1011
import org.testng.annotations.Test;
1112

12-
import java.util.concurrent.ExecutorService;
13-
import java.util.concurrent.Executors;
13+
import java.util.concurrent.*;
14+
import java.util.concurrent.atomic.AtomicLong;
1415

1516
/**
1617
* Validates that the TCK's {@link IdentityProcessorVerification} fails with nice human readable errors.
@@ -27,25 +28,36 @@ public class IdentityProcessorVerificationTest extends TCKVerificationSupport {
2728
@Test
2829
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldBeIgnored() throws Throwable {
2930
requireTestSkip(new ThrowingRunnable() {
30-
@Override public void run() throws Throwable {
31-
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS){
32-
@Override public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
31+
@Override
32+
public void run() throws Throwable {
33+
new IdentityProcessorVerification<Integer>(newTestEnvironment(), DEFAULT_TIMEOUT_MILLIS) {
34+
@Override
35+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
3336
return new NoopProcessor();
3437
}
3538

36-
@Override public ExecutorService publisherExecutorService() { return ex; }
39+
@Override
40+
public ExecutorService publisherExecutorService() {
41+
return ex;
42+
}
3743

38-
@Override public Integer createElement(int element) { return element; }
44+
@Override
45+
public Integer createElement(int element) {
46+
return element;
47+
}
3948

40-
@Override public Publisher<Integer> createHelperPublisher(long elements) {
49+
@Override
50+
public Publisher<Integer> createHelperPublisher(long elements) {
4151
return SKIP;
4252
}
4353

44-
@Override public Publisher<Integer> createFailedPublisher() {
54+
@Override
55+
public Publisher<Integer> createFailedPublisher() {
4556
return SKIP;
4657
}
4758

48-
@Override public long maxSupportedSubscribers() {
59+
@Override
60+
public long maxSupportedSubscribers() {
4961
return 1; // can only support 1 subscribe => unable to run this test
5062
}
5163
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
@@ -115,6 +127,145 @@ public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANo
115127
}, "Did not receive expected error on downstream within " + DEFAULT_TIMEOUT_MILLIS);
116128
}
117129

130+
@Test
131+
public void required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError_shouldAllowSignalingElementAfterBothDownstreamsDemand() throws Throwable {
132+
final TestEnvironment env = newTestEnvironment();
133+
new IdentityProcessorVerification<Integer>(env, DEFAULT_TIMEOUT_MILLIS) {
134+
@Override
135+
public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) { // knowingly ignoring buffer size, acting as-if 0
136+
return new Processor<Integer, Integer>() {
137+
138+
private volatile Subscription upstreamSubscription;
139+
140+
private final CopyOnWriteArrayList<MySubscription> subs = new CopyOnWriteArrayList<MySubscription>();
141+
private final CopyOnWriteArrayList<Subscriber<? super Integer>> subscribers = new CopyOnWriteArrayList<Subscriber<? super Integer>>();
142+
private final AtomicLong demand1 = new AtomicLong();
143+
private final AtomicLong demand2 = new AtomicLong();
144+
private final CountDownLatch awaitLatch = new CountDownLatch(2); // to know when both subscribers have signalled demand
145+
146+
@Override
147+
public void subscribe(final Subscriber<? super Integer> s) {
148+
int subscriberCount = subs.size();
149+
switch (subscriberCount) {
150+
case 0:
151+
s.onSubscribe(createSubscription(awaitLatch, s, demand1));
152+
break;
153+
case 1:
154+
s.onSubscribe(createSubscription(awaitLatch, s, demand2));
155+
break;
156+
default:
157+
throw new RuntimeException(String.format("This for-test-purposes-processor supports only 2 subscribers, yet got %s!", subscriberCount));
158+
}
159+
}
160+
161+
@Override
162+
public void onSubscribe(Subscription s) {
163+
this.upstreamSubscription = s;
164+
}
165+
166+
@Override
167+
public void onNext(Integer elem) {
168+
for (Subscriber<? super Integer> subscriber : subscribers) {
169+
try {
170+
subscriber.onNext(elem);
171+
} catch (Throwable t) {
172+
env.flop(t, String.format("Calling onNext on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
173+
}
174+
}
175+
}
176+
177+
@Override
178+
public void onError(Throwable t) {
179+
for (Subscriber<? super Integer> subscriber : subscribers) {
180+
try {
181+
subscriber.onError(t);
182+
} catch (Exception ex) {
183+
env.flop(ex, String.format("Calling onError on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
184+
}
185+
}
186+
}
187+
188+
@Override
189+
public void onComplete() {
190+
for (Subscriber<? super Integer> subscriber : subscribers) {
191+
try {
192+
subscriber.onComplete();
193+
} catch (Exception ex) {
194+
env.flop(ex, String.format("Calling onComplete on [%s] should not throw! See https://github.com/reactive-streams/reactive-streams-jvm#2.13", subscriber));
195+
}
196+
}
197+
}
198+
199+
private Subscription createSubscription(CountDownLatch awaitLatch, final Subscriber<? super Integer> s, final AtomicLong demand) {
200+
final MySubscription sub = new MySubscription(awaitLatch, s, demand);
201+
subs.add(sub);
202+
subscribers.add(s);
203+
return sub;
204+
}
205+
206+
final class MySubscription implements Subscription {
207+
private final CountDownLatch awaitLatch;
208+
private final Subscriber<? super Integer> s;
209+
private final AtomicLong demand;
210+
211+
public MySubscription(CountDownLatch awaitTwoLatch, Subscriber<? super Integer> s, AtomicLong demand) {
212+
this.awaitLatch = awaitTwoLatch;
213+
this.s = s;
214+
this.demand = demand;
215+
}
216+
217+
@Override
218+
public void request(final long n) {
219+
ex.execute(new Runnable() {
220+
@Override
221+
public void run() {
222+
if (demand.get() >= 0) {
223+
demand.addAndGet(n);
224+
awaitLatch.countDown();
225+
try {
226+
awaitLatch.await(env.defaultTimeoutMillis(), TimeUnit.MILLISECONDS);
227+
final long d = demand.getAndSet(0);
228+
if (d > 0) upstreamSubscription.request(d);
229+
} catch (InterruptedException e) {
230+
env.flop(e, "Interrupted while awaiting for all downstreams to signal some demand.");
231+
} catch (Throwable t) {
232+
env.flop(t, "Subscription#request has thrown an exception, which is illegal!");
233+
}
234+
} // else cancel was called, do nothing
235+
}
236+
});
237+
}
238+
239+
@Override
240+
public void cancel() {
241+
demand.set(-1); // marks subscription as cancelled
242+
}
243+
244+
@Override
245+
public String toString() {
246+
return String.format("IdentityProcessorVerificationTest:MySubscription(%s, demand = %s)", s, demand);
247+
}
248+
}
249+
};
250+
}
251+
252+
@Override
253+
public ExecutorService publisherExecutorService() {
254+
return ex;
255+
}
256+
257+
@Override
258+
public Integer createElement(int element) {
259+
return element;
260+
}
261+
262+
@Override
263+
public Publisher<Integer> createFailedPublisher() {
264+
return SKIP;
265+
}
266+
}.required_spec104_mustCallOnErrorOnAllItsSubscribersIfItEncountersANonRecoverableError();
267+
}
268+
118269
// FAILING IMPLEMENTATIONS //
119270

120271
final Publisher<Integer> SKIP = null;

tck/src/test/java/org/reactivestreams/tck/SyncTriggeredDemandSubscriberTest.java

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public SyncTriggeredDemandSubscriberTest() {
2424
super(new TestEnvironment());
2525
}
2626

27+
@SuppressWarnings("unchecked")
2728
@Override public void triggerRequest(final Subscriber<? super Integer> subscriber) {
2829
((SyncTriggeredDemandSubscriber<? super Integer>)subscriber).triggerDemand(1);
2930
}

0 commit comments

Comments
 (0)