Skip to content

Commit 53b2a8d

Browse files
author
Aaron Tull
committed
Refactoring and renaming. Removed Dual observable.
1 parent 03f8106 commit 53b2a8d

17 files changed

+33
-306
lines changed

src/main/java/rx/dual/DualConversion.java

-5
This file was deleted.

src/main/java/rx/dual/DualObservable.java

-8
This file was deleted.

src/main/java/rx/dual/DualObserver.java

-10
This file was deleted.

src/main/java/rx/dual/DualOnSubscribe.java

-8
This file was deleted.

src/main/java/rx/dual/DualOperator.java

-8
This file was deleted.

src/main/java/rx/dual/DualSubscriber.java

-31
This file was deleted.

src/main/java/rx/dual/MonoToDualConversion.java

-7
This file was deleted.

src/main/java/rx/dual/MonoToDualOperator.java

-9
This file was deleted.

src/main/java/rx/simple/CoreObservable.java

+13-18
Original file line numberDiff line numberDiff line change
@@ -8,41 +8,31 @@
88
import rx.functions.Func2;
99
import rx.internal.operators.OperatorFilter;
1010
import rx.internal.operators.OperatorMap;
11+
import rx.internal.operators.OperatorMerge;
1112
import rx.internal.operators.OperatorScan;
1213
import rx.single.MonoConversion;
13-
import rx.single.MonoObservable;
1414

15-
public class CoreObservable<T> extends SimpleMonoObservable<T> {
15+
public class CoreObservable<T> {
16+
protected OnSubscribe<T> onSubscribe;
1617

1718
private CoreObservable(OnSubscribe<T> onSubscribe) {
18-
super(onSubscribe);
19+
this.onSubscribe = onSubscribe;
1920
}
2021

2122
public static <T> CoreObservable<T> create(OnSubscribe<T> onSubscribe) {
2223
return new CoreObservable<T>(onSubscribe);
2324
}
2425

25-
@Override
2626
public void subscribe(Subscriber<T> subscriber) {
27-
27+
onSubscribe.call(subscriber);
2828
}
2929

30-
@Override
3130
public <R> CoreObservable<R> lift(Operator<? extends R, ? super T> operator) {
32-
// TODO Auto-generated method stub
33-
return null;
31+
return extend(new CoreObservableConversion<R, T>(operator));
3432
}
3533

36-
@Override
3734
public <R, O> O extend(MonoConversion<O, T> operator) {
38-
// TODO Auto-generated method stub
39-
return null;
40-
}
41-
42-
@Override
43-
public <R> MonoObservable<? extends R> compose(Func1<? super MonoObservable<? super T>, ? extends MonoObservable<? extends R>> composition) {
44-
// TODO Auto-generated method stub
45-
return null;
35+
return operator.convert(onSubscribe);
4636
}
4737

4838
public final <R> CoreObservable<R> map(Func1<? super T, ? extends R> func) {
@@ -59,7 +49,12 @@ public final <R> CoreObservable<R> scan(R initialValue, Func2<R, ? super T, R> a
5949

6050
public final <R> CoreObservable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
6151
// return map(func).lift(OperatorMerge.<T>instance(false));
62-
// return map(func).extend(new CoreObservableConversion<R, Observable<R>>(OperatorMerge.<T>instance(false)));
52+
53+
// return map(func).extend(new MonoConversion<CoreObservable<R>, R>(){
54+
//
55+
// @Override
56+
// public CoreObservable<R> convert(OnSubscribe<R> onSubscribe) {
57+
// }});
6358
return null;
6459
}
6560

src/main/java/rx/simple/CoreObservableConversion.java

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import rx.Observable.OnSubscribe;
44
import rx.Observable.Operator;
5+
import rx.Observable;
56
import rx.Subscriber;
67
import rx.exceptions.OnErrorNotImplementedException;
78
import rx.single.MonoConversion;

src/main/java/rx/simple/SimpleDualConversion.java

-51
This file was deleted.

src/main/java/rx/simple/SimpleDualObservable.java

-36
This file was deleted.

src/main/java/rx/simple/SimpleMonoObservable.java

+3-7
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,10 @@
55
import rx.Subscriber;
66
import rx.functions.Func1;
77
import rx.single.MonoConversion;
8-
import rx.single.MonoObservable;
98

10-
public class SimpleMonoObservable<T> implements MonoObservable<T> {
9+
public class SimpleMonoObservable<T> {
1110

12-
private OnSubscribe<T> onSubscribe;
11+
protected OnSubscribe<T> onSubscribe;
1312

1413
public static <T> SimpleMonoObservable<T> create(OnSubscribe<T> onSubscribe) {
1514
return new SimpleMonoObservable<T>(onSubscribe);
@@ -19,12 +18,10 @@ protected SimpleMonoObservable(OnSubscribe<T> onSubscribe) {
1918
this.onSubscribe = onSubscribe;
2019
}
2120

22-
@Override
2321
public void subscribe(Subscriber<T> subscriber) {
2422
onSubscribe.call(subscriber);
2523
}
2624

27-
@Override
2825
public <R> SimpleMonoObservable<R> lift(Operator<? extends R, ? super T> operator) {
2926
return extend(new SimpleMonoConversion<R, T>(operator));
3027
}
@@ -33,8 +30,7 @@ public <R, O> O extend(MonoConversion<O, T> operator) {
3330
return operator.convert(onSubscribe);
3431
}
3532

36-
@Override
37-
public <R> MonoObservable<? extends R> compose(Func1<? super MonoObservable<? super T>, ? extends MonoObservable<? extends R>> transformer) {
33+
public <R> SimpleMonoObservable<? extends R> compose(Func1<SimpleMonoObservable<? super T>, SimpleMonoObservable<? extends R>> transformer) {
3834
return transformer.call(this);
3935
}
4036

src/main/java/rx/simple/SimpleMonoToDualConversion.java

-51
This file was deleted.

src/main/java/rx/simple/operator/ConversionOperators.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,20 @@
11
package rx.simple.operator;
22

3+
import rx.Observable.Operator;
34
import rx.Subscriber;
4-
import rx.dual.DualSubscriber;
5-
import rx.dual.MonoToDualOperator;
65
import rx.functions.Func1;
7-
import rx.simple.SimpleDualObservable;
8-
import rx.simple.SimpleMonoToDualConversion;
6+
import rx.simple.CoreObservable;
7+
import rx.simple.CoreObservableConversion;
98
import rx.single.MonoConversion;
109

1110
public class ConversionOperators {
1211

13-
public static <T1, R1> MonoConversion<SimpleDualObservable<T1, R1>, T1> generate(Func1<? super T1, ? extends R1> generatorFunc) {
14-
return new SimpleMonoToDualConversion<T1, R1, T1>(new MonoToDualOperator<T1, R1, T1>() {
12+
public static <T, R> MonoConversion<CoreObservable<R>, T> mapToCore(Func1<? super T, ? extends R> mapFunc) {
13+
return new CoreObservableConversion<R, T>(new Operator<R, T>() {
1514

1615
@Override
17-
public Subscriber<? super T1> call(DualSubscriber<? super T1, ? super R1> subscriber) {
18-
return new Subscriber<T1>() {
16+
public Subscriber<? super T> call(Subscriber<? super R> subscriber) {
17+
return new Subscriber<T>() {
1918
@Override
2019
public void onCompleted() {
2120
subscriber.onCompleted();
@@ -27,10 +26,10 @@ public void onError(Throwable e) {
2726
}
2827

2928
@Override
30-
public void onNext(T1 t0) {
29+
public void onNext(T t) {
3130
try {
32-
R1 t1 = generatorFunc.call(t0);
33-
subscriber.onNext(t0, t1);
31+
R r = mapFunc.call(t);
32+
subscriber.onNext(r);
3433
} catch (Throwable e) {
3534
subscriber.onError(e);
3635
}

0 commit comments

Comments
 (0)