Skip to content

Commit 03f8106

Browse files
author
Aaron Tull
committed
Refactoring, renaming, and simplifying
1 parent cf6e21c commit 03f8106

27 files changed

+246
-268
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package rx.dual;
2+
3+
public interface DualConversion<O, T1, T2> {
4+
public O convert(DualOnSubscribe<T1, T2> onSubscribe);
5+
}

src/main/java/rx/dual/DualExtendingOperator.java

-5
This file was deleted.

src/main/java/rx/dual/DualObservable.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,5 @@
44
public interface DualObservable<T1, T2> {
55
public void subscribe(DualSubscriber<T1, T2> subscriber);
66
public <R1, R2> DualObservable<R1, R2> lift(final DualOperator<? extends R1, ? extends R2, ? super T1, ? super T2> operator);
7-
public <R1, R2, O extends DualObservable<R1, R2>> O lift(DualObservableFactory<O, R1, R2, T1, T2> factory, DualOperator<? extends R1, ? extends R2, ? super T1, ? super T2> operator);
8-
public <R1, R2, O extends DualObservable<R1, R2>> O extend(DualExtendingOperator<O, R1, R2, T1, T2> operator);
7+
public <O> O extend(DualConversion<O, T1, T2> operator);
98
}

src/main/java/rx/dual/DualObservableFactory.java

-8
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package rx.dual;
2+
3+
import rx.Observable.OnSubscribe;
4+
5+
public interface MonoToDualConversion<O extends DualObservable<R1, R2>, R1, R2, T> {
6+
public O convert(OnSubscribe<T> onSubscribe);
7+
}

src/main/java/rx/dual/SingleToDualOperator.java renamed to src/main/java/rx/dual/MonoToDualOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import rx.Subscriber;
44
import rx.functions.Func1;
55

6-
public interface SingleToDualOperator<R1, R2, T> extends Func1<DualSubscriber<? super R1, ? super R2>, Subscriber<? super T>> {
6+
public interface MonoToDualOperator<R1, R2, T> extends Func1<DualSubscriber<? super R1, ? super R2>, Subscriber<? super T>> {
77
@Override
88
public Subscriber<? super T> call(DualSubscriber<? super R1, ? super R2> subscriber);
99
}

src/main/java/rx/dual/SingleToDualExtendingOperator.java

-7
This file was deleted.

src/main/java/rx/dual/SingleToDualObservableFactory.java

-9
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package rx.simple;
2+
3+
import rx.Observable;
4+
import rx.Observable.OnSubscribe;
5+
import rx.Observable.Operator;
6+
import rx.Subscriber;
7+
import rx.functions.Func1;
8+
import rx.functions.Func2;
9+
import rx.internal.operators.OperatorFilter;
10+
import rx.internal.operators.OperatorMap;
11+
import rx.internal.operators.OperatorScan;
12+
import rx.single.MonoConversion;
13+
import rx.single.MonoObservable;
14+
15+
public class CoreObservable<T> extends SimpleMonoObservable<T> {
16+
17+
private CoreObservable(OnSubscribe<T> onSubscribe) {
18+
super(onSubscribe);
19+
}
20+
21+
public static <T> CoreObservable<T> create(OnSubscribe<T> onSubscribe) {
22+
return new CoreObservable<T>(onSubscribe);
23+
}
24+
25+
@Override
26+
public void subscribe(Subscriber<T> subscriber) {
27+
28+
}
29+
30+
@Override
31+
public <R> CoreObservable<R> lift(Operator<? extends R, ? super T> operator) {
32+
// TODO Auto-generated method stub
33+
return null;
34+
}
35+
36+
@Override
37+
public <R, O> O extend(MonoConversion<O, T> operator) {
38+
// TODO Auto-generated method stub
39+
return null;
40+
}
41+
42+
@Override
43+
public <R> MonoObservable<? extends R> compose(Func1<? super MonoObservable<? super T>, ? extends MonoObservable<? extends R>> composition) {
44+
// TODO Auto-generated method stub
45+
return null;
46+
}
47+
48+
public final <R> CoreObservable<R> map(Func1<? super T, ? extends R> func) {
49+
return lift(new OperatorMap<T, R>(func));
50+
}
51+
52+
public final CoreObservable<T> filter(Func1<? super T, Boolean> predicate) {
53+
return lift(new OperatorFilter<T>(predicate));
54+
}
55+
56+
public final <R> CoreObservable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
57+
return lift(new OperatorScan<R, T>(initialValue, accumulator));
58+
}
59+
60+
public final <R> CoreObservable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
61+
// return map(func).lift(OperatorMerge.<T>instance(false));
62+
// return map(func).extend(new CoreObservableConversion<R, Observable<R>>(OperatorMerge.<T>instance(false)));
63+
return null;
64+
}
65+
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package rx.simple;
2+
3+
import rx.Observable.OnSubscribe;
4+
import rx.Observable.Operator;
5+
import rx.Subscriber;
6+
import rx.exceptions.OnErrorNotImplementedException;
7+
import rx.single.MonoConversion;
8+
9+
public class CoreObservableConversion<R, T> implements MonoConversion<CoreObservable<R>, T> {
10+
11+
private Operator<? extends R, ? super T> operator;
12+
13+
public CoreObservableConversion(Operator<? extends R, ? super T> operator) {
14+
this.operator = operator;
15+
}
16+
17+
@Override
18+
public CoreObservable<R> convert(OnSubscribe<T> onSubscribe) {
19+
return CoreObservable.create(wrapSubscriber(onSubscribe, operator));
20+
}
21+
22+
protected OnSubscribe<R> wrapSubscriber(OnSubscribe<T> onSubscribe, Operator<? extends R, ? super T> operator) {
23+
return (Subscriber<? super R> o) -> {
24+
try {
25+
Subscriber<? super T> st = operator.call(o);
26+
try {
27+
// new Subscriber created and being subscribed with so 'onStart' it
28+
st.onStart();
29+
onSubscribe.call(st);
30+
} catch (Throwable e) {
31+
// localized capture of errors rather than it skipping all operators
32+
// and ending up in the try/catch of the subscribe method which then
33+
// prevents onErrorResumeNext and other similar approaches to error handling
34+
if (e instanceof OnErrorNotImplementedException) {
35+
throw e;
36+
}
37+
st.onError(e);
38+
}
39+
} catch (Throwable e) {
40+
if (e instanceof OnErrorNotImplementedException) {
41+
throw e;
42+
}
43+
// if the lift function failed all we can do is pass the error to the final Subscriber
44+
// as we don't have the operator available to us
45+
o.onError(e);
46+
}
47+
};
48+
}
49+
50+
}

src/main/java/rx/simple/SimpleDualExtendingOperator.java renamed to src/main/java/rx/simple/SimpleDualConversion.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
package rx.simple;
22

3-
import rx.dual.DualExtendingOperator;
3+
import rx.dual.DualConversion;
44
import rx.dual.DualOnSubscribe;
55
import rx.dual.DualOperator;
66
import rx.dual.DualSubscriber;
77
import rx.exceptions.OnErrorNotImplementedException;
88

9-
public class SimpleDualExtendingOperator<R1, R2, T1, T2> implements DualExtendingOperator<SimpleDualObservable<R1, R2>, R1, R2, T1, T2> {
9+
public class SimpleDualConversion<R1, R2, T1, T2> implements DualConversion<SimpleDualObservable<R1, R2>, T1, T2> {
1010

11-
private DualOperator<R1, R2, T1, T2> operator;
11+
private DualOperator<? extends R1, ? extends R2, ? super T1, ? super T2> operator;
1212

13-
public SimpleDualExtendingOperator(DualOperator<R1, R2, T1, T2> operator) {
13+
public SimpleDualConversion(DualOperator<? extends R1, ? extends R2, ? super T1, ? super T2> operator) {
1414
this.operator = operator;
1515
}
1616

1717
@Override
18-
public SimpleDualObservable<R1, R2> compose(DualOnSubscribe<T1, T2> onSubscribe) {
18+
public SimpleDualObservable<R1, R2> convert(DualOnSubscribe<T1, T2> onSubscribe) {
1919
return SimpleDualObservable.create(wrapSubscriber(onSubscribe));
2020
}
2121

src/main/java/rx/simple/SimpleDualObservable.java

+5-11
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package rx.simple;
22

3-
import rx.dual.DualExtendingOperator;
3+
import rx.dual.DualConversion;
44
import rx.dual.DualObservable;
5-
import rx.dual.DualObservableFactory;
65
import rx.dual.DualOnSubscribe;
76
import rx.dual.DualOperator;
87
import rx.dual.DualSubscriber;
@@ -26,17 +25,12 @@ public void subscribe(DualSubscriber<T1, T2> subscriber) {
2625

2726
@Override
2827
public <R1, R2> DualObservable<R1, R2> lift(DualOperator<? extends R1, ? extends R2, ? super T1, ? super T2> operator) {
29-
return lift(new SimpleDualObservableFactory<R1, R2, T1, T2>(), operator);
28+
return extend(new SimpleDualConversion<R1, R2, T1, T2>(operator));
3029
}
31-
32-
@Override
33-
public <R1, R2, O extends DualObservable<R1, R2>> O lift(DualObservableFactory<O, R1, R2, T1, T2> factory, DualOperator<? extends R1, ? extends R2, ? super T1, ? super T2> operator) {
34-
return factory.call(operator, onSubscribe);
35-
}
36-
30+
3731
@Override
38-
public <R1, R2, O extends DualObservable<R1, R2>> O extend(DualExtendingOperator<O, R1, R2, T1, T2> operator) {
39-
return operator.compose(onSubscribe);
32+
public <O> O extend(DualConversion<O, T1, T2> operator) {
33+
return operator.convert(onSubscribe);
4034
}
4135

4236
}

src/main/java/rx/simple/SimpleDualObservableFactory.java

-42
This file was deleted.

src/main/java/rx/simple/SimpleExtendingOperator.java renamed to src/main/java/rx/simple/SimpleMonoConversion.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@
44
import rx.Observable.Operator;
55
import rx.Subscriber;
66
import rx.exceptions.OnErrorNotImplementedException;
7-
import rx.single.SingleExtendingOperator;
7+
import rx.single.MonoConversion;
88

9-
public class SimpleExtendingOperator<R, T> implements SingleExtendingOperator<SimpleSingleObservable<R>, R, T> {
9+
public class SimpleMonoConversion<R, T> implements MonoConversion<SimpleMonoObservable<R>, T> {
1010

11-
private Operator<R, T> operator;
11+
private Operator<? extends R, ? super T> operator;
1212

13-
public SimpleExtendingOperator(Operator<R, T> operator) {
13+
public SimpleMonoConversion(Operator<? extends R, ? super T> operator) {
1414
this.operator = operator;
1515
}
1616

1717
@Override
18-
public SimpleSingleObservable<R> compose(OnSubscribe<T> onSubscribe) {
19-
return SimpleSingleObservable.create(wrapSubscriber(onSubscribe));
18+
public SimpleMonoObservable<R> convert(OnSubscribe<T> onSubscribe) {
19+
return SimpleMonoObservable.create(wrapSubscriber(onSubscribe));
2020
}
2121

2222
private OnSubscribe<R> wrapSubscriber(OnSubscribe<T> onSubscribe) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package rx.simple;
2+
3+
import rx.Observable.OnSubscribe;
4+
import rx.Observable.Operator;
5+
import rx.Subscriber;
6+
import rx.functions.Func1;
7+
import rx.single.MonoConversion;
8+
import rx.single.MonoObservable;
9+
10+
public class SimpleMonoObservable<T> implements MonoObservable<T> {
11+
12+
private OnSubscribe<T> onSubscribe;
13+
14+
public static <T> SimpleMonoObservable<T> create(OnSubscribe<T> onSubscribe) {
15+
return new SimpleMonoObservable<T>(onSubscribe);
16+
}
17+
18+
protected SimpleMonoObservable(OnSubscribe<T> onSubscribe) {
19+
this.onSubscribe = onSubscribe;
20+
}
21+
22+
@Override
23+
public void subscribe(Subscriber<T> subscriber) {
24+
onSubscribe.call(subscriber);
25+
}
26+
27+
@Override
28+
public <R> SimpleMonoObservable<R> lift(Operator<? extends R, ? super T> operator) {
29+
return extend(new SimpleMonoConversion<R, T>(operator));
30+
}
31+
32+
public <R, O> O extend(MonoConversion<O, T> operator) {
33+
return operator.convert(onSubscribe);
34+
}
35+
36+
@Override
37+
public <R> MonoObservable<? extends R> compose(Func1<? super MonoObservable<? super T>, ? extends MonoObservable<? extends R>> transformer) {
38+
return transformer.call(this);
39+
}
40+
41+
}

src/main/java/rx/simple/SimpleSingleToDualExtendingOperator.java renamed to src/main/java/rx/simple/SimpleMonoToDualConversion.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,19 @@
44
import rx.Subscriber;
55
import rx.dual.DualOnSubscribe;
66
import rx.dual.DualSubscriber;
7-
import rx.dual.SingleToDualExtendingOperator;
8-
import rx.dual.SingleToDualOperator;
7+
import rx.dual.MonoToDualOperator;
98
import rx.exceptions.OnErrorNotImplementedException;
9+
import rx.single.MonoConversion;
1010

11-
public class SimpleSingleToDualExtendingOperator<R1, R2, T> implements SingleToDualExtendingOperator<SimpleDualObservable<R1, R2>, R1, R2, T> {
12-
private SingleToDualOperator<R1, R2, T> operator;
11+
public class SimpleMonoToDualConversion<R1, R2, T> implements MonoConversion<SimpleDualObservable<R1, R2>, T> {
12+
private MonoToDualOperator<R1, R2, T> operator;
1313

14-
public SimpleSingleToDualExtendingOperator(SingleToDualOperator<R1, R2, T> operator) {
14+
public SimpleMonoToDualConversion(MonoToDualOperator<R1, R2, T> operator) {
1515
this.operator = operator;
1616
}
1717

1818
@Override
19-
public SimpleDualObservable<R1, R2> compose(OnSubscribe<T> onSubscribe) {
19+
public SimpleDualObservable<R1, R2> convert(OnSubscribe<T> onSubscribe) {
2020
return SimpleDualObservable.create(wrapSubscriber(onSubscribe));
2121
}
2222

0 commit comments

Comments
 (0)