Skip to content

Commit 534fc67

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add Single.flatMapMaybe (#4617)
1 parent 8335d29 commit 534fc67

File tree

3 files changed

+251
-4
lines changed

3 files changed

+251
-4
lines changed

src/main/java/io/reactivex/Single.java

+26-4
Original file line numberDiff line numberDiff line change
@@ -1817,7 +1817,7 @@ public final Maybe<T> filter(Predicate<? super T> predicate) {
18171817
* @param <R> the result value type
18181818
* @param mapper
18191819
* a function that, when applied to the item emitted by the source Single, returns a SingleSource
1820-
* @return the Single returned from {@code func} when applied to the item emitted by the source Single
1820+
* @return the Single returned from {@code mapper} when applied to the item emitted by the source Single
18211821
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
18221822
*/
18231823
@SchedulerSupport(SchedulerSupport.NONE)
@@ -1826,6 +1826,28 @@ public final <R> Single<R> flatMap(Function<? super T, ? extends SingleSource<?
18261826
return RxJavaPlugins.onAssembly(new SingleFlatMap<T, R>(this, mapper));
18271827
}
18281828

1829+
/**
1830+
* Returns a Maybe that is based on applying a specified function to the item emitted by the source Single,
1831+
* where that function returns a MaybeSource.
1832+
* <p>
1833+
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMapMaybe.png" alt="">
1834+
* <dl>
1835+
* <dt><b>Scheduler:</b></dt>
1836+
* <dd>{@code flatMapMaybe} does not operate by default on a particular {@link Scheduler}.</dd>
1837+
* </dl>
1838+
*
1839+
* @param <R> the result value type
1840+
* @param mapper
1841+
* a function that, when applied to the item emitted by the source Single, returns a MaybeSource
1842+
* @return the Maybe returned from {@code mapper} when applied to the item emitted by the source Single
1843+
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
1844+
*/
1845+
@SchedulerSupport(SchedulerSupport.NONE)
1846+
public final <R> Maybe<R> flatMapMaybe(final Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
1847+
ObjectHelper.requireNonNull(mapper, "mapper is null");
1848+
return RxJavaPlugins.onAssembly(new SingleFlatMapMaybe<T, R>(this, mapper));
1849+
}
1850+
18291851
/**
18301852
* Returns a Flowable that emits items based on applying a specified function to the item emitted by the
18311853
* source Single, where that function returns a Publisher.
@@ -1853,7 +1875,7 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
18531875
}
18541876

18551877
/**
1856-
* Returns a Single that is based on applying a specified function to the item emitted by the source Single,
1878+
* Returns an Observable that is based on applying a specified function to the item emitted by the source Single,
18571879
* where that function returns a SingleSource.
18581880
* <p>
18591881
* <img width="640" height="300" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.flatMap.png" alt="">
@@ -1864,8 +1886,8 @@ public final <R> Flowable<R> flatMapPublisher(Function<? super T, ? extends Publ
18641886
*
18651887
* @param <R> the result value type
18661888
* @param mapper
1867-
* a function that, when applied to the item emitted by the source Single, returns a SingleSource
1868-
* @return the Single returned from {@code func} when applied to the item emitted by the source Single
1889+
* a function that, when applied to the item emitted by the source Single, returns an ObservableSource
1890+
* @return the Observable returned from {@code func} when applied to the item emitted by the source Single
18691891
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
18701892
*/
18711893
@SchedulerSupport(SchedulerSupport.NONE)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.Maybe;
17+
import io.reactivex.MaybeObserver;
18+
import io.reactivex.MaybeSource;
19+
import io.reactivex.SingleObserver;
20+
import io.reactivex.SingleSource;
21+
import io.reactivex.disposables.Disposable;
22+
import io.reactivex.exceptions.Exceptions;
23+
import io.reactivex.functions.Function;
24+
import io.reactivex.internal.disposables.DisposableHelper;
25+
import io.reactivex.internal.functions.ObjectHelper;
26+
import java.util.concurrent.atomic.AtomicReference;
27+
28+
public final class SingleFlatMapMaybe<T, R> extends Maybe<R> {
29+
30+
final SingleSource<? extends T> source;
31+
32+
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
33+
34+
public SingleFlatMapMaybe(SingleSource<? extends T> source, Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
35+
this.mapper = mapper;
36+
this.source = source;
37+
}
38+
39+
@Override
40+
protected void subscribeActual(MaybeObserver<? super R> actual) {
41+
source.subscribe(new FlatMapSingleObserver<T, R>(actual, mapper));
42+
}
43+
44+
static final class FlatMapSingleObserver<T, R>
45+
extends AtomicReference<Disposable>
46+
implements SingleObserver<T>, Disposable {
47+
48+
private static final long serialVersionUID = -5843758257109742742L;
49+
50+
final MaybeObserver<? super R> actual;
51+
52+
final Function<? super T, ? extends MaybeSource<? extends R>> mapper;
53+
54+
FlatMapSingleObserver(MaybeObserver<? super R> actual, Function<? super T, ? extends MaybeSource<? extends R>> mapper) {
55+
this.actual = actual;
56+
this.mapper = mapper;
57+
}
58+
59+
@Override
60+
public void dispose() {
61+
DisposableHelper.dispose(this);
62+
}
63+
64+
@Override
65+
public boolean isDisposed() {
66+
return DisposableHelper.isDisposed(get());
67+
}
68+
69+
@Override
70+
public void onSubscribe(Disposable d) {
71+
if (DisposableHelper.setOnce(this, d)) {
72+
actual.onSubscribe(this);
73+
}
74+
}
75+
76+
@Override
77+
public void onSuccess(T value) {
78+
MaybeSource<? extends R> ms;
79+
80+
try {
81+
ms = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper returned a null MaybeSource");
82+
} catch (Throwable ex) {
83+
Exceptions.throwIfFatal(ex);
84+
onError(ex);
85+
return;
86+
}
87+
88+
ms.subscribe(new FlatMapMaybeObserver<R>(this, actual));
89+
}
90+
91+
@Override
92+
public void onError(Throwable e) {
93+
actual.onError(e);
94+
}
95+
}
96+
97+
static final class FlatMapMaybeObserver<R> implements MaybeObserver<R> {
98+
99+
final AtomicReference<Disposable> parent;
100+
101+
final MaybeObserver<? super R> actual;
102+
103+
FlatMapMaybeObserver(AtomicReference<Disposable> parent, MaybeObserver<? super R> actual) {
104+
this.parent = parent;
105+
this.actual = actual;
106+
}
107+
108+
@Override
109+
public void onSubscribe(final Disposable d) {
110+
DisposableHelper.replace(parent, d);
111+
}
112+
113+
@Override
114+
public void onSuccess(final R value) {
115+
actual.onSuccess(value);
116+
}
117+
118+
@Override
119+
public void onError(final Throwable e) {
120+
actual.onError(e);
121+
}
122+
123+
@Override
124+
public void onComplete() {
125+
actual.onComplete();
126+
}
127+
}
128+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/**
2+
* Copyright 2016 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.internal.operators.single;
15+
16+
import io.reactivex.Maybe;
17+
import io.reactivex.MaybeSource;
18+
import io.reactivex.Single;
19+
import io.reactivex.functions.Function;
20+
import org.junit.Test;
21+
22+
public class SingleFlatMapMaybeTest {
23+
@Test(expected = NullPointerException.class)
24+
public void flatMapMaybeNull() {
25+
Single.just(1)
26+
.flatMapMaybe(null);
27+
}
28+
29+
@Test
30+
public void flatMapMaybeValue() {
31+
Single.just(1).flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
32+
@Override public MaybeSource<Integer> apply(final Integer integer) throws Exception {
33+
if (integer == 1) {
34+
return Maybe.just(2);
35+
}
36+
37+
return Maybe.just(1);
38+
}
39+
})
40+
.test()
41+
.assertResult(2);
42+
}
43+
44+
@Test
45+
public void flatMapMaybeValueDifferentType() {
46+
Single.just(1).flatMapMaybe(new Function<Integer, MaybeSource<String>>() {
47+
@Override public MaybeSource<String> apply(final Integer integer) throws Exception {
48+
if (integer == 1) {
49+
return Maybe.just("2");
50+
}
51+
52+
return Maybe.just("1");
53+
}
54+
})
55+
.test()
56+
.assertResult("2");
57+
}
58+
59+
@Test
60+
public void flatMapMaybeValueNull() {
61+
Single.just(1).flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
62+
@Override public MaybeSource<Integer> apply(final Integer integer) throws Exception {
63+
return null;
64+
}
65+
})
66+
.test()
67+
.assertNoValues()
68+
.assertError(NullPointerException.class)
69+
.assertErrorMessage("The mapper returned a null MaybeSource");
70+
}
71+
72+
@Test
73+
public void flatMapMaybeValueErrorThrown() {
74+
Single.just(1).flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
75+
@Override public MaybeSource<Integer> apply(final Integer integer) throws Exception {
76+
throw new RuntimeException("something went terribly wrong!");
77+
}
78+
})
79+
.test()
80+
.assertNoValues()
81+
.assertError(RuntimeException.class)
82+
.assertErrorMessage("something went terribly wrong!");
83+
}
84+
85+
@Test
86+
public void flatMapMaybeError() {
87+
RuntimeException exception = new RuntimeException("test");
88+
89+
Single.error(exception).flatMapMaybe(new Function<Object, MaybeSource<Object>>() {
90+
@Override public MaybeSource<Object> apply(final Object integer) throws Exception {
91+
return Maybe.just(new Object());
92+
}
93+
})
94+
.test()
95+
.assertError(exception);
96+
}
97+
}

0 commit comments

Comments
 (0)