Skip to content

Commit ea8ab3b

Browse files
authored
3.x: Add Maybe.dematerialize (#6871)
1 parent 53ba435 commit ea8ab3b

File tree

4 files changed

+272
-1
lines changed

4 files changed

+272
-1
lines changed

Diff for: src/main/java/io/reactivex/rxjava3/core/Maybe.java

+41
Original file line numberDiff line numberDiff line change
@@ -2893,6 +2893,47 @@ public final Single<T> defaultIfEmpty(@NonNull T defaultItem) {
28932893
return RxJavaPlugins.onAssembly(new MaybeToSingle<>(this, defaultItem));
28942894
}
28952895

2896+
/**
2897+
* Maps the {@link Notification} success value of the current {@code Maybe} back into normal
2898+
* {@code onSuccess}, {@code onError} or {@code onComplete} signals.
2899+
* <p>
2900+
* <img width="640" height="268" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.dematerialize.png" alt="">
2901+
* <p>
2902+
* The intended use of the {@code selector} function is to perform a
2903+
* type-safe identity mapping (see example) on a source that is already of type
2904+
* {@code Notification<T>}. The Java language doesn't allow
2905+
* limiting instance methods to a certain generic argument shape, therefore,
2906+
* a function is used to ensure the conversion remains type safe.
2907+
* <p>
2908+
* Regular {@code onError} or {@code onComplete} signals from the current {@code Maybe} are passed along to the downstream.
2909+
* <dl>
2910+
* <dt><b>Scheduler:</b></dt>
2911+
* <dd>{@code dematerialize} does not operate by default on a particular {@link Scheduler}.</dd>
2912+
* </dl>
2913+
* <p>
2914+
* Example:
2915+
* <pre><code>
2916+
* Maybe.just(Notification.createOnNext(1))
2917+
* .dematerialize(notification -&gt; notification)
2918+
* .test()
2919+
* .assertResult(1);
2920+
* </code></pre>
2921+
* @param <R> the result type
2922+
* @param selector the function called with the success item and should
2923+
* return a {@code Notification} instance.
2924+
* @return the new {@code Maybe} instance
2925+
* @throws NullPointerException if {@code selector} is {@code null}
2926+
* @since 3.0.0
2927+
* @see #materialize()
2928+
*/
2929+
@CheckReturnValue
2930+
@NonNull
2931+
@SchedulerSupport(SchedulerSupport.NONE)
2932+
public final <@NonNull R> Maybe<R> dematerialize(@NonNull Function<? super T, @NonNull Notification<R>> selector) {
2933+
Objects.requireNonNull(selector, "selector is null");
2934+
return RxJavaPlugins.onAssembly(new MaybeDematerialize<>(this, selector));
2935+
}
2936+
28962937
/**
28972938
* Returns a {@code Maybe} that signals the events emitted by the current {@code Maybe} shifted forward in time by a
28982939
* specified delay.

Diff for: src/main/java/io/reactivex/rxjava3/core/Single.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2511,7 +2511,7 @@ public final Single<T> delaySubscription(long time, @NonNull TimeUnit unit, @Non
25112511
}
25122512

25132513
/**
2514-
* Maps the {@link Notification} success value of this {@code Single} back into normal
2514+
* Maps the {@link Notification} success value of the current {@code Single} back into normal
25152515
* {@code onSuccess}, {@code onError} or {@code onComplete} signals as a
25162516
* {@link Maybe} source.
25172517
* <p>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.maybe;
15+
16+
import io.reactivex.rxjava3.core.*;
17+
import io.reactivex.rxjava3.disposables.Disposable;
18+
import io.reactivex.rxjava3.exceptions.Exceptions;
19+
import io.reactivex.rxjava3.functions.Function;
20+
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
21+
22+
import java.util.Objects;
23+
24+
/**
25+
* Maps the success value of the source to a Notification, then
26+
* maps it back to the corresponding signal type.
27+
* <p>History: 2.2.4 - experimental
28+
* @param <T> the element type of the source
29+
* @param <R> the element type of the Notification and result
30+
* @since 3.0.0
31+
*/
32+
public final class MaybeDematerialize<T, R> extends AbstractMaybeWithUpstream<T, R> {
33+
34+
final Function<? super T, Notification<R>> selector;
35+
36+
public MaybeDematerialize(Maybe<T> source, Function<? super T, Notification<R>> selector) {
37+
super(source);
38+
this.selector = selector;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(MaybeObserver<? super R> observer) {
43+
source.subscribe(new DematerializeObserver<>(observer, selector));
44+
}
45+
46+
static final class DematerializeObserver<T, R> implements MaybeObserver<T>, Disposable {
47+
48+
final MaybeObserver<? super R> downstream;
49+
50+
final Function<? super T, Notification<R>> selector;
51+
52+
Disposable upstream;
53+
54+
DematerializeObserver(MaybeObserver<? super R> downstream,
55+
Function<? super T, Notification<R>> selector) {
56+
this.downstream = downstream;
57+
this.selector = selector;
58+
}
59+
60+
@Override
61+
public void dispose() {
62+
upstream.dispose();
63+
}
64+
65+
@Override
66+
public boolean isDisposed() {
67+
return upstream.isDisposed();
68+
}
69+
70+
@Override
71+
public void onSubscribe(Disposable d) {
72+
if (DisposableHelper.validate(upstream, d)) {
73+
upstream = d;
74+
downstream.onSubscribe(this);
75+
}
76+
}
77+
78+
@Override
79+
public void onSuccess(T t) {
80+
Notification<R> notification;
81+
82+
try {
83+
notification = Objects.requireNonNull(selector.apply(t), "The selector returned a null Notification");
84+
} catch (Throwable ex) {
85+
Exceptions.throwIfFatal(ex);
86+
downstream.onError(ex);
87+
return;
88+
}
89+
if (notification.isOnNext()) {
90+
downstream.onSuccess(notification.getValue());
91+
} else if (notification.isOnComplete()) {
92+
downstream.onComplete();
93+
} else {
94+
downstream.onError(notification.getError());
95+
}
96+
}
97+
98+
@Override
99+
public void onError(Throwable e) {
100+
downstream.onError(e);
101+
}
102+
103+
@Override
104+
public void onComplete() {
105+
downstream.onComplete();
106+
}
107+
}
108+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.maybe;
15+
16+
import static org.mockito.Mockito.*;
17+
import org.junit.Test;
18+
19+
import io.reactivex.rxjava3.core.*;
20+
import io.reactivex.rxjava3.exceptions.TestException;
21+
import io.reactivex.rxjava3.functions.Function;
22+
import io.reactivex.rxjava3.internal.functions.Functions;
23+
import io.reactivex.rxjava3.subjects.MaybeSubject;
24+
import io.reactivex.rxjava3.testsupport.TestHelper;
25+
26+
public class MaybeDematerializeTest extends RxJavaTest {
27+
28+
@Test
29+
public void success() {
30+
Maybe.just(Notification.createOnNext(1))
31+
.dematerialize(Functions.<Notification<Integer>>identity())
32+
.test()
33+
.assertResult(1);
34+
}
35+
36+
@Test
37+
public void empty() {
38+
Maybe.just(Notification.<Integer>createOnComplete())
39+
.dematerialize(Functions.<Notification<Integer>>identity())
40+
.test()
41+
.assertResult();
42+
}
43+
44+
@Test
45+
public void emptySource() throws Throwable {
46+
@SuppressWarnings("unchecked")
47+
Function<Notification<Integer>, Notification<Integer>> function = mock(Function.class);
48+
49+
Maybe.<Notification<Integer>>empty()
50+
.dematerialize(function)
51+
.test()
52+
.assertResult();
53+
54+
verify(function, never()).apply(any());
55+
}
56+
57+
@Test
58+
public void error() {
59+
Maybe.<Notification<Integer>>error(new TestException())
60+
.dematerialize(Functions.<Notification<Integer>>identity())
61+
.test()
62+
.assertFailure(TestException.class);
63+
}
64+
65+
@Test
66+
public void errorNotification() {
67+
Maybe.just(Notification.<Integer>createOnError(new TestException()))
68+
.dematerialize(Functions.<Notification<Integer>>identity())
69+
.test()
70+
.assertFailure(TestException.class);
71+
}
72+
73+
@Test
74+
public void doubleOnSubscribe() {
75+
TestHelper.checkDoubleOnSubscribeMaybe(new Function<Maybe<Object>, MaybeSource<Object>>() {
76+
@SuppressWarnings({ "unchecked", "rawtypes" })
77+
@Override
78+
public MaybeSource<Object> apply(Maybe<Object> v) throws Exception {
79+
return v.dematerialize((Function)Functions.identity());
80+
}
81+
});
82+
}
83+
84+
@Test
85+
public void dispose() {
86+
TestHelper.checkDisposed(MaybeSubject.<Notification<Integer>>create().dematerialize(Functions.<Notification<Integer>>identity()));
87+
}
88+
89+
@Test
90+
public void selectorCrash() {
91+
Maybe.just(Notification.createOnNext(1))
92+
.dematerialize(new Function<Notification<Integer>, Notification<Integer>>() {
93+
@Override
94+
public Notification<Integer> apply(Notification<Integer> v) throws Exception {
95+
throw new TestException();
96+
}
97+
})
98+
.test()
99+
.assertFailure(TestException.class);
100+
}
101+
102+
@Test
103+
public void selectorNull() {
104+
Maybe.just(Notification.createOnNext(1))
105+
.dematerialize(Functions.justFunction((Notification<Integer>)null))
106+
.test()
107+
.assertFailure(NullPointerException.class);
108+
}
109+
110+
@Test
111+
public void selectorDifferentType() {
112+
Maybe.just(Notification.createOnNext(1))
113+
.dematerialize(new Function<Notification<Integer>, Notification<String>>() {
114+
@Override
115+
public Notification<String> apply(Notification<Integer> v) throws Exception {
116+
return Notification.createOnNext("Value-" + 1);
117+
}
118+
})
119+
.test()
120+
.assertResult("Value-1");
121+
}
122+
}

0 commit comments

Comments
 (0)