Skip to content

Commit e3d20ec

Browse files
committed
Improved FlowableWithSingleObserveOn/FluxWithSinglePublishOn
Restored Flowable/FluxWithSingleCombine and fixed concurrency issues
1 parent 7276151 commit e3d20ec

File tree

4 files changed

+542
-373
lines changed

4 files changed

+542
-373
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
/*
2+
* Copyright 2018 dc-square and the HiveMQ MQTT Client Project
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*
16+
*/
17+
18+
package com.hivemq.client.internal.rx.reactor.operators;
19+
20+
import com.hivemq.client.internal.rx.reactor.CoreWithSingleConditionalSubscriber;
21+
import com.hivemq.client.rx.reactor.CoreWithSingleSubscriber;
22+
import com.hivemq.client.rx.reactor.FluxWithSingle;
23+
import org.jetbrains.annotations.NotNull;
24+
import org.jetbrains.annotations.Nullable;
25+
import org.reactivestreams.Subscription;
26+
import reactor.core.CoreSubscriber;
27+
import reactor.core.Fuseable;
28+
import reactor.core.publisher.Flux;
29+
import reactor.core.publisher.Operators;
30+
import reactor.util.context.Context;
31+
32+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
33+
34+
/**
35+
* @author Silvio Giebl
36+
*/
37+
class FluxWithSingleCombine<F, S> extends Flux<Object> {
38+
39+
private final @NotNull FluxWithSingle<F, S> source;
40+
41+
FluxWithSingleCombine(final @NotNull FluxWithSingle<F, S> source) {
42+
this.source = source;
43+
}
44+
45+
@Override
46+
public void subscribe(final @NotNull CoreSubscriber<? super Object> subscriber) {
47+
source.subscribeBoth(new CombineSubscriber<>(subscriber));
48+
}
49+
50+
private static class CombineSubscriber<F, S> implements CoreWithSingleSubscriber<F, S>, Subscription {
51+
52+
private static final @NotNull Object COMPLETE = new Object();
53+
@SuppressWarnings("rawtypes")
54+
private static final @NotNull AtomicLongFieldUpdater<CombineSubscriber> REQUESTED =
55+
AtomicLongFieldUpdater.newUpdater(CombineSubscriber.class, "requested");
56+
57+
private final @NotNull CoreSubscriber<? super Object> subscriber;
58+
private @Nullable Subscription subscription;
59+
private volatile long requested;
60+
61+
private @Nullable Object queued;
62+
private @Nullable Object done;
63+
64+
CombineSubscriber(final @NotNull CoreSubscriber<? super Object> subscriber) {
65+
this.subscriber = subscriber;
66+
}
67+
68+
@Override
69+
public void onSubscribe(final @NotNull Subscription subscription) {
70+
this.subscription = subscription;
71+
subscriber.onSubscribe(this);
72+
}
73+
74+
@Override
75+
public void onSingle(final @NotNull S s) {
76+
next(new SingleElement(s));
77+
}
78+
79+
@Override
80+
public void onNext(final @NotNull F f) {
81+
next(f);
82+
}
83+
84+
private void next(final @NotNull Object next) {
85+
if (REQUESTED.get(this) == 0) {
86+
synchronized (this) {
87+
if (REQUESTED.get(this) == 0) {
88+
queued = next;
89+
return;
90+
}
91+
}
92+
}
93+
Operators.produced(REQUESTED, this, 1);
94+
subscriber.onNext(next);
95+
}
96+
97+
@Override
98+
public void onComplete() {
99+
synchronized (this) {
100+
if (queued != null) {
101+
done = COMPLETE;
102+
return;
103+
}
104+
subscriber.onComplete();
105+
}
106+
}
107+
108+
@Override
109+
public void onError(final @NotNull Throwable error) {
110+
synchronized (this) {
111+
if (queued != null) {
112+
done = error;
113+
return;
114+
}
115+
subscriber.onError(error);
116+
}
117+
}
118+
119+
@Override
120+
public void request(long n) {
121+
assert subscription != null;
122+
if (n > 0) {
123+
if (Operators.addCap(REQUESTED, this, n) == 0) {
124+
synchronized (this) {
125+
final Object queued = this.queued;
126+
if (queued != null) {
127+
this.queued = null;
128+
Operators.produced(REQUESTED, this, 1);
129+
subscriber.onNext(queued);
130+
n--;
131+
final Object done = this.done;
132+
if (done != null) {
133+
this.done = null;
134+
if (done instanceof Throwable) {
135+
subscriber.onError((Throwable) done);
136+
} else {
137+
subscriber.onComplete();
138+
}
139+
}
140+
}
141+
if (n > 0) {
142+
subscription.request(n);
143+
}
144+
}
145+
} else {
146+
subscription.request(n);
147+
}
148+
}
149+
}
150+
151+
@Override
152+
public void cancel() {
153+
assert subscription != null;
154+
subscription.cancel();
155+
}
156+
157+
@Override
158+
public @NotNull Context currentContext() {
159+
return subscriber.currentContext();
160+
}
161+
}
162+
163+
static <F, S> void split(
164+
final @NotNull Flux<Object> source,
165+
final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
166+
167+
if (subscriber instanceof CoreWithSingleConditionalSubscriber) {
168+
//noinspection unchecked
169+
source.subscribe(new SplitSubscriber.Conditional<>(
170+
(CoreWithSingleConditionalSubscriber<? super F, ? super S>) subscriber));
171+
} else {
172+
source.subscribe(new SplitSubscriber.Default<>(subscriber));
173+
}
174+
}
175+
176+
private static abstract class SplitSubscriber<F, S, T extends CoreWithSingleSubscriber<? super F, ? super S>>
177+
implements Fuseable.ConditionalSubscriber<Object>, Subscription {
178+
179+
final @NotNull T subscriber;
180+
private @Nullable Subscription subscription;
181+
182+
SplitSubscriber(final @NotNull T subscriber) {
183+
this.subscriber = subscriber;
184+
}
185+
186+
@Override
187+
public void onSubscribe(final @NotNull Subscription subscription) {
188+
this.subscription = subscription;
189+
subscriber.onSubscribe(this);
190+
}
191+
192+
@Override
193+
public void onNext(final @NotNull Object o) {
194+
if (!tryOnNext(o)) {
195+
assert subscription != null;
196+
subscription.request(1);
197+
}
198+
}
199+
200+
@Override
201+
public boolean tryOnNext(final @NotNull Object o) {
202+
if (o instanceof SingleElement) {
203+
//noinspection unchecked
204+
subscriber.onSingle((S) ((SingleElement) o).element);
205+
return false;
206+
}
207+
//noinspection unchecked
208+
return tryOnNextActual((F) o);
209+
}
210+
211+
abstract boolean tryOnNextActual(final @NotNull F f);
212+
213+
@Override
214+
public void onError(final @NotNull Throwable throwable) {
215+
subscriber.onError(throwable);
216+
}
217+
218+
@Override
219+
public void onComplete() {
220+
subscriber.onComplete();
221+
}
222+
223+
@Override
224+
public void request(final long n) {
225+
assert subscription != null;
226+
subscription.request(n);
227+
}
228+
229+
@Override
230+
public void cancel() {
231+
assert subscription != null;
232+
subscription.cancel();
233+
}
234+
235+
@Override
236+
public @NotNull Context currentContext() {
237+
return subscriber.currentContext();
238+
}
239+
240+
private static class Default<F, S>
241+
extends SplitSubscriber<F, S, CoreWithSingleSubscriber<? super F, ? super S>> {
242+
243+
Default(final @NotNull CoreWithSingleSubscriber<? super F, ? super S> subscriber) {
244+
super(subscriber);
245+
}
246+
247+
@Override
248+
boolean tryOnNextActual(final @NotNull F f) {
249+
subscriber.onNext(f);
250+
return true;
251+
}
252+
}
253+
254+
private static class Conditional<F, S>
255+
extends SplitSubscriber<F, S, CoreWithSingleConditionalSubscriber<? super F, ? super S>> {
256+
257+
Conditional(final @NotNull CoreWithSingleConditionalSubscriber<? super F, ? super S> subscriber) {
258+
super(subscriber);
259+
}
260+
261+
@Override
262+
boolean tryOnNextActual(final @NotNull F f) {
263+
return subscriber.tryOnNext(f);
264+
}
265+
}
266+
}
267+
268+
private static class SingleElement {
269+
270+
final @NotNull Object element;
271+
272+
SingleElement(final @NotNull Object element) {
273+
this.element = element;
274+
}
275+
}
276+
}

0 commit comments

Comments
 (0)