Skip to content

Commit 583c4e7

Browse files
authored
3.x: Various small API changes and removals (#6517)
1 parent 40087e9 commit 583c4e7

20 files changed

+30
-415
lines changed

Diff for: src/jmh/java/io/reactivex/BlockingGetPerf.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public Object maybe() {
7878
}
7979

8080
@Benchmark
81-
public Object completable() {
82-
return completable.blockingGet();
81+
public void completable() {
82+
completable.blockingAwait();
8383
}
8484
}

Diff for: src/main/java/io/reactivex/Completable.java

-46
Original file line numberDiff line numberDiff line change
@@ -1232,52 +1232,6 @@ public final boolean blockingAwait(long timeout, TimeUnit unit) {
12321232
return observer.blockingAwait(timeout, unit);
12331233
}
12341234

1235-
/**
1236-
* Subscribes to this Completable instance and blocks until it terminates, then returns null or
1237-
* the emitted exception if any.
1238-
* <p>
1239-
* <img width="640" height="435" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingGet.png" alt="">
1240-
* <dl>
1241-
* <dt><b>Scheduler:</b></dt>
1242-
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
1243-
* </dl>
1244-
* @return the throwable if this terminated with an error, null otherwise
1245-
* @throws RuntimeException that wraps an InterruptedException if the wait is interrupted
1246-
*/
1247-
@Nullable
1248-
@CheckReturnValue
1249-
@SchedulerSupport(SchedulerSupport.NONE)
1250-
public final Throwable blockingGet() {
1251-
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
1252-
subscribe(observer);
1253-
return observer.blockingGetError();
1254-
}
1255-
1256-
/**
1257-
* Subscribes to this Completable instance and blocks until it terminates or the specified timeout
1258-
* elapses, then returns null for normal termination or the emitted exception if any.
1259-
* <p>
1260-
* <img width="640" height="348" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.blockingGet.t.png" alt="">
1261-
* <dl>
1262-
* <dt><b>Scheduler:</b></dt>
1263-
* <dd>{@code blockingGet} does not operate by default on a particular {@link Scheduler}.</dd>
1264-
* </dl>
1265-
* @param timeout the timeout value
1266-
* @param unit the time unit
1267-
* @return the throwable if this terminated with an error, null otherwise
1268-
* @throws RuntimeException that wraps an InterruptedException if the wait is interrupted or
1269-
* TimeoutException if the specified timeout elapsed before it
1270-
*/
1271-
@Nullable
1272-
@CheckReturnValue
1273-
@SchedulerSupport(SchedulerSupport.NONE)
1274-
public final Throwable blockingGet(long timeout, TimeUnit unit) {
1275-
ObjectHelper.requireNonNull(unit, "unit is null");
1276-
BlockingMultiObserver<Void> observer = new BlockingMultiObserver<Void>();
1277-
subscribe(observer);
1278-
return observer.blockingGetError(timeout, unit);
1279-
}
1280-
12811235
/**
12821236
* Subscribes to this Completable only once, when the first CompletableObserver
12831237
* subscribes to the result Completable, caches its terminal event

Diff for: src/main/java/io/reactivex/Flowable.java

+4-46
Original file line numberDiff line numberDiff line change
@@ -14686,8 +14686,7 @@ public final Flowable<T> startWithArray(T... items) {
1468614686
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1468714687
@SchedulerSupport(SchedulerSupport.NONE)
1468814688
public final Disposable subscribe() {
14689-
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING,
14690-
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
14689+
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
1469114690
}
1469214691

1469314692
/**
@@ -14716,8 +14715,7 @@ public final Disposable subscribe() {
1471614715
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1471714716
@SchedulerSupport(SchedulerSupport.NONE)
1471814717
public final Disposable subscribe(Consumer<? super T> onNext) {
14719-
return subscribe(onNext, Functions.ON_ERROR_MISSING,
14720-
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
14718+
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
1472114719
}
1472214720

1472314721
/**
@@ -14747,7 +14745,7 @@ public final Disposable subscribe(Consumer<? super T> onNext) {
1474714745
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1474814746
@SchedulerSupport(SchedulerSupport.NONE)
1474914747
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
14750-
return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
14748+
return subscribe(onNext, onError, Functions.EMPTY_ACTION);
1475114749
}
1475214750

1475314751
/**
@@ -14782,51 +14780,11 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
1478214780
@SchedulerSupport(SchedulerSupport.NONE)
1478314781
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
1478414782
Action onComplete) {
14785-
return subscribe(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
14786-
}
14787-
14788-
/**
14789-
* Subscribes to a Publisher and provides callbacks to handle the items it emits and any error or
14790-
* completion notification it issues.
14791-
* <dl>
14792-
* <dt><b>Backpressure:</b></dt>
14793-
* <dd>The operator consumes the source {@code Publisher} in an unbounded manner (i.e., no
14794-
* backpressure is applied to it).</dd>
14795-
* <dt><b>Scheduler:</b></dt>
14796-
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
14797-
* </dl>
14798-
*
14799-
* @param onNext
14800-
* the {@code Consumer<T>} you have designed to accept emissions from the Publisher
14801-
* @param onError
14802-
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the
14803-
* Publisher
14804-
* @param onComplete
14805-
* the {@code Action} you have designed to accept a completion notification from the
14806-
* Publisher
14807-
* @param onSubscribe
14808-
* the {@code Consumer} that receives the upstream's Subscription
14809-
* @return a {@link Disposable} reference with which the caller can stop receiving items before
14810-
* the Publisher has finished sending them
14811-
* @throws NullPointerException
14812-
* if {@code onNext} is null, or
14813-
* if {@code onError} is null, or
14814-
* if {@code onComplete} is null, or
14815-
* if {@code onSubscribe} is null
14816-
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
14817-
*/
14818-
@CheckReturnValue
14819-
@NonNull
14820-
@BackpressureSupport(BackpressureKind.SPECIAL)
14821-
@SchedulerSupport(SchedulerSupport.NONE)
14822-
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
14823-
Action onComplete, Consumer<? super Subscription> onSubscribe) {
1482414783
ObjectHelper.requireNonNull(onNext, "onNext is null");
1482514784
ObjectHelper.requireNonNull(onError, "onError is null");
1482614785
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
14827-
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
1482814786

14829-
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);
14787+
LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
1483014788

1483114789
subscribe(ls);
1483214790

Diff for: src/main/java/io/reactivex/Maybe.java

+5-28
Original file line numberDiff line numberDiff line change
@@ -2489,13 +2489,9 @@ public final Single<Long> count() {
24892489
}
24902490

24912491
/**
2492-
* Returns a Maybe that emits the item emitted by the source Maybe or a specified default item
2492+
* Returns a Single that emits the item emitted by the source Maybe or a specified default item
24932493
* if the source Maybe is empty.
24942494
* <p>
2495-
* Note that the result Maybe is semantically equivalent to a {@code Single}, since it's guaranteed
2496-
* to emit exactly one item or an error. See {@link #toSingle(Object)} for a method with equivalent
2497-
* behavior which returns a {@code Single}.
2498-
* <p>
24992495
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/defaultIfEmpty.png" alt="">
25002496
* <dl>
25012497
* <dt><b>Scheduler:</b></dt>
@@ -2504,16 +2500,16 @@ public final Single<Long> count() {
25042500
*
25052501
* @param defaultItem
25062502
* the item to emit if the source Maybe emits no items
2507-
* @return a Maybe that emits either the specified default item if the source Maybe emits no
2508-
* items, or the items emitted by the source Maybe
2503+
* @return a Single that emits either the specified default item if the source Maybe emits no
2504+
* item, or the item emitted by the source Maybe
25092505
* @see <a href="http://reactivex.io/documentation/operators/defaultifempty.html">ReactiveX operators documentation: DefaultIfEmpty</a>
25102506
*/
25112507
@CheckReturnValue
25122508
@NonNull
25132509
@SchedulerSupport(SchedulerSupport.NONE)
2514-
public final Maybe<T> defaultIfEmpty(T defaultItem) {
2510+
public final Single<T> defaultIfEmpty(T defaultItem) {
25152511
ObjectHelper.requireNonNull(defaultItem, "defaultItem is null");
2516-
return switchIfEmpty(just(defaultItem));
2512+
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, defaultItem));
25172513
}
25182514

25192515
/**
@@ -3619,25 +3615,6 @@ public final Observable<T> toObservable() {
36193615
return RxJavaPlugins.onAssembly(new MaybeToObservable<T>(this));
36203616
}
36213617

3622-
/**
3623-
* Converts this Maybe into a Single instance composing disposal
3624-
* through and turning an empty Maybe into a Single that emits the given
3625-
* value through onSuccess.
3626-
* <dl>
3627-
* <dt><b>Scheduler:</b></dt>
3628-
* <dd>{@code toSingle} does not operate by default on a particular {@link Scheduler}.</dd>
3629-
* </dl>
3630-
* @param defaultValue the default item to signal in Single if this Maybe is empty
3631-
* @return the new Single instance
3632-
*/
3633-
@CheckReturnValue
3634-
@NonNull
3635-
@SchedulerSupport(SchedulerSupport.NONE)
3636-
public final Single<T> toSingle(T defaultValue) {
3637-
ObjectHelper.requireNonNull(defaultValue, "defaultValue is null");
3638-
return RxJavaPlugins.onAssembly(new MaybeToSingle<T>(this, defaultValue));
3639-
}
3640-
36413618
/**
36423619
* Converts this Maybe into a Single instance composing disposal
36433620
* through and turning an empty Maybe into a signal of NoSuchElementException.

Diff for: src/main/java/io/reactivex/Observable.java

+4-39
Original file line numberDiff line numberDiff line change
@@ -12106,7 +12106,7 @@ public final Observable<T> startWithArray(T... items) {
1210612106
*/
1210712107
@SchedulerSupport(SchedulerSupport.NONE)
1210812108
public final Disposable subscribe() {
12109-
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
12109+
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
1211012110
}
1211112111

1211212112
/**
@@ -12131,7 +12131,7 @@ public final Disposable subscribe() {
1213112131
@CheckReturnValue
1213212132
@SchedulerSupport(SchedulerSupport.NONE)
1213312133
public final Disposable subscribe(Consumer<? super T> onNext) {
12134-
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
12134+
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
1213512135
}
1213612136

1213712137
/**
@@ -12157,7 +12157,7 @@ public final Disposable subscribe(Consumer<? super T> onNext) {
1215712157
@CheckReturnValue
1215812158
@SchedulerSupport(SchedulerSupport.NONE)
1215912159
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
12160-
return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
12160+
return subscribe(onNext, onError, Functions.EMPTY_ACTION);
1216112161
}
1216212162

1216312163
/**
@@ -12188,46 +12188,11 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
1218812188
@SchedulerSupport(SchedulerSupport.NONE)
1218912189
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
1219012190
Action onComplete) {
12191-
return subscribe(onNext, onError, onComplete, Functions.emptyConsumer());
12192-
}
12193-
12194-
/**
12195-
* Subscribes to an ObservableSource and provides callbacks to handle the items it emits and any error or
12196-
* completion notification it issues.
12197-
* <dl>
12198-
* <dt><b>Scheduler:</b></dt>
12199-
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
12200-
* </dl>
12201-
*
12202-
* @param onNext
12203-
* the {@code Consumer<T>} you have designed to accept emissions from the ObservableSource
12204-
* @param onError
12205-
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the
12206-
* ObservableSource
12207-
* @param onComplete
12208-
* the {@code Action} you have designed to accept a completion notification from the
12209-
* ObservableSource
12210-
* @param onSubscribe
12211-
* the {@code Consumer} that receives the upstream's Disposable
12212-
* @return a {@link Disposable} reference with which the caller can stop receiving items before
12213-
* the ObservableSource has finished sending them
12214-
* @throws NullPointerException
12215-
* if {@code onNext} is null, or
12216-
* if {@code onError} is null, or
12217-
* if {@code onComplete} is null, or
12218-
* if {@code onSubscribe} is null
12219-
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
12220-
*/
12221-
@CheckReturnValue
12222-
@SchedulerSupport(SchedulerSupport.NONE)
12223-
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
12224-
Action onComplete, Consumer<? super Disposable> onSubscribe) {
1222512191
ObjectHelper.requireNonNull(onNext, "onNext is null");
1222612192
ObjectHelper.requireNonNull(onError, "onError is null");
1222712193
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
12228-
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
1222912194

12230-
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
12195+
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, Functions.emptyConsumer());
1223112196

1223212197
subscribe(ls);
1223312198

Diff for: src/main/java/io/reactivex/Single.java

-23
Original file line numberDiff line numberDiff line change
@@ -3850,29 +3850,6 @@ public final <R> R to(@NonNull SingleConverter<T, ? extends R> converter) {
38503850
return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
38513851
}
38523852

3853-
/**
3854-
* Returns a {@link Completable} that discards result of the {@link Single}
3855-
* and calls {@code onComplete} when this source {@link Single} calls
3856-
* {@code onSuccess}. Error terminal event is propagated.
3857-
* <p>
3858-
* <img width="640" height="436" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.toCompletable.png" alt="">
3859-
* <dl>
3860-
* <dt><b>Scheduler:</b></dt>
3861-
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
3862-
* </dl>
3863-
*
3864-
* @return a {@link Completable} that calls {@code onComplete} on it's subscriber when the source {@link Single}
3865-
* calls {@code onSuccess}.
3866-
* @since 2.0
3867-
* @deprecated see {@link #ignoreElement()} instead, will be removed in 3.0
3868-
*/
3869-
@CheckReturnValue
3870-
@SchedulerSupport(SchedulerSupport.NONE)
3871-
@Deprecated
3872-
public final Completable toCompletable() {
3873-
return RxJavaPlugins.onAssembly(new CompletableFromSingle<T>(this));
3874-
}
3875-
38763853
/**
38773854
* Returns a {@link Completable} that ignores the success value of this {@link Single}
38783855
* and calls {@code onComplete} instead on the returned {@code Completable}.

Diff for: src/main/java/io/reactivex/internal/observers/BlockingMultiObserver.java

-43
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
import io.reactivex.disposables.Disposable;
2020
import io.reactivex.internal.util.*;
2121

22-
import static io.reactivex.internal.util.ExceptionHelper.timeoutMessage;
23-
2422
/**
2523
* A combined Observer that awaits the success or error signal via a CountDownLatch.
2624
* @param <T> the value type
@@ -119,47 +117,6 @@ public T blockingGet(T defaultValue) {
119117
return v != null ? v : defaultValue;
120118
}
121119

122-
/**
123-
* Block until the latch is counted down and return the error received or null if no
124-
* error happened.
125-
* @return the error received or null
126-
*/
127-
public Throwable blockingGetError() {
128-
if (getCount() != 0) {
129-
try {
130-
BlockingHelper.verifyNonBlocking();
131-
await();
132-
} catch (InterruptedException ex) {
133-
dispose();
134-
return ex;
135-
}
136-
}
137-
return error;
138-
}
139-
140-
/**
141-
* Block until the latch is counted down and return the error received or
142-
* when the wait is interrupted or times out, null otherwise.
143-
* @param timeout the timeout value
144-
* @param unit the time unit
145-
* @return the error received or null
146-
*/
147-
public Throwable blockingGetError(long timeout, TimeUnit unit) {
148-
if (getCount() != 0) {
149-
try {
150-
BlockingHelper.verifyNonBlocking();
151-
if (!await(timeout, unit)) {
152-
dispose();
153-
throw ExceptionHelper.wrapOrThrow(new TimeoutException(timeoutMessage(timeout, unit)));
154-
}
155-
} catch (InterruptedException ex) {
156-
dispose();
157-
throw ExceptionHelper.wrapOrThrow(ex);
158-
}
159-
}
160-
return error;
161-
}
162-
163120
/**
164121
* Block until the observer terminates and return true; return false if
165122
* the wait times out.

0 commit comments

Comments
 (0)