Skip to content

Commit 88ff1f6

Browse files
authored
2.x: flatMap{Completable, Maybe, Single} operators (#4667)
* 2.x: flatMapCompletable operator * Jacoco to ignore TCK * Add remaining flatMap{Single, Maybe} to Flowable/Observable * Fix active counting race condition
1 parent 23a77e8 commit 88ff1f6

18 files changed

+4176
-2
lines changed

Diff for: build.gradle

+7
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,13 @@ jacocoTestReport {
120120
xml.enabled = true
121121
html.enabled = true
122122
}
123+
124+
afterEvaluate {
125+
classDirectories = files(classDirectories.files.collect {
126+
fileTree(dir: it,
127+
exclude: ['io/reactivex/tck/**'])
128+
})
129+
}
123130
}
124131

125132
build.dependsOn jacocoTestReport

Diff for: src/main/java/io/reactivex/Flowable.java

+134-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.reactivex.internal.functions.*;
2626
import io.reactivex.internal.fuseable.ScalarCallable;
2727
import io.reactivex.internal.operators.flowable.*;
28-
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
28+
import io.reactivex.internal.operators.observable.*;
2929
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3030
import io.reactivex.internal.subscribers.*;
3131
import io.reactivex.internal.util.*;
@@ -8266,6 +8266,49 @@ public final <U, R> Flowable<R> flatMap(Function<? super T, ? extends Publisher<
82668266
return flatMap(mapper, combiner, false, maxConcurrency, bufferSize());
82678267
}
82688268

8269+
/**
8270+
* Maps each element of the upstream Flowable into CompletableSources, subscribes to them and
8271+
* waits until the upstream and all CompletableSources complete.
8272+
* <dl>
8273+
* <dt><b>Backpressure:</b></dt>
8274+
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
8275+
* <dt><b>Scheduler:</b></dt>
8276+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
8277+
* </dl>
8278+
* @param mapper the function that received each source value and transforms them into CompletableSources.
8279+
* @return the new Completable instance
8280+
*/
8281+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
8282+
@SchedulerSupport(SchedulerSupport.NONE)
8283+
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
8284+
return flatMapCompletable(mapper, false, Integer.MAX_VALUE);
8285+
}
8286+
8287+
/**
8288+
* Maps each element of the upstream Flowable into CompletableSources, subscribes to them and
8289+
* waits until the upstream and all CompletableSources complete, optionally delaying all errors.
8290+
* <dl>
8291+
* <dt><b>Backpressure:</b></dt>
8292+
* <dd>If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
8293+
* Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
8294+
* the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.</dd>
8295+
* <dt><b>Scheduler:</b></dt>
8296+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
8297+
* </dl>
8298+
* @param mapper the function that received each source value and transforms them into CompletableSources.
8299+
* @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them
8300+
* terminates.
8301+
* @param maxConcurrency the maximum number of active subscriptions to the CompletableSources.
8302+
* @return the new Completable instance
8303+
*/
8304+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
8305+
@SchedulerSupport(SchedulerSupport.NONE)
8306+
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors, int maxConcurrency) {
8307+
ObjectHelper.requireNonNull(mapper, "mapper is null");
8308+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
8309+
return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletableCompletable<T>(this, mapper, delayErrors, maxConcurrency));
8310+
}
8311+
82698312
/**
82708313
* Returns a Flowable that merges each item emitted by the source Publisher with the values in an
82718314
* Iterable corresponding to that item that is generated by a selector.
@@ -8405,6 +8448,96 @@ public final <U, V> Flowable<V> flatMapIterable(final Function<? super T, ? exte
84058448
return flatMap(FlowableInternalHelper.flatMapIntoIterable(mapper), resultSelector, false, bufferSize(), prefetch);
84068449
}
84078450

8451+
/**
8452+
* Maps each element of the upstream Flowable into MaybeSources, subscribes to them and
8453+
* waits until the upstream and all MaybeSources complete.
8454+
* <dl>
8455+
* <dt><b>Backpressure:</b></dt>
8456+
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
8457+
* <dt><b>Scheduler:</b></dt>
8458+
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
8459+
* </dl>
8460+
* @param <R> the result value type
8461+
* @param mapper the function that received each source value and transforms them into MaybeSources.
8462+
* @return the new Flowable instance
8463+
*/
8464+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
8465+
@SchedulerSupport(SchedulerSupport.NONE)
8466+
public final <R> Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
8467+
return flatMapMaybe(mapper, false, Integer.MAX_VALUE);
8468+
}
8469+
8470+
/**
8471+
* Maps each element of the upstream Flowable into MaybeSources, subscribes to them and
8472+
* waits until the upstream and all MaybeSources complete, optionally delaying all errors.
8473+
* <dl>
8474+
* <dt><b>Backpressure:</b></dt>
8475+
* <dd>If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
8476+
* Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
8477+
* the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.</dd>
8478+
* <dt><b>Scheduler:</b></dt>
8479+
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
8480+
* </dl>
8481+
* @param <R> the result value type
8482+
* @param mapper the function that received each source value and transforms them into MaybeSources.
8483+
* @param delayErrors if true errors from the upstream and inner MaybeSources are delayed until each of them
8484+
* terminates.
8485+
* @param maxConcurrency the maximum number of active subscriptions to the MaybeSources.
8486+
* @return the new Flowable instance
8487+
*/
8488+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
8489+
@SchedulerSupport(SchedulerSupport.NONE)
8490+
public final <R> Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
8491+
ObjectHelper.requireNonNull(mapper, "mapper is null");
8492+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
8493+
return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybe<T, R>(this, mapper, delayErrors, maxConcurrency));
8494+
}
8495+
8496+
/**
8497+
* Maps each element of the upstream Flowable into SingleSources, subscribes to them and
8498+
* waits until the upstream and all SingleSources complete.
8499+
* <dl>
8500+
* <dt><b>Backpressure:</b></dt>
8501+
* <dd>The operator consumes the upstream in an unbounded manner.</dd>
8502+
* <dt><b>Scheduler:</b></dt>
8503+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
8504+
* </dl>
8505+
* @param <R> the result value type
8506+
* @param mapper the function that received each source value and transforms them into SingleSources.
8507+
* @return the new Flowable instance
8508+
*/
8509+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
8510+
@SchedulerSupport(SchedulerSupport.NONE)
8511+
public final <R> Flowable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
8512+
return flatMapSingle(mapper, false, Integer.MAX_VALUE);
8513+
}
8514+
8515+
/**
8516+
* Maps each element of the upstream Flowable into SingleSources, subscribes to them and
8517+
* waits until the upstream and all SingleSources complete, optionally delaying all errors.
8518+
* <dl>
8519+
* <dt><b>Backpressure:</b></dt>
8520+
* <dd>If {@code maxConcurrency == Integer.MAX_VALUE} the operator consumes the upstream in an unbounded manner.
8521+
* Otherwise the operator expects the upstream to honor backpressure. If the upstream doesn't support backpressure
8522+
* the operator behaves as if {@code maxConcurrency == Integer.MAX_VALUE} was used.</dd>
8523+
* <dt><b>Scheduler:</b></dt>
8524+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
8525+
* </dl>
8526+
* @param <R> the result value type
8527+
* @param mapper the function that received each source value and transforms them into SingleSources.
8528+
* @param delayErrors if true errors from the upstream and inner SingleSources are delayed until each of them
8529+
* terminates.
8530+
* @param maxConcurrency the maximum number of active subscriptions to the SingleSources.
8531+
* @return the new Flowable instance
8532+
*/
8533+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
8534+
@SchedulerSupport(SchedulerSupport.NONE)
8535+
public final <R> Flowable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
8536+
ObjectHelper.requireNonNull(mapper, "mapper is null");
8537+
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
8538+
return RxJavaPlugins.onAssembly(new FlowableFlatMapSingle<T, R>(this, mapper, delayErrors, maxConcurrency));
8539+
}
8540+
84088541
/**
84098542
* Subscribes to the {@link Publisher} and receives notifications for each element.
84108543
* <p>

Diff for: src/main/java/io/reactivex/Observable.java

+103
Original file line numberDiff line numberDiff line change
@@ -7166,6 +7166,39 @@ public final <U, R> Observable<R> flatMap(Function<? super T, ? extends Observab
71667166
return flatMap(mapper, combiner, false, maxConcurrency, bufferSize());
71677167
}
71687168

7169+
/**
7170+
* Maps each element of the upstream Observable into CompletableSources, subscribes to them and
7171+
* waits until the upstream and all CompletableSources complete.
7172+
* <dl>
7173+
* <dt><b>Scheduler:</b></dt>
7174+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
7175+
* </dl>
7176+
* @param mapper the function that received each source value and transforms them into CompletableSources.
7177+
* @return the new Completable instance
7178+
*/
7179+
@SchedulerSupport(SchedulerSupport.NONE)
7180+
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
7181+
return flatMapCompletable(mapper, false);
7182+
}
7183+
7184+
/**
7185+
* Maps each element of the upstream Observable into CompletableSources, subscribes to them and
7186+
* waits until the upstream and all CompletableSources complete, optionally delaying all errors.
7187+
* <dl>
7188+
* <dt><b>Scheduler:</b></dt>
7189+
* <dd>{@code flatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
7190+
* </dl>
7191+
* @param mapper the function that received each source value and transforms them into CompletableSources.
7192+
* @param delayErrors if true errors from the upstream and inner CompletableSources are delayed until each of them
7193+
* terminates.
7194+
* @return the new Completable instance
7195+
*/
7196+
@SchedulerSupport(SchedulerSupport.NONE)
7197+
public final Completable flatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, boolean delayErrors) {
7198+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7199+
return RxJavaPlugins.onAssembly(new ObservableFlatMapCompletableCompletable<T>(this, mapper, delayErrors));
7200+
}
7201+
71697202
/**
71707203
* Returns an Observable that merges each item emitted by the source ObservableSource with the values in an
71717204
* Iterable corresponding to that item that is generated by a selector.
@@ -7247,6 +7280,76 @@ public final <U> Observable<U> flatMapIterable(final Function<? super T, ? exten
72477280
return flatMap(ObservableInternalHelper.flatMapIntoIterable(mapper), false, bufferSize);
72487281
}
72497282

7283+
/**
7284+
* Maps each element of the upstream Observable into MaybeSources, subscribes to them and
7285+
* waits until the upstream and all MaybeSources complete.
7286+
* <dl>
7287+
* <dt><b>Scheduler:</b></dt>
7288+
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
7289+
* </dl>
7290+
* @param <R> the result value type
7291+
* @param mapper the function that received each source value and transforms them into MaybeSources.
7292+
* @return the new Observable instance
7293+
*/
7294+
@SchedulerSupport(SchedulerSupport.NONE)
7295+
public final <R> Observable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
7296+
return flatMapMaybe(mapper, false);
7297+
}
7298+
7299+
/**
7300+
* Maps each element of the upstream Observable into MaybeSources, subscribes to them and
7301+
* waits until the upstream and all MaybeSources complete, optionally delaying all errors.
7302+
* <dl>
7303+
* <dt><b>Scheduler:</b></dt>
7304+
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
7305+
* </dl>
7306+
* @param <R> the result value type
7307+
* @param mapper the function that received each source value and transforms them into MaybeSources.
7308+
* @param delayErrors if true errors from the upstream and inner MaybeSources are delayed until each of them
7309+
* terminates.
7310+
* @return the new Observable instance
7311+
*/
7312+
@SchedulerSupport(SchedulerSupport.NONE)
7313+
public final <R> Observable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper, boolean delayErrors) {
7314+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7315+
return RxJavaPlugins.onAssembly(new ObservableFlatMapMaybe<T, R>(this, mapper, delayErrors));
7316+
}
7317+
7318+
/**
7319+
* Maps each element of the upstream Observable into SingleSources, subscribes to them and
7320+
* waits until the upstream and all SingleSources complete.
7321+
* <dl>
7322+
* <dt><b>Scheduler:</b></dt>
7323+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7324+
* </dl>
7325+
* @param <R> the result value type
7326+
* @param mapper the function that received each source value and transforms them into SingleSources.
7327+
* @return the new Observable instance
7328+
*/
7329+
@SchedulerSupport(SchedulerSupport.NONE)
7330+
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper) {
7331+
return flatMapSingle(mapper, false);
7332+
}
7333+
7334+
/**
7335+
* Maps each element of the upstream Observable into SingleSources, subscribes to them and
7336+
* waits until the upstream and all SingleSources complete, optionally delaying all errors.
7337+
* <dl>
7338+
* <dt><b>Scheduler:</b></dt>
7339+
* <dd>{@code flatMapSingle} does not operate by default on a particular {@link Scheduler}.</dd>
7340+
* </dl>
7341+
* @param <R> the result value type
7342+
* @param mapper the function that received each source value and transforms them into SingleSources.
7343+
* @param delayErrors if true errors from the upstream and inner SingleSources are delayed until each of them
7344+
* terminates.
7345+
* @return the new Observable instance
7346+
*/
7347+
@SchedulerSupport(SchedulerSupport.NONE)
7348+
public final <R> Observable<R> flatMapSingle(Function<? super T, ? extends SingleSource<? extends R>> mapper, boolean delayErrors) {
7349+
ObjectHelper.requireNonNull(mapper, "mapper is null");
7350+
return RxJavaPlugins.onAssembly(new ObservableFlatMapSingle<T, R>(this, mapper, delayErrors));
7351+
}
7352+
72507353
/**
72517354
* Subscribes to the {@link ObservableSource} and receives notifications for each element.
72527355
* <p>

0 commit comments

Comments
 (0)