Skip to content

Commit cf6e21c

Browse files
author
Aaron Tull
committed
Added dual map extending operator
1 parent a9ad898 commit cf6e21c

File tree

5 files changed

+65
-8
lines changed

5 files changed

+65
-8
lines changed
+5-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package rx.dual;
22

3-
import rx.Observer;
43

5-
public interface DualObserver<T1, T2> extends Observer<T1> {
4+
public interface DualObserver<T1, T2> {
65
public void onNext(T1 t1, T2 t2);
6+
7+
public void onCompleted();
8+
9+
public void onError(Throwable e);
710
}
+16-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package rx.dual;
22

3-
import rx.Subscriber;
3+
import rx.Subscription;
44

5-
public abstract class DualSubscriber<T1, T2> extends Subscriber<T1> implements DualObserver<T1, T2> {
5+
6+
public abstract class DualSubscriber<T1, T2> implements DualObserver<T1, T2>, Subscription {
67

78
@Override
89
public abstract void onCompleted();
@@ -11,9 +12,20 @@ public abstract class DualSubscriber<T1, T2> extends Subscriber<T1> implements D
1112
public abstract void onError(Throwable e);
1213

1314
@Override
14-
public void onNext(T1 t) {}
15+
public abstract void onNext(T1 t1, T2 t2);
1516

17+
public void onStart() {
18+
19+
}
20+
1621
@Override
17-
public abstract void onNext(T1 t1, T2 t2);
22+
public void unsubscribe() {
23+
24+
}
25+
26+
@Override
27+
public boolean isUnsubscribed() {
28+
return false;
29+
}
1830

1931
}

src/main/java/rx/simple/SimpleDualExtendingOperator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ public class SimpleDualExtendingOperator<R1, R2, T1, T2> implements DualExtendin
1010

1111
private DualOperator<R1, R2, T1, T2> operator;
1212

13-
SimpleDualExtendingOperator(DualOperator<R1, R2, T1, T2> operator) {
13+
public SimpleDualExtendingOperator(DualOperator<R1, R2, T1, T2> operator) {
1414
this.operator = operator;
1515
}
1616

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package rx.simple.operator;
2+
3+
import rx.dual.DualExtendingOperator;
4+
import rx.dual.DualOperator;
5+
import rx.dual.DualSubscriber;
6+
import rx.functions.Func2;
7+
import rx.simple.SimpleDualExtendingOperator;
8+
import rx.simple.SimpleDualObservable;
9+
10+
public class DualOperators {
11+
12+
public static <T1, T2, R1> DualExtendingOperator<SimpleDualObservable<R1, T2>, R1, T2, T1, T2> map1(Func2<? super T1, ? super T2, ? extends R1> mapFunc) {
13+
return new SimpleDualExtendingOperator<R1, T2, T1, T2>(new DualOperator<R1, T2, T1, T2>() {
14+
15+
@Override
16+
public DualSubscriber<? super T1, ? super T2> call(DualSubscriber<? super R1, ? super T2> subscriber) {
17+
return new DualSubscriber<T1, T2>() {
18+
19+
@Override
20+
public void onCompleted() {
21+
subscriber.onCompleted();
22+
}
23+
24+
@Override
25+
public void onError(Throwable e) {
26+
subscriber.onError(e);
27+
}
28+
29+
@Override
30+
public void onNext(T1 t1, T2 t2) {
31+
subscriber.onNext(mapFunc.call(t1, t2), t2);
32+
}};
33+
}});
34+
}
35+
36+
}

src/test/java/rx/simple/SimpleSingleObservableTest.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import org.junit.Test;
44

55
import static rx.simple.operator.CoreOperators.*;
6+
import static rx.simple.operator.DualOperators.*;
67
import static rx.simple.operator.ConversionOperators.*;
78
import rx.Subscriber;
89
import rx.dual.DualSubscriber;
@@ -51,7 +52,12 @@ public void testConversion() {
5152
sub.onNext(new Integer(i));
5253
sub.onCompleted();
5354
})
54-
.extend(generate(i -> i.toString()))
55+
.extend(
56+
generate(i -> i.toString())
57+
)
58+
.extend(
59+
map1((Integer i, String s) -> i * s.length())
60+
)
5561
.subscribe(new DualSubscriber<Integer, String>() {
5662
@Override
5763
public void onNext(Integer t1, String t2) {

0 commit comments

Comments
 (0)