Skip to content

Commit 5c02998

Browse files
committed
Added tests for FluxWithSingle
1 parent 9c47a2c commit 5c02998

File tree

5 files changed

+1138
-2
lines changed

5 files changed

+1138
-2
lines changed

reactor/build.gradle

+5
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,8 @@ dependencies {
1515
api group: 'io.projectreactor', name: 'reactor-core', version: '3.3.1.RELEASE'
1616
implementation group: 'org.jetbrains', name: 'annotations', version: jetbrainsAnnotationsVersion
1717
}
18+
19+
dependencies {
20+
testImplementation group: 'io.projectreactor', name: 'reactor-test', version: '3.3.1.RELEASE'
21+
testImplementation group: 'com.google.guava', name: 'guava', version: '24.1-jre'
22+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
/*
2+
* Copyright 2020 dc-square and the HiveMQ MQTT Client Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.hivemq.client.rx.reactor;
19+
20+
import org.jetbrains.annotations.NotNull;
21+
import org.jetbrains.annotations.Nullable;
22+
import org.reactivestreams.Subscription;
23+
import reactor.core.CoreSubscriber;
24+
import reactor.core.publisher.Flux;
25+
26+
/**
27+
* @author Silvio Giebl
28+
*/
29+
public class FluxWithSingleItem<F, S> extends FluxWithSingle<F, S> {
30+
31+
private final @NotNull Flux<F> source;
32+
private final @NotNull S single;
33+
private final int index;
34+
35+
public FluxWithSingleItem(final @NotNull Flux<F> source, final @NotNull S single, final int index) {
36+
this.source = source;
37+
this.single = single;
38+
this.index = index;
39+
}
40+
41+
@Override
42+
public void subscribe(final @NotNull CoreSubscriber<? super F> subscriber) {
43+
source.subscribe(subscriber);
44+
}
45+
46+
@Override
47+
public void subscribeBoth(final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
48+
source.subscribe(new SingleItemSubscriber<>(subscriber, single, index));
49+
}
50+
51+
private static class SingleItemSubscriber<F, S> implements CoreSubscriber<F>, Subscription {
52+
53+
private final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber;
54+
private final @NotNull S single;
55+
private int index;
56+
private int currentIndex;
57+
private @Nullable Subscription subscription;
58+
59+
SingleItemSubscriber(
60+
final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber, final @NotNull S single,
61+
final int index) {
62+
63+
this.subscriber = subscriber;
64+
this.single = single;
65+
this.index = index;
66+
}
67+
68+
@Override
69+
public void onSubscribe(final @NotNull Subscription subscription) {
70+
this.subscription = subscription;
71+
subscriber.onSubscribe(this);
72+
}
73+
74+
@Override
75+
public void onNext(final @NotNull F f) {
76+
subscriber.onNext(f);
77+
if (index == ++currentIndex) {
78+
index = -1;
79+
subscriber.onSingle(single);
80+
}
81+
}
82+
83+
@Override
84+
public void onError(final @NotNull Throwable error) {
85+
subscriber.onError(error);
86+
}
87+
88+
@Override
89+
public void onComplete() {
90+
subscriber.onComplete();
91+
}
92+
93+
@Override
94+
public void request(final long n) {
95+
assert subscription != null;
96+
if (index == 0) {
97+
index = -1;
98+
subscriber.onSingle(single);
99+
}
100+
subscription.request(n);
101+
}
102+
103+
@Override
104+
public void cancel() {
105+
assert subscription != null;
106+
subscription.cancel();
107+
}
108+
}
109+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
/*
2+
* Copyright 2020 dc-square and the HiveMQ MQTT Client Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.hivemq.client.rx.reactor;
19+
20+
import com.hivemq.client.rx.reactivestreams.WithSingleSubscriber;
21+
import org.jetbrains.annotations.NotNull;
22+
import org.jetbrains.annotations.Nullable;
23+
import org.reactivestreams.Subscriber;
24+
import org.reactivestreams.Subscription;
25+
import reactor.core.CoreSubscriber;
26+
import reactor.core.publisher.Flux;
27+
28+
/**
29+
* @author Silvio Giebl
30+
*/
31+
public class FluxWithSingleSplit<U, F, S> extends FluxWithSingle<F, S> {
32+
33+
private final @NotNull Flux<U> source;
34+
private final @NotNull Class<F> flowableClass;
35+
private final @NotNull Class<S> singleClass;
36+
37+
public FluxWithSingleSplit(
38+
final @NotNull Flux<U> source, final @NotNull Class<F> flowableClass, final @NotNull Class<S> singleClass) {
39+
40+
this.source = source;
41+
this.flowableClass = flowableClass;
42+
this.singleClass = singleClass;
43+
}
44+
45+
@Override
46+
public void subscribe(final @NotNull CoreSubscriber<? super F> subscriber) {
47+
source.subscribe(new SplitSubscriber<>(subscriber, flowableClass, singleClass));
48+
}
49+
50+
@Override
51+
public void subscribeBoth(final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
52+
source.subscribe(new SplitSubscriber<>(subscriber, flowableClass, singleClass));
53+
}
54+
55+
private static class SplitSubscriber<U, F, S> implements Subscriber<U>, Subscription {
56+
57+
private final @NotNull Subscriber<? super F> subscriber;
58+
private final @NotNull Class<F> flowableClass;
59+
private final @NotNull Class<S> singleClass;
60+
private @Nullable Subscription subscription;
61+
62+
SplitSubscriber(
63+
final @NotNull Subscriber<? super F> subscriber, final @NotNull Class<F> flowableClass,
64+
final @NotNull Class<S> singleClass) {
65+
66+
this.subscriber = subscriber;
67+
this.flowableClass = flowableClass;
68+
this.singleClass = singleClass;
69+
}
70+
71+
@Override
72+
public void onSubscribe(final @NotNull Subscription subscription) {
73+
this.subscription = subscription;
74+
subscriber.onSubscribe(this);
75+
}
76+
77+
@Override
78+
public void onNext(final @NotNull U u) {
79+
assert subscription != null;
80+
if (singleClass.isInstance(u)) {
81+
if (subscriber instanceof WithSingleSubscriber) {
82+
//noinspection unchecked
83+
((WithSingleSubscriber<F, S>) subscriber).onSingle(singleClass.cast(u));
84+
}
85+
subscription.request(1);
86+
} else if (flowableClass.isInstance(u)) {
87+
subscriber.onNext(flowableClass.cast(u));
88+
} else {
89+
subscription.request(1);
90+
}
91+
}
92+
93+
@Override
94+
public void onError(final @NotNull Throwable error) {
95+
subscriber.onError(error);
96+
}
97+
98+
@Override
99+
public void onComplete() {
100+
subscriber.onComplete();
101+
}
102+
103+
@Override
104+
public void request(final long n) {
105+
assert subscription != null;
106+
subscription.request(n);
107+
}
108+
109+
@Override
110+
public void cancel() {
111+
assert subscription != null;
112+
subscription.cancel();
113+
}
114+
}
115+
}

0 commit comments

Comments
 (0)