Skip to content

Commit e4b9175

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Fix: request and subscriber tracking, nonpositive request error
reporting.
1 parent ce20f14 commit e4b9175

File tree

2 files changed

+56
-17
lines changed

2 files changed

+56
-17
lines changed

rxjava-reactive-streams/src/main/java/rx/internal/reactivestreams/PublisherAdapter.java

+22-8
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,37 @@
1515
*/
1616
package rx.internal.reactivestreams;
1717

18+
import java.util.concurrent.ConcurrentHashMap;
19+
import java.util.concurrent.ConcurrentMap;
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicLong;
22+
1823
import org.reactivestreams.Publisher;
1924
import org.reactivestreams.Subscriber;
2025
import org.reactivestreams.Subscription;
21-
import rx.Observable;
2226

23-
import java.util.HashSet;
24-
import java.util.Set;
25-
import java.util.concurrent.atomic.AtomicBoolean;
27+
import rx.Observable;
28+
import rx.internal.operators.BackpressureUtils;
2629

2730
public class PublisherAdapter<T> implements Publisher<T> {
2831

2932
private final Observable<T> observable;
3033

31-
private final Set<Subscriber<?>> subscribers = new HashSet<Subscriber<?>>();
34+
private final ConcurrentMap<Subscriber<?>, Object> subscribers = new ConcurrentHashMap<Subscriber<?>, Object>();
3235

3336
public PublisherAdapter(final Observable<T> observable) {
3437
this.observable = observable.serialize();
3538
}
3639

3740
@Override
3841
public void subscribe(final Subscriber<? super T> s) {
39-
if (subscribers.add(s)) {
42+
if (subscribers.putIfAbsent(s, s) == null) {
4043
observable.subscribe(new rx.Subscriber<T>() {
4144
private final AtomicBoolean done = new AtomicBoolean();
42-
45+
private final AtomicLong childRequested = new AtomicLong();
4346
private void doRequest(long n) {
4447
if (!done.get()) {
48+
BackpressureUtils.getAndAddRequest(childRequested, n);
4549
request(n);
4650
}
4751
}
@@ -55,6 +59,7 @@ public void request(long n) {
5559
if (n < 1) {
5660
unsubscribe();
5761
onError(new IllegalArgumentException("3.9 While the Subscription is not cancelled, Subscription.request(long n) MUST throw a java.lang.IllegalArgumentException if the argument is <= 0."));
62+
return;
5863
}
5964

6065
requested.set(true);
@@ -98,7 +103,16 @@ public void onError(Throwable e) {
98103
@Override
99104
public void onNext(T t) {
100105
if (!done.get()) {
101-
s.onNext(t);
106+
if (childRequested.get() > 0) {
107+
s.onNext(t);
108+
childRequested.decrementAndGet();
109+
} else {
110+
try {
111+
onError(new IllegalStateException("1.1 source doesn't respect backpressure"));
112+
} finally {
113+
unsubscribe();
114+
}
115+
}
102116
}
103117
}
104118
});

rxjava-reactive-streams/src/test/java/rx/reactivestreams/NonTckTest.java

+34-9
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,24 @@
1515
*/
1616
package rx.reactivestreams;
1717

18+
import static org.testng.Assert.*;
19+
import static org.testng.AssertJUnit.assertEquals;
20+
import static org.testng.AssertJUnit.assertNotNull;
21+
import static rx.RxReactiveStreams.toObservable;
22+
import static rx.RxReactiveStreams.toPublisher;
23+
24+
import java.util.Arrays;
25+
import java.util.concurrent.TimeUnit;
26+
1827
import org.reactivestreams.Publisher;
1928
import org.testng.annotations.Test;
29+
2030
import rx.Observable;
2131
import rx.Subscriber;
2232
import rx.reactivestreams.test.IterablePublisher;
2333
import rx.reactivestreams.test.RsSubscriber;
2434
import rx.reactivestreams.test.RxSubscriber;
25-
26-
import java.util.Arrays;
27-
import java.util.concurrent.TimeUnit;
28-
29-
import static org.testng.Assert.*;
30-
import static org.testng.AssertJUnit.assertEquals;
31-
import static org.testng.AssertJUnit.assertNotNull;
32-
import static rx.RxReactiveStreams.toObservable;
33-
import static rx.RxReactiveStreams.toPublisher;
35+
import rx.subjects.PublishSubject;
3436

3537
public class NonTckTest {
3638

@@ -145,4 +147,27 @@ void subscribingToHotObservableWithBackpressureStrategy() throws InterruptedExce
145147
subscriber.subscription.cancel();
146148
}
147149

150+
@Test
151+
public void subscribeToPublishSubjectEmitsRuleViolationException() {
152+
PublishSubject<Integer> source = PublishSubject.create();
153+
RsSubscriber<Integer> rxs = subscribe(source);
154+
155+
source.onNext(1);
156+
157+
assertEquals(0, rxs.received.size());
158+
assertNotNull(rxs.error);
159+
}
160+
@Test
161+
public void subscribeToPublishSubjectEmitsRuleViolationExceptionAfterOneRequest() {
162+
PublishSubject<Integer> source = PublishSubject.create();
163+
RsSubscriber<Integer> rxs = subscribe(source);
164+
165+
rxs.subscription.request(1);
166+
source.onNext(1);
167+
168+
source.onNext(2);
169+
170+
assertEquals(1, rxs.received.size());
171+
assertNotNull(rxs.error);
172+
}
148173
}

0 commit comments

Comments
 (0)