From fd69797c451ab91769092781bef2cc015b1c575d Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 6 Dec 2019 15:40:49 +0100 Subject: [PATCH] 3.x: Update observeOn docs with links and +backpressure explanation --- .../io/reactivex/rxjava3/core/Flowable.java | 36 +++++++++++++++---- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 04f98a2aa2..dab6c437d7 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -11419,8 +11419,16 @@ public final Flowable mergeWith(@NonNull CompletableSource other) { *
This operator honors backpressure from downstream and expects it from the source {@code Publisher}. Violating this * expectation will lead to {@code MissingBackpressureException}. This is the most common operator where the exception * pops up; look for sources up the chain that don't support backpressure, - * such as {@code interval}, {@code timer}, {code PublishSubject} or {@code BehaviorSubject} and apply any - * of the {@code onBackpressureXXX} operators before applying {@code observeOn} itself.
+ * such as {@link #interval(long, TimeUnit)}, {@link #timer(long, TimeUnit)}, + * {@link io.reactivex.rxjava3.processors.PublishProcessor PublishProcessor} or + * {@link io.reactivex.rxjava3.processors.BehaviorProcessor BehaviorProcessor} and apply any + * of the {@code onBackpressureXXX} operators before applying {@code observeOn} itself. + * Note also that request amounts are not preserved between the immediate downstream and the + * immediate upstream. The operator always requests the default {@link #bufferSize()} amount first, then after + * every 75% of that amount delivered, another 75% of this default value. If preserving the request amounts + * is to be preferred over potential excess scheduler infrastructure use, consider applying + * {@link #delay(long, TimeUnit, Scheduler)} with zero time instead. + * *
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
* @@ -11458,8 +11466,16 @@ public final Flowable observeOn(Scheduler scheduler) { *
This operator honors backpressure from downstream and expects it from the source {@code Publisher}. Violating this * expectation will lead to {@code MissingBackpressureException}. This is the most common operator where the exception * pops up; look for sources up the chain that don't support backpressure, - * such as {@code interval}, {@code timer}, {code PublishSubject} or {@code BehaviorSubject} and apply any - * of the {@code onBackpressureXXX} operators before applying {@code observeOn} itself.
+ * such as {@link #interval(long, TimeUnit)}, {@link #timer(long, TimeUnit)}, + * {@link io.reactivex.rxjava3.processors.PublishProcessor PublishProcessor} or + * {@link io.reactivex.rxjava3.processors.BehaviorProcessor BehaviorProcessor} and apply any + * of the {@code onBackpressureXXX} operators before applying {@code observeOn} itself. + * Note also that request amounts are not preserved between the immediate downstream and the + * immediate upstream. The operator always requests the default {@link #bufferSize()} amount first, then after + * every 75% of that amount delivered, another 75% of this default value. If preserving the request amounts + * is to be preferred over potential excess scheduler infrastructure use, consider applying + * {@link #delay(long, TimeUnit, Scheduler, boolean)} with zero time instead. + * *
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
* @@ -11501,8 +11517,16 @@ public final Flowable observeOn(Scheduler scheduler, boolean delayError) { *
This operator honors backpressure from downstream and expects it from the source {@code Publisher}. Violating this * expectation will lead to {@code MissingBackpressureException}. This is the most common operator where the exception * pops up; look for sources up the chain that don't support backpressure, - * such as {@code interval}, {@code timer}, {code PublishSubject} or {@code BehaviorSubject} and apply any - * of the {@code onBackpressureXXX} operators before applying {@code observeOn} itself.
+ * such as {@link #interval(long, TimeUnit)}, {@link #timer(long, TimeUnit)}, + * {@link io.reactivex.rxjava3.processors.PublishProcessor PublishProcessor} or + * {@link io.reactivex.rxjava3.processors.BehaviorProcessor BehaviorProcessor} and apply any + * of the {@code onBackpressureXXX} operators before applying {@code observeOn} itself. + * Note also that request amounts are not preserved between the immediate downstream and the + * immediate upstream. The operator always requests the specified {@code bufferSize} amount first, then after + * every 75% of that amount delivered, another 75% of this specified value. If preserving the request amounts + * is to be preferred over potential excess scheduler infrastructure use, consider applying + * {@link #delay(long, TimeUnit, Scheduler, boolean)} with zero time instead. + * *
Scheduler:
*
You specify which {@link Scheduler} this operator will use.
*