diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java
index 486562b483..948d8aecde 100644
--- a/src/main/java/io/reactivex/Completable.java
+++ b/src/main/java/io/reactivex/Completable.java
@@ -26,7 +26,7 @@
import io.reactivex.internal.operators.completable.*;
import io.reactivex.internal.operators.maybe.*;
import io.reactivex.internal.operators.mixed.*;
-import io.reactivex.internal.operators.single.SingleDelayWithCompletable;
+import io.reactivex.internal.operators.single.*;
import io.reactivex.internal.util.ExceptionHelper;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
@@ -1782,6 +1782,27 @@ public final Completable lift(final CompletableOperator onLift) {
return RxJavaPlugins.onAssembly(new CompletableLift(this, onLift));
}
+ /**
+ * Maps the signal types of this Completable into a {@link Notification} of the same kind
+ * and emits it as a single success value to downstream.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code materialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the intended target element type of the notification
+ * @return the new Single instance
+ * @since 2.2.4 - experimental
+ * @see Single#dematerialize(Function)
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Single> materialize() {
+ return RxJavaPlugins.onAssembly(new CompletableMaterialize(this));
+ }
+
/**
* Returns a Completable which subscribes to this and the other Completable and completes
* when both of them complete or one emits an error.
diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java
index 346d35e16a..b1d34260f1 100644
--- a/src/main/java/io/reactivex/Maybe.java
+++ b/src/main/java/io/reactivex/Maybe.java
@@ -3377,6 +3377,26 @@ public final Maybe map(Function super T, ? extends R> mapper) {
return RxJavaPlugins.onAssembly(new MaybeMap(this, mapper));
}
+ /**
+ * Maps the signal types of this Maybe into a {@link Notification} of the same kind
+ * and emits it as a single success value to downstream.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code materialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Single instance
+ * @since 2.2.4 - experimental
+ * @see Single#dematerialize(Function)
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Single> materialize() {
+ return RxJavaPlugins.onAssembly(new MaybeMaterialize(this));
+ }
+
/**
* Flattens this and another Maybe into a single Flowable, without any transformation.
*
diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java
index 8d4013be66..168c9c216a 100644
--- a/src/main/java/io/reactivex/Single.java
+++ b/src/main/java/io/reactivex/Single.java
@@ -2302,6 +2302,43 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler sch
return delaySubscription(Observable.timer(time, unit, scheduler));
}
+ /**
+ * Maps the {@link Notification} success value of this Single back into normal
+ * {@code onSuccess}, {@code onError} or {@code onComplete} signals as a
+ * {@link Maybe} source.
+ *
+ * The intended use of the {@code selector} function is to perform a
+ * type-safe identity mapping (see example) on a source that is already of type
+ * {@code Notification}. The Java language doesn't allow
+ * limiting instance methods to a certain generic argument shape, therefore,
+ * a function is used to ensure the conversion remains type safe.
+ *
+ *
Scheduler:
+ *
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
+ * @param the result type
+ * @param selector the function called with the success item and should
+ * return a {@link Notification} instance.
+ * @return the new Maybe instance
+ * @since 2.2.4 - experimental
+ * @see #materialize()
+ */
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @Experimental
+ public final Maybe dematerialize(Function super T, Notification> selector) {
+ ObjectHelper.requireNonNull(selector, "selector is null");
+ return RxJavaPlugins.onAssembly(new SingleDematerialize(this, selector));
+ }
+
/**
* Calls the specified consumer with the success item after this item has been emitted to the downstream.
*
@@ -2871,6 +2908,26 @@ public final Single map(Function super T, ? extends R> mapper) {
return RxJavaPlugins.onAssembly(new SingleMap(this, mapper));
}
+ /**
+ * Maps the signal types of this Single into a {@link Notification} of the same kind
+ * and emits it as a single success value to downstream.
+ *
+ *
+ *
+ *
Scheduler:
+ *
{@code materialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Single instance
+ * @since 2.2.4 - experimental
+ * @see #dematerialize(Function)
+ */
+ @Experimental
+ @CheckReturnValue
+ @SchedulerSupport(SchedulerSupport.NONE)
+ public final Single> materialize() {
+ return RxJavaPlugins.onAssembly(new SingleMaterialize(this));
+ }
+
/**
* Signals true if the current Single signals a success value that is Object-equals with the value
* provided.
diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java
new file mode 100644
index 0000000000..5eda7f6ae2
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.completable;
+
+import io.reactivex.*;
+import io.reactivex.annotations.Experimental;
+import io.reactivex.internal.operators.mixed.MaterializeSingleObserver;
+
+/**
+ * Turn the signal types of a Completable source into a single Notification of
+ * equal kind.
+ *
+ * @param the element type of the source
+ * @since 2.2.4 - experimental
+ */
+@Experimental
+public final class CompletableMaterialize extends Single> {
+
+ final Completable source;
+
+ public CompletableMaterialize(Completable source) {
+ this.source = source;
+ }
+
+ @Override
+ protected void subscribeActual(SingleObserver super Notification> observer) {
+ source.subscribe(new MaterializeSingleObserver(observer));
+ }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java
new file mode 100644
index 0000000000..2b74829ba1
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.maybe;
+
+import io.reactivex.*;
+import io.reactivex.annotations.Experimental;
+import io.reactivex.internal.operators.mixed.MaterializeSingleObserver;
+
+/**
+ * Turn the signal types of a Maybe source into a single Notification of
+ * equal kind.
+ *
+ * @param the element type of the source
+ * @since 2.2.4 - experimental
+ */
+@Experimental
+public final class MaybeMaterialize extends Single> {
+
+ final Maybe source;
+
+ public MaybeMaterialize(Maybe source) {
+ this.source = source;
+ }
+
+ @Override
+ protected void subscribeActual(SingleObserver super Notification> observer) {
+ source.subscribe(new MaterializeSingleObserver(observer));
+ }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java b/src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java
new file mode 100644
index 0000000000..ef8a87076d
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java
@@ -0,0 +1,71 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.mixed;
+
+import io.reactivex.*;
+import io.reactivex.annotations.Experimental;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.internal.disposables.DisposableHelper;
+
+/**
+ * A consumer that implements the consumer types of Maybe, Single and Completable
+ * and turns their signals into Notifications for a SingleObserver.
+ * @param the element type of the source
+ * @since 2.2.4 - experimental
+ */
+@Experimental
+public final class MaterializeSingleObserver
+implements SingleObserver, MaybeObserver, CompletableObserver, Disposable {
+
+ final SingleObserver super Notification> downstream;
+
+ Disposable upstream;
+
+ public MaterializeSingleObserver(SingleObserver super Notification> downstream) {
+ this.downstream = downstream;
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (DisposableHelper.validate(upstream, d)) {
+ this.upstream = d;
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ downstream.onSuccess(Notification.createOnComplete());
+ }
+
+ @Override
+ public void onSuccess(T t) {
+ downstream.onSuccess(Notification.createOnNext(t));
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ downstream.onSuccess(Notification.createOnError(e));
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return upstream.isDisposed();
+ }
+
+ @Override
+ public void dispose() {
+ upstream.dispose();
+ }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java b/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java
new file mode 100644
index 0000000000..2e402b05da
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java
@@ -0,0 +1,105 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.single;
+
+import io.reactivex.*;
+import io.reactivex.annotations.Experimental;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.exceptions.Exceptions;
+import io.reactivex.functions.Function;
+import io.reactivex.internal.disposables.DisposableHelper;
+import io.reactivex.internal.functions.ObjectHelper;
+
+/**
+ * Maps the success value of the source to a Notification, then
+ * maps it back to the corresponding signal type.
+ * @param the element type of the source
+ * @param the element type of the Notification and result
+ * @since 2.2.4 - experimental
+ */
+@Experimental
+public final class SingleDematerialize extends Maybe {
+
+ final Single source;
+
+ final Function super T, Notification> selector;
+
+ public SingleDematerialize(Single source, Function super T, Notification> selector) {
+ this.source = source;
+ this.selector = selector;
+ }
+
+ @Override
+ protected void subscribeActual(MaybeObserver super R> observer) {
+ source.subscribe(new DematerializeObserver(observer, selector));
+ }
+
+ static final class DematerializeObserver implements SingleObserver, Disposable {
+
+ final MaybeObserver super R> downstream;
+
+ final Function super T, Notification> selector;
+
+ Disposable upstream;
+
+ DematerializeObserver(MaybeObserver super R> downstream,
+ Function super T, Notification> selector) {
+ this.downstream = downstream;
+ this.selector = selector;
+ }
+
+ @Override
+ public void dispose() {
+ upstream.dispose();
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return upstream.isDisposed();
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (DisposableHelper.validate(upstream, d)) {
+ upstream = d;
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onSuccess(T t) {
+ Notification notification;
+
+ try {
+ notification = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Notification");
+ } catch (Throwable ex) {
+ Exceptions.throwIfFatal(ex);
+ downstream.onError(ex);
+ return;
+ }
+ if (notification.isOnNext()) {
+ downstream.onSuccess(notification.getValue());
+ } else if (notification.isOnComplete()) {
+ downstream.onComplete();
+ } else {
+ downstream.onError(notification.getError());
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ downstream.onError(e);
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleMaterialize.java b/src/main/java/io/reactivex/internal/operators/single/SingleMaterialize.java
new file mode 100644
index 0000000000..e22b64865d
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/single/SingleMaterialize.java
@@ -0,0 +1,40 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.single;
+
+import io.reactivex.*;
+import io.reactivex.annotations.Experimental;
+import io.reactivex.internal.operators.mixed.MaterializeSingleObserver;
+
+/**
+ * Turn the signal types of a Single source into a single Notification of
+ * equal kind.
+ *
+ * @param the element type of the source
+ * @since 2.2.4 - experimental
+ */
+@Experimental
+public final class SingleMaterialize extends Single> {
+
+ final Single source;
+
+ public SingleMaterialize(Single source) {
+ this.source = source;
+ }
+
+ @Override
+ protected void subscribeActual(SingleObserver super Notification> observer) {
+ source.subscribe(new MaterializeSingleObserver(observer));
+ }
+}
diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableMaterializeTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableMaterializeTest.java
new file mode 100644
index 0000000000..aec11e5a61
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableMaterializeTest.java
@@ -0,0 +1,58 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.completable;
+
+import org.junit.Test;
+
+import io.reactivex.*;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.functions.Function;
+import io.reactivex.subjects.CompletableSubject;
+
+public class CompletableMaterializeTest {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void error() {
+ TestException ex = new TestException();
+ Completable.error(ex)
+ .materialize()
+ .test()
+ .assertResult(Notification.createOnError(ex));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void empty() {
+ Completable.complete()
+ .materialize()
+ .test()
+ .assertResult(Notification.createOnComplete());
+ }
+
+ @Test
+ public void doubleOnSubscribe() {
+ TestHelper.checkDoubleOnSubscribeCompletableToSingle(new Function>>() {
+ @Override
+ public SingleSource> apply(Completable v) throws Exception {
+ return v.materialize();
+ }
+ });
+ }
+
+ @Test
+ public void dispose() {
+ TestHelper.checkDisposed(CompletableSubject.create().materialize());
+ }
+}
diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeMaterializeTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMaterializeTest.java
new file mode 100644
index 0000000000..f429ecddf2
--- /dev/null
+++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMaterializeTest.java
@@ -0,0 +1,67 @@
+/**
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.maybe;
+
+import org.junit.Test;
+
+import io.reactivex.*;
+import io.reactivex.exceptions.TestException;
+import io.reactivex.functions.Function;
+import io.reactivex.subjects.MaybeSubject;
+
+public class MaybeMaterializeTest {
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void success() {
+ Maybe.just(1)
+ .materialize()
+ .test()
+ .assertResult(Notification.createOnNext(1));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void error() {
+ TestException ex = new TestException();
+ Maybe.error(ex)
+ .materialize()
+ .test()
+ .assertResult(Notification.createOnError(ex));
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void empty() {
+ Maybe.empty()
+ .materialize()
+ .test()
+ .assertResult(Notification.createOnComplete());
+ }
+
+ @Test
+ public void doubleOnSubscribe() {
+ TestHelper.checkDoubleOnSubscribeMaybeToSingle(new Function, SingleSource>>() {
+ @Override
+ public SingleSource> apply(Maybe