Skip to content

Commit 38bcd5e

Browse files
authored
3.x: XProcessor.offer to throw NPE immediately (#6799)
1 parent 3d00eb1 commit 38bcd5e

File tree

6 files changed

+48
-38
lines changed

6 files changed

+48
-38
lines changed

Diff for: src/main/java/io/reactivex/rxjava3/processors/BehaviorProcessor.java

+9-10
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,11 @@ public static <T> BehaviorProcessor<T> create() {
234234
*/
235235
BehaviorProcessor(T defaultValue) {
236236
this();
237-
this.value.lazySet(Objects.requireNonNull(defaultValue, "defaultValue is null"));
237+
this.value.lazySet(defaultValue);
238238
}
239239

240240
@Override
241-
protected void subscribeActual(Subscriber<? super T> s) {
241+
protected void subscribeActual(@NonNull Subscriber<? super T> s) {
242242
BehaviorSubscription<T> bs = new BehaviorSubscription<>(s, this);
243243
s.onSubscribe(bs);
244244
if (add(bs)) {
@@ -258,7 +258,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
258258
}
259259

260260
@Override
261-
public void onSubscribe(Subscription s) {
261+
public void onSubscribe(@NonNull Subscription s) {
262262
if (terminalEvent.get() != null) {
263263
s.cancel();
264264
return;
@@ -267,7 +267,7 @@ public void onSubscribe(Subscription s) {
267267
}
268268

269269
@Override
270-
public void onNext(T t) {
270+
public void onNext(@NonNull T t) {
271271
ExceptionHelper.nullCheck(t, "onNext called with a null value.");
272272

273273
if (terminalEvent.get() != null) {
@@ -281,7 +281,7 @@ public void onNext(T t) {
281281
}
282282

283283
@Override
284-
public void onError(Throwable t) {
284+
public void onError(@NonNull Throwable t) {
285285
ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
286286
if (!terminalEvent.compareAndSet(null, t)) {
287287
RxJavaPlugins.onError(t);
@@ -316,14 +316,13 @@ public void onComplete() {
316316
* <p>History: 2.0.8 - experimental
317317
* @param t the item to emit, not null
318318
* @return true if the item was emitted to all Subscribers
319+
* @throws NullPointerException if {@code t} is {@code null}
319320
* @since 2.2
320321
*/
321322
@CheckReturnValue
322-
public boolean offer(T t) {
323-
if (t == null) {
324-
onError(ExceptionHelper.createNullPointerException("offer called with a null value."));
325-
return true;
326-
}
323+
public boolean offer(@NonNull T t) {
324+
ExceptionHelper.nullCheck(t, "offer called with a null value.");
325+
327326
BehaviorSubscription<T>[] array = subscribers.get();
328327

329328
for (BehaviorSubscription<T> s : array) {

Diff for: src/main/java/io/reactivex/rxjava3/processors/MulticastProcessor.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ public void startUnbounded() {
258258
}
259259

260260
@Override
261-
public void onSubscribe(Subscription s) {
261+
public void onSubscribe(@NonNull Subscription s) {
262262
if (SubscriptionHelper.setOnce(upstream, s)) {
263263
if (s instanceof QueueSubscription) {
264264
@SuppressWarnings("unchecked")
@@ -288,7 +288,7 @@ public void onSubscribe(Subscription s) {
288288
}
289289

290290
@Override
291-
public void onNext(T t) {
291+
public void onNext(@NonNull T t) {
292292
if (once.get()) {
293293
return;
294294
}
@@ -306,26 +306,29 @@ public void onNext(T t) {
306306
/**
307307
* Tries to offer an item into the internal queue and returns false
308308
* if the queue is full.
309-
* @param t the item to offer, not null
309+
* @param t the item to offer, not {@code null}
310310
* @return true if successful, false if the queue is full
311+
* @throws NullPointerException if {@code t} is {@code null}
312+
* @throws IllegalStateException if the processor is in fusion mode
311313
*/
312314
@CheckReturnValue
313-
public boolean offer(T t) {
315+
public boolean offer(@NonNull T t) {
316+
ExceptionHelper.nullCheck(t, "offer called with a null value.");
314317
if (once.get()) {
315318
return false;
316319
}
317-
ExceptionHelper.nullCheck(t, "offer called with a null value.");
318320
if (fusionMode == QueueSubscription.NONE) {
319321
if (queue.offer(t)) {
320322
drain();
321323
return true;
322324
}
325+
return false;
323326
}
324-
return false;
327+
throw new IllegalStateException("offer() should not be called in fusion mode!");
325328
}
326329

327330
@Override
328-
public void onError(Throwable t) {
331+
public void onError(@NonNull Throwable t) {
329332
ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
330333
if (once.compareAndSet(false, true)) {
331334
error = t;
@@ -369,7 +372,7 @@ public Throwable getThrowable() {
369372
}
370373

371374
@Override
372-
protected void subscribeActual(Subscriber<? super T> s) {
375+
protected void subscribeActual(@NonNull Subscriber<? super T> s) {
373376
MulticastSubscription<T> ms = new MulticastSubscription<>(s, this);
374377
s.onSubscribe(ms);
375378
if (add(ms)) {

Diff for: src/main/java/io/reactivex/rxjava3/processors/PublishProcessor.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ public static <T> PublishProcessor<T> create() {
141141
}
142142

143143
@Override
144-
protected void subscribeActual(Subscriber<? super T> t) {
144+
protected void subscribeActual(@NonNull Subscriber<? super T> t) {
145145
PublishSubscription<T> ps = new PublishSubscription<>(t, this);
146146
t.onSubscribe(ps);
147147
if (add(ps)) {
@@ -226,7 +226,7 @@ void remove(PublishSubscription<T> ps) {
226226
}
227227

228228
@Override
229-
public void onSubscribe(Subscription s) {
229+
public void onSubscribe(@NonNull Subscription s) {
230230
if (subscribers.get() == TERMINATED) {
231231
s.cancel();
232232
return;
@@ -236,7 +236,7 @@ public void onSubscribe(Subscription s) {
236236
}
237237

238238
@Override
239-
public void onNext(T t) {
239+
public void onNext(@NonNull T t) {
240240
ExceptionHelper.nullCheck(t, "onNext called with a null value.");
241241
for (PublishSubscription<T> s : subscribers.get()) {
242242
s.onNext(t);
@@ -245,7 +245,7 @@ public void onNext(T t) {
245245

246246
@SuppressWarnings("unchecked")
247247
@Override
248-
public void onError(Throwable t) {
248+
public void onError(@NonNull Throwable t) {
249249
ExceptionHelper.nullCheck(t, "onError called with a null Throwable.");
250250
if (subscribers.get() == TERMINATED) {
251251
RxJavaPlugins.onError(t);
@@ -281,14 +281,13 @@ public void onComplete() {
281281
* <p>History: 2.0.8 - experimental
282282
* @param t the item to emit, not null
283283
* @return true if the item was emitted to all Subscribers
284+
* @throws NullPointerException if {@code t} is {@code null}
284285
* @since 2.2
285286
*/
286287
@CheckReturnValue
287-
public boolean offer(T t) {
288-
if (t == null) {
289-
onError(ExceptionHelper.createNullPointerException("offer called with a null value."));
290-
return true;
291-
}
288+
public boolean offer(@NonNull T t) {
289+
ExceptionHelper.nullCheck(t, "offer called with a null value.");
290+
292291
PublishSubscription<T>[] array = subscribers.get();
293292

294293
for (PublishSubscription<T> s : array) {

Diff for: src/test/java/io/reactivex/rxjava3/processors/BehaviorProcessorTest.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -673,12 +673,14 @@ public void offer() {
673673

674674
ts = pp.test(1);
675675

676-
assertTrue(pp.offer(null));
677-
678-
ts.assertFailure(NullPointerException.class, 2);
676+
try {
677+
pp.offer(null);
678+
fail("Should have thrown NPE!");
679+
} catch (NullPointerException expected) {
680+
// expected
681+
}
679682

680-
assertTrue(pp.hasThrowable());
681-
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
683+
ts.assertValuesOnly(2);
682684
}
683685

684686
@Test

Diff for: src/test/java/io/reactivex/rxjava3/processors/MulticastProcessorTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -466,7 +466,12 @@ public void asyncFused() {
466466
up.onNext(i);
467467
}
468468

469-
assertFalse(mp.offer(10));
469+
try {
470+
mp.offer(10);
471+
fail("Should have thrown IllegalStateException");
472+
} catch (IllegalStateException expected) {
473+
// expected
474+
}
470475

471476
up.onComplete();
472477

Diff for: src/test/java/io/reactivex/rxjava3/processors/PublishProcessorTest.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -597,12 +597,14 @@ public void offer() {
597597

598598
ts = pp.test(0);
599599

600-
assertTrue(pp.offer(null));
601-
602-
ts.assertFailure(NullPointerException.class);
600+
try {
601+
pp.offer(null);
602+
fail("Should have thrown NPE!");
603+
} catch (NullPointerException expected) {
604+
// expected
605+
}
603606

604-
assertTrue(pp.hasThrowable());
605-
assertTrue(pp.getThrowable().toString(), pp.getThrowable() instanceof NullPointerException);
607+
ts.assertEmpty();
606608
}
607609

608610
@Test

0 commit comments

Comments
 (0)