Skip to content

Commit a97d871

Browse files
authored
3.x: Add missing throwIfFatal calls (#6801)
1 parent d1cbf57 commit a97d871

22 files changed

+144
-11
lines changed

src/main/java/io/reactivex/rxjava3/disposables/AutoCloseableDisposable.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
package io.reactivex.rxjava3.disposables;
1515

1616
import io.reactivex.rxjava3.annotations.NonNull;
17-
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
17+
import io.reactivex.rxjava3.internal.util.ExceptionHelper;
1818

1919
/**
2020
* A disposable container that manages an {@link AutoCloseable} instance.
@@ -33,7 +33,7 @@ protected void onDisposed(@NonNull AutoCloseable value) {
3333
try {
3434
value.close();
3535
} catch (Throwable ex) {
36-
RxJavaPlugins.onError(ex);
36+
throw ExceptionHelper.wrapOrThrow(ex);
3737
}
3838
}
3939

src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
import io.reactivex.rxjava3.annotations.NonNull;
2323
import io.reactivex.rxjava3.core.*;
24-
import io.reactivex.rxjava3.exceptions.MissingBackpressureException;
24+
import io.reactivex.rxjava3.exceptions.*;
2525
import io.reactivex.rxjava3.functions.*;
2626
import io.reactivex.rxjava3.internal.fuseable.*;
2727
import io.reactivex.rxjava3.internal.queue.SpscArrayQueue;
@@ -61,6 +61,7 @@ protected void subscribeActual(Subscriber<? super R> s) {
6161
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
6262
}
6363
} catch (Throwable ex) {
64+
Exceptions.throwIfFatal(ex);
6465
EmptySubscription.error(ex, s);
6566
return;
6667
}
@@ -243,6 +244,7 @@ void drain() {
243244
try {
244245
t = queue.poll();
245246
} catch (Throwable ex) {
247+
Exceptions.throwIfFatal(ex);
246248
trySignalError(downstream, ex);
247249
continue;
248250
}
@@ -271,6 +273,7 @@ else if (!isEmpty) {
271273
iterator = null;
272274
}
273275
} catch (Throwable ex) {
276+
Exceptions.throwIfFatal(ex);
274277
trySignalError(downstream, ex);
275278
}
276279
continue;
@@ -282,6 +285,7 @@ else if (!isEmpty) {
282285
try {
283286
item = Objects.requireNonNull(iterator.next(), "The Stream.Iterator returned a null value");
284287
} catch (Throwable ex) {
288+
Exceptions.throwIfFatal(ex);
285289
trySignalError(downstream, ex);
286290
continue;
287291
}
@@ -297,6 +301,7 @@ else if (!isEmpty) {
297301
clearCurrentRethrowCloseError();
298302
}
299303
} catch (Throwable ex) {
304+
Exceptions.throwIfFatal(ex);
300305
trySignalError(downstream, ex);
301306
}
302307
}
@@ -328,6 +333,7 @@ void clearCurrentSuppressCloseError() {
328333
try {
329334
clearCurrentRethrowCloseError();
330335
} catch (Throwable ex) {
336+
Exceptions.throwIfFatal(ex);
331337
RxJavaPlugins.onError(ex);
332338
}
333339
}

src/main/java/io/reactivex/rxjava3/internal/jdk8/ObservableFlatMapStream.java

+1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ protected void subscribeActual(Observer<? super R> observer) {
5555
stream = Objects.requireNonNull(mapper.apply(t), "The mapper returned a null Stream");
5656
}
5757
} catch (Throwable ex) {
58+
Exceptions.throwIfFatal(ex);
5859
EmptyDisposable.error(ex, observer);
5960
return;
6061
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollect.java

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ protected void subscribeActual(Subscriber<? super U> s) {
3939
try {
4040
u = Objects.requireNonNull(initialSupplier.get(), "The initial value supplied is null");
4141
} catch (Throwable e) {
42+
Exceptions.throwIfFatal(e);
4243
EmptySubscription.error(e, s);
4344
return;
4445
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCollectSingle.java

+1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ protected void subscribeActual(SingleObserver<? super U> observer) {
4444
try {
4545
u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value");
4646
} catch (Throwable e) {
47+
Exceptions.throwIfFatal(e);
4748
EmptyDisposable.error(e, observer);
4849
return;
4950
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDoOnEach.java

+4
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ public T poll() throws Throwable {
159159
try {
160160
onError.accept(ex);
161161
} catch (Throwable exc) {
162+
Exceptions.throwIfFatal(exc);
162163
throw new CompositeException(ex, exc);
163164
}
164165
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
@@ -173,6 +174,7 @@ public T poll() throws Throwable {
173174
try {
174175
onError.accept(ex);
175176
} catch (Throwable exc) {
177+
Exceptions.throwIfFatal(exc);
176178
throw new CompositeException(ex, exc);
177179
}
178180
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
@@ -314,6 +316,7 @@ public T poll() throws Throwable {
314316
try {
315317
onError.accept(ex);
316318
} catch (Throwable exc) {
319+
Exceptions.throwIfFatal(exc);
317320
throw new CompositeException(ex, exc);
318321
}
319322
throw ExceptionHelper.<Exception>throwIfThrowable(ex);
@@ -328,6 +331,7 @@ public T poll() throws Throwable {
328331
try {
329332
onError.accept(ex);
330333
} catch (Throwable exc) {
334+
Exceptions.throwIfFatal(exc);
331335
throw new CompositeException(ex, exc);
332336
}
333337
throw ExceptionHelper.<Exception>throwIfThrowable(ex);

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableReplay.java

+1
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,7 @@ public void connect(Consumer<? super Disposable> connection) {
209209
try {
210210
connection.accept(ps);
211211
} catch (Throwable ex) {
212+
Exceptions.throwIfFatal(ex);
212213
if (doConnect) {
213214
ps.shouldConnect.compareAndSet(true, false);
214215
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableScalarXMap.java

+1
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ public void subscribeActual(Subscriber<? super R> s) {
136136
try {
137137
other = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null Publisher");
138138
} catch (Throwable e) {
139+
Exceptions.throwIfFatal(e);
139140
EmptySubscription.error(e, s);
140141
return;
141142
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableWindowBoundarySelector.java

+1
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,7 @@ void drain() {
245245
try {
246246
endSource = Objects.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null Publisher");
247247
} catch (Throwable ex) {
248+
Exceptions.throwIfFatal(ex);
248249
upstream.cancel();
249250
startSubscriber.cancel();
250251
resources.dispose();

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableZipIterable.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,15 @@ public void onNext(T t) {
101101
try {
102102
u = Objects.requireNonNull(iterator.next(), "The iterator returned a null value");
103103
} catch (Throwable e) {
104-
error(e);
104+
fail(e);
105105
return;
106106
}
107107

108108
V v;
109109
try {
110110
v = Objects.requireNonNull(zipper.apply(t, u), "The zipper function returned a null value");
111111
} catch (Throwable e) {
112-
error(e);
112+
fail(e);
113113
return;
114114
}
115115

@@ -120,7 +120,7 @@ public void onNext(T t) {
120120
try {
121121
b = iterator.hasNext();
122122
} catch (Throwable e) {
123-
error(e);
123+
fail(e);
124124
return;
125125
}
126126

@@ -131,7 +131,7 @@ public void onNext(T t) {
131131
}
132132
}
133133

134-
void error(Throwable e) {
134+
void fail(Throwable e) {
135135
Exceptions.throwIfFatal(e);
136136
done = true;
137137
upstream.cancel();

src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeFromFuture.java

+1
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ protected void subscribeActual(MaybeObserver<? super T> observer) {
5252
v = future.get(timeout, unit);
5353
}
5454
} catch (Throwable ex) {
55+
Exceptions.throwIfFatal(ex);
5556
if (ex instanceof ExecutionException) {
5657
ex = ex.getCause();
5758
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableBuffer.java

+1
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ public void onNext(T t) {
186186
try {
187187
b = ExceptionHelper.nullCheck(bufferSupplier.get(), "The bufferSupplier returned a null Collection.");
188188
} catch (Throwable e) {
189+
Exceptions.throwIfFatal(e);
189190
buffers.clear();
190191
upstream.dispose();
191192
downstream.onError(e);

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollect.java

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import io.reactivex.rxjava3.core.*;
1616
import io.reactivex.rxjava3.disposables.Disposable;
17+
import io.reactivex.rxjava3.exceptions.Exceptions;
1718
import io.reactivex.rxjava3.functions.*;
1819
import io.reactivex.rxjava3.internal.disposables.*;
1920
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
@@ -37,6 +38,7 @@ protected void subscribeActual(Observer<? super U> t) {
3738
try {
3839
u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value");
3940
} catch (Throwable e) {
41+
Exceptions.throwIfFatal(e);
4042
EmptyDisposable.error(e, t);
4143
return;
4244
}
@@ -86,6 +88,7 @@ public void onNext(T t) {
8688
try {
8789
collector.accept(u, t);
8890
} catch (Throwable e) {
91+
Exceptions.throwIfFatal(e);
8992
upstream.dispose();
9093
onError(e);
9194
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableCollectSingle.java

+3
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import io.reactivex.rxjava3.core.*;
1616
import io.reactivex.rxjava3.disposables.Disposable;
17+
import io.reactivex.rxjava3.exceptions.Exceptions;
1718
import io.reactivex.rxjava3.functions.*;
1819
import io.reactivex.rxjava3.internal.disposables.*;
1920
import io.reactivex.rxjava3.internal.fuseable.FuseToObservable;
@@ -41,6 +42,7 @@ protected void subscribeActual(SingleObserver<? super U> t) {
4142
try {
4243
u = Objects.requireNonNull(initialSupplier.get(), "The initialSupplier returned a null value");
4344
} catch (Throwable e) {
45+
Exceptions.throwIfFatal(e);
4446
EmptyDisposable.error(e, t);
4547
return;
4648
}
@@ -94,6 +96,7 @@ public void onNext(T t) {
9496
try {
9597
collector.accept(u, t);
9698
} catch (Throwable e) {
99+
Exceptions.throwIfFatal(e);
97100
upstream.dispose();
98101
onError(e);
99102
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableReplay.java

+1
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@ public void connect(Consumer<? super Disposable> connection) {
205205
try {
206206
connection.accept(ps);
207207
} catch (Throwable ex) {
208+
Exceptions.throwIfFatal(ex);
208209
if (doConnect) {
209210
ps.shouldConnect.compareAndSet(true, false);
210211
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableScalarXMap.java

+1
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ public void subscribeActual(Observer<? super R> observer) {
140140
try {
141141
other = Objects.requireNonNull(mapper.apply(value), "The mapper returned a null ObservableSource");
142142
} catch (Throwable e) {
143+
Exceptions.throwIfFatal(e);
143144
EmptyDisposable.error(e, observer);
144145
return;
145146
}

src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableWindowBoundarySelector.java

+1
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,7 @@ void drain() {
237237
try {
238238
endSource = Objects.requireNonNull(closingIndicator.apply(startItem), "The closingIndicator returned a null ObservableSource");
239239
} catch (Throwable ex) {
240+
Exceptions.throwIfFatal(ex);
240241
upstream.dispose();
241242
startObserver.dispose();
242243
resources.dispose();

src/main/java/io/reactivex/rxjava3/internal/schedulers/InstantPeriodicTask.java

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.atomic.AtomicReference;
2121

2222
import io.reactivex.rxjava3.disposables.Disposable;
23+
import io.reactivex.rxjava3.exceptions.Exceptions;
2324
import io.reactivex.rxjava3.internal.functions.Functions;
2425
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2526

@@ -56,6 +57,7 @@ public Void call() throws Exception {
5657
setRest(executor.submit(this));
5758
runner = null;
5859
} catch (Throwable ex) {
60+
Exceptions.throwIfFatal(ex);
5961
runner = null;
6062
RxJavaPlugins.onError(ex);
6163
}

src/main/java/io/reactivex/rxjava3/internal/schedulers/ScheduledDirectPeriodicTask.java

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package io.reactivex.rxjava3.internal.schedulers;
1818

19+
import io.reactivex.rxjava3.exceptions.Exceptions;
1920
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
2021

2122
/**
@@ -38,6 +39,7 @@ public void run() {
3839
runnable.run();
3940
runner = null;
4041
} catch (Throwable ex) {
42+
Exceptions.throwIfFatal(ex);
4143
runner = null;
4244
lazySet(FINISHED);
4345
RxJavaPlugins.onError(ex);

src/main/java/io/reactivex/rxjava3/internal/schedulers/SchedulerPoolFactory.java

+3
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.concurrent.*;
2121
import java.util.concurrent.atomic.AtomicReference;
2222

23+
import io.reactivex.rxjava3.exceptions.Exceptions;
2324
import io.reactivex.rxjava3.functions.Function;
2425

2526
/**
@@ -108,6 +109,7 @@ static int getIntProperty(boolean enabled, String key, int defaultNotFound, int
108109
}
109110
return Integer.parseInt(value);
110111
} catch (Throwable ex) {
112+
Exceptions.throwIfFatal(ex);
111113
return defaultNotFound;
112114
}
113115
}
@@ -123,6 +125,7 @@ static boolean getBooleanProperty(boolean enabled, String key, boolean defaultNo
123125
}
124126
return "true".equals(value);
125127
} catch (Throwable ex) {
128+
Exceptions.throwIfFatal(ex);
126129
return defaultNotFound;
127130
}
128131
}

src/test/java/io/reactivex/rxjava3/disposables/DisposableTest.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -213,17 +213,20 @@ public void fromAutoCloseableThrows() throws Throwable {
213213

214214
assertTrue(errors.isEmpty());
215215

216-
d.dispose();
216+
try {
217+
d.dispose();
218+
fail("Should have thrown!");
219+
} catch (TestException expected) {
220+
// expected
221+
}
217222

218223
assertTrue(d.isDisposed());
219-
assertEquals(1, errors.size());
220224

221225
d.dispose();
222226

223227
assertTrue(d.isDisposed());
224-
assertEquals(1, errors.size());
225228

226-
TestHelper.assertUndeliverable(errors, 0, TestException.class);
229+
assertTrue(errors.isEmpty());
227230
});
228231
}
229232

0 commit comments

Comments
 (0)