Skip to content

Commit 543ea1a

Browse files
akarnokdkojilin
authored andcommitted
2.x: Add concatMap with Scheduler guaranteeing where the mapper runs (ReactiveX#6538)
Back port `Add concatMap with Scheduler guaranteeing where the mapper runs` to 2.x branch. see ReactiveX#6538
1 parent 0df3285 commit 543ea1a

File tree

7 files changed

+3406
-2
lines changed

7 files changed

+3406
-2
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7254,6 +7254,10 @@ public final <R> Flowable<R> compose(FlowableTransformer<? super T, ? extends R>
72547254
* that result from concatenating those resulting Publishers.
72557255
* <p>
72567256
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7257+
* <p>
7258+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7259+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7260+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
72577261
* <dl>
72587262
* <dt><b>Backpressure:</b></dt>
72597263
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7286,6 +7290,10 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
72867290
* that result from concatenating those resulting Publishers.
72877291
* <p>
72887292
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7293+
* <p>
7294+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7295+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7296+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMap(Function, int, Scheduler)} overload.
72897297
* <dl>
72907298
* <dt><b>Backpressure:</b></dt>
72917299
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7306,6 +7314,7 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73067314
* @return a Flowable that emits the result of applying the transformation function to each item emitted
73077315
* by the source Publisher and concatenating the Publishers obtained from this transformation
73087316
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
7317+
* @see #concatMap(Function, int, Scheduler)
73097318
*/
73107319
@CheckReturnValue
73117320
@NonNull
@@ -7325,6 +7334,52 @@ public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<?
73257334
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
73267335
}
73277336

7337+
/**
7338+
* Returns a new Flowable that emits items resulting from applying a function (on a designated scheduler)
7339+
* that you supply to each item emitted by the source Publisher, where that function returns a Publisher, and then emitting the items
7340+
* that result from concatenating those resulting Publishers.
7341+
* <p>
7342+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/concatMap.png" alt="">
7343+
* <p>
7344+
* The difference between {@link #concatMap(Function, int)} and this operator is that this operator guarantees the {@code mapper}
7345+
* function is executed on the specified scheduler.
7346+
* <dl>
7347+
* <dt><b>Backpressure:</b></dt>
7348+
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
7349+
* expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
7350+
* signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
7351+
* backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
7352+
* {@code Publisher} completes.</dd>
7353+
* <dt><b>Scheduler:</b></dt>
7354+
* <dd>{@code concatMap} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
7355+
* </dl>
7356+
*
7357+
* @param <R> the type of the inner Publisher sources and thus the output type
7358+
* @param mapper
7359+
* a function that, when applied to an item emitted by the source Publisher, returns a
7360+
* Publisher
7361+
* @param prefetch
7362+
* the number of elements to prefetch from the current Flowable
7363+
* @param scheduler
7364+
* the scheduler where the {@code mapper} function will be executed
7365+
* @return a Flowable that emits the result of applying the transformation function to each item emitted
7366+
* by the source Publisher and concatenating the Publishers obtained from this transformation
7367+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
7368+
* @since 3.0.0
7369+
* @see #concatMap(Function, int)
7370+
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
7371+
*/
7372+
@CheckReturnValue
7373+
@NonNull
7374+
@BackpressureSupport(BackpressureKind.FULL)
7375+
@SchedulerSupport(SchedulerSupport.CUSTOM)
7376+
public final <R> Flowable<R> concatMap(Function<? super T, ? extends Publisher<? extends R>> mapper, int prefetch, Scheduler scheduler) {
7377+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7378+
ObjectHelper.verifyPositive(prefetch, "prefetch");
7379+
ObjectHelper.requireNonNull(scheduler, "scheduler");
7380+
return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE, scheduler));
7381+
}
7382+
73287383
/**
73297384
* Maps the upstream items into {@link CompletableSource}s and subscribes to them one after the
73307385
* other completes.
@@ -7494,7 +7549,10 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
74947549
* one at a time and emits their values in order
74957550
* while delaying any error from either this or any of the inner Publishers
74967551
* till all of them terminate.
7497-
*
7552+
* <p>
7553+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7554+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7555+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
74987556
* <dl>
74997557
* <dt><b>Backpressure:</b></dt>
75007558
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
@@ -7509,6 +7567,7 @@ public final Completable concatMapCompletableDelayError(Function<? super T, ? ex
75097567
* @param <R> the result value type
75107568
* @param mapper the function that maps the items of this Publisher into the inner Publishers.
75117569
* @return the new Publisher instance with the concatenation behavior
7570+
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
75127571
*/
75137572
@CheckReturnValue
75147573
@BackpressureSupport(BackpressureKind.FULL)
@@ -7522,6 +7581,10 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75227581
* one at a time and emits their values in order
75237582
* while delaying any error from either this or any of the inner Publishers
75247583
* till all of them terminate.
7584+
* <p>
7585+
* Note that there is no guarantee where the given {@code mapper} function will be executed; it could be on the subscribing thread,
7586+
* on the upstream thread signaling the new item to be mapped or on the thread where the inner source terminates. To ensure
7587+
* the {@code mapper} function is confined to a known thread, use the {@link #concatMapDelayError(Function, int, boolean, Scheduler)} overload.
75257588
*
75267589
* <dl>
75277590
* <dt><b>Backpressure:</b></dt>
@@ -7542,6 +7605,7 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75427605
* if true, all errors from the outer and inner Publisher sources are delayed until the end,
75437606
* if false, an error from the main source is signaled when the current Publisher source terminates
75447607
* @return the new Publisher instance with the concatenation behavior
7608+
* @see #concatMapDelayError(Function, int, boolean, Scheduler)
75457609
*/
75467610
@CheckReturnValue
75477611
@NonNull
@@ -7562,6 +7626,51 @@ public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends P
75627626
return RxJavaPlugins.onAssembly(new FlowableConcatMap<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
75637627
}
75647628

7629+
/**
7630+
* Maps each of the upstream items into a Publisher, subscribes to them one after the other,
7631+
* one at a time and emits their values in order
7632+
* while executing the mapper function on the designated scheduler, delaying any error from either this or any of the
7633+
* inner Publishers till all of them terminate.
7634+
* <p>
7635+
* The difference between {@link #concatMapDelayError(Function, int, boolean)} and this operator is that this operator guarantees the {@code mapper}
7636+
* function is executed on the specified scheduler.
7637+
*
7638+
* <dl>
7639+
* <dt><b>Backpressure:</b></dt>
7640+
* <dd>The operator honors backpressure from downstream. Both this and the inner {@code Publisher}s are
7641+
* expected to honor backpressure as well. If the source {@code Publisher} violates the rule, the operator will
7642+
* signal a {@code MissingBackpressureException}. If any of the inner {@code Publisher}s doesn't honor
7643+
* backpressure, that <em>may</em> throw an {@code IllegalStateException} when that
7644+
* {@code Publisher} completes.</dd>
7645+
* <dt><b>Scheduler:</b></dt>
7646+
* <dd>{@code concatMapDelayError} executes the given {@code mapper} function on the provided {@link Scheduler}.</dd>
7647+
* </dl>
7648+
*
7649+
* @param <R> the result value type
7650+
* @param mapper the function that maps the items of this Publisher into the inner Publishers.
7651+
* @param prefetch
7652+
* the number of elements to prefetch from the current Flowable
7653+
* @param tillTheEnd
7654+
* if true, all errors from the outer and inner Publisher sources are delayed until the end,
7655+
* if false, an error from the main source is signaled when the current Publisher source terminates
7656+
* @param scheduler
7657+
* the scheduler where the {@code mapper} function will be executed
7658+
* @return the new Publisher instance with the concatenation behavior
7659+
* @see #concatMapDelayError(Function, int, boolean)
7660+
* @since 3.0.0
7661+
*/
7662+
@CheckReturnValue
7663+
@NonNull
7664+
@BackpressureSupport(BackpressureKind.FULL)
7665+
@SchedulerSupport(SchedulerSupport.CUSTOM)
7666+
public final <R> Flowable<R> concatMapDelayError(Function<? super T, ? extends Publisher<? extends R>> mapper,
7667+
int prefetch, boolean tillTheEnd, Scheduler scheduler) {
7668+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7669+
ObjectHelper.verifyPositive(prefetch, "prefetch");
7670+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
7671+
return RxJavaPlugins.onAssembly(new FlowableConcatMapScheduler<T, R>(this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, scheduler));
7672+
}
7673+
75657674
/**
75667675
* Maps a sequence of values into Publishers and concatenates these Publishers eagerly into a single
75677676
* Publisher.

0 commit comments

Comments
 (0)