Skip to content

Feature/automatic resubscribe #398

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Apr 18, 2020
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c223a56
Enable subscribe(Stream), unsubscribe independent of connect (similar…
SgtSilvio Apr 1, 2020
f04dfa2
Refactored global publish flow subscribe
SgtSilvio Apr 1, 2020
1a2cb9a
Fixed MqttSubscriptionFlowsTest
SgtSilvio Apr 1, 2020
bd16bf6
Removed unnecessary parameter from MqttSubscriptionFlows.unsubscribe
SgtSilvio Apr 2, 2020
3240415
Automatic resubscribe when no session is present and automatically re…
SgtSilvio Apr 3, 2020
f8b3269
Refactor MqttIncomingPublishFlows method order
SgtSilvio Apr 3, 2020
d98399f
Added comments to assert statements
SgtSilvio Apr 3, 2020
44d0093
Improved MqttSubscriptionFlowsTest and MqttSubscriptionFlowTreeTest
SgtSilvio Apr 3, 2020
6feca4c
Improved FlowableWithSingleObserveOn/FluxWithSinglePublishOn
SgtSilvio Apr 3, 2020
09d9db0
Improved FlowableWithSingleObserveOn: delay complete/error when singl…
SgtSilvio Apr 4, 2020
3334672
Improved MqttPublishFlowableAckLink: ensured request and cancel are c…
SgtSilvio Apr 4, 2020
bf8899e
Refactored MqttAckSingleFlowable and MqttAckSingle
SgtSilvio Apr 4, 2020
056e4ac
Small refactorings
SgtSilvio Apr 4, 2020
c1a22f9
Improved FlowableWithSingleObserveOn/FluxWithSinglePublishOn
SgtSilvio Apr 5, 2020
993b9ca
Improved Flowable/FluxWithSingleCombine
SgtSilvio Apr 8, 2020
4308b56
Improved MqttSubscriptionHandler for automatic resubscribe
SgtSilvio Apr 9, 2020
0341918
Refactored MqttOutgoing/IncomingQosHandler similar to MqttSubscriptio…
SgtSilvio Apr 9, 2020
d1b72ab
Unify MqttSubscriptionHandler and MqttOutgoingQosHandler
SgtSilvio Apr 9, 2020
a057665
Reduce flush calls for many subscribe packets (reduces used network b…
SgtSilvio Apr 9, 2020
828edad
Renamed MqttSubscriptionFlows -> MqttSubscribedPublishFlows as this i…
SgtSilvio Apr 9, 2020
1bd97f4
Removed dependency injection from MqttPublishFlowables and MqttSubscr…
SgtSilvio Apr 17, 2020
0d53c11
Added Reconnector.resubscribeIfSessionExpired and republishIfSessionE…
SgtSilvio Apr 18, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import reactor.util.context.Context;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
* @author Silvio Giebl
Expand All @@ -50,17 +49,17 @@ public void subscribe(final @NotNull CoreSubscriber<? super Object> subscriber)

private static class CombineSubscriber<F, S> implements CoreWithSingleSubscriber<F, S>, Subscription {

private static final @NotNull Object COMPLETE = new Object();
@SuppressWarnings("rawtypes")
static final @NotNull AtomicLongFieldUpdater<CombineSubscriber> REQUESTED =
private static final @NotNull AtomicLongFieldUpdater<CombineSubscriber> REQUESTED =
AtomicLongFieldUpdater.newUpdater(CombineSubscriber.class, "requested");
@SuppressWarnings("rawtypes")
static final @NotNull AtomicReferenceFieldUpdater<CombineSubscriber, Object> QUEUED =
AtomicReferenceFieldUpdater.newUpdater(CombineSubscriber.class, Object.class, "queued");

private final @NotNull CoreSubscriber<? super Object> subscriber;
private @Nullable Subscription subscription;
volatile long requested;
volatile @Nullable Object queued;
private volatile long requested;

private @Nullable Object queued;
private @Nullable Object done;

CombineSubscriber(final @NotNull CoreSubscriber<? super Object> subscriber) {
this.subscriber = subscriber;
Expand All @@ -84,57 +83,67 @@ public void onNext(final @NotNull F f) {

private void next(final @NotNull Object next) {
if (REQUESTED.get(this) == 0) {
QUEUED.set(this, next);
if ((REQUESTED.get(this) != 0) && (QUEUED.getAndSet(this, null)) != null) {
Operators.produced(REQUESTED, this, 1);
subscriber.onNext(next);
synchronized (this) {
if (REQUESTED.get(this) == 0) {
queued = next;
return;
}
}
} else {
Operators.produced(REQUESTED, this, 1);
subscriber.onNext(next);
}
Operators.produced(REQUESTED, this, 1);
subscriber.onNext(next);
}

@Override
public void onError(final @NotNull Throwable throwable) {
final Object next = QUEUED.get(this);
if ((next == null) || !QUEUED.compareAndSet(this, next, new TerminalElement(next, throwable))) {
subscriber.onError(throwable);
public void onComplete() {
synchronized (this) {
if (queued != null) {
done = COMPLETE;
} else {
subscriber.onComplete();
}
}
}

@Override
public void onComplete() {
final Object next = QUEUED.get(this);
if ((next == null) || !QUEUED.compareAndSet(this, next, new TerminalElement(next, null))) {
subscriber.onComplete();
public void onError(final @NotNull Throwable error) {
synchronized (this) {
if (queued != null) {
done = error;
} else {
subscriber.onError(error);
}
}
}

@Override
public void request(long n) {
assert subscription != null;
if (n > 0) {
if (REQUESTED.get(this) == 0) {
final Object next = QUEUED.getAndSet(this, null);
if (next != null) {
if (next instanceof TerminalElement) {
final TerminalElement terminalElement = (TerminalElement) next;
subscriber.onNext(terminalElement.element);
if (terminalElement.error == null) {
subscriber.onComplete();
} else {
subscriber.onError(terminalElement.error);
}
return;
} else {
subscriber.onNext(next);
if (Operators.addCap(REQUESTED, this, n) == 0) {
synchronized (this) {
final Object queued = this.queued;
if (queued != null) {
this.queued = null;
Operators.produced(REQUESTED, this, 1);
subscriber.onNext(queued);
n--;
final Object done = this.done;
if (done != null) {
this.done = null;
if (done instanceof Throwable) {
subscriber.onError((Throwable) done);
} else {
subscriber.onComplete();
}
return;
}
}
if (n > 0) {
subscription.request(n);
}
}
}
if (n > 0) {
Operators.addCap(REQUESTED, this, n);
} else {
subscription.request(n);
}
}
Expand Down Expand Up @@ -265,15 +274,4 @@ private static class SingleElement {
this.element = element;
}
}

private static class TerminalElement {

final @NotNull Object element;
final @Nullable Throwable error;

TerminalElement(final @NotNull Object element, final @Nullable Throwable error) {
this.element = element;
this.error = error;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -253,16 +253,15 @@ void mapBoth_multiple(final @NotNull FluxWithSingle<String, StringBuilder> fluxW

final AtomicInteger nextCounter = new AtomicInteger();
final AtomicInteger singleCounter = new AtomicInteger();
fluxWithSingle //
.mapBoth(s -> {
nextCounter.incrementAndGet();
assertNotEquals("test_thread", Thread.currentThread().getName());
return s + "-1";
}, stringBuilder -> {
assertEquals(1, singleCounter.incrementAndGet());
assertNotEquals("test_thread", Thread.currentThread().getName());
return stringBuilder.append("-1");
}).mapBoth(s -> {
fluxWithSingle.mapBoth(s -> {
nextCounter.incrementAndGet();
assertNotEquals("test_thread", Thread.currentThread().getName());
return s + "-1";
}, stringBuilder -> {
assertEquals(1, singleCounter.incrementAndGet());
assertNotEquals("test_thread", Thread.currentThread().getName());
return stringBuilder.append("-1");
}).mapBoth(s -> {
nextCounter.incrementAndGet();
assertNotEquals("test_thread", Thread.currentThread().getName());
return s + "-2";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.hivemq.client.internal.mqtt;

import com.hivemq.client.internal.mqtt.exceptions.MqttClientStateExceptions;
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
import com.hivemq.client.internal.mqtt.message.publish.MqttPublish;
Expand Down Expand Up @@ -96,6 +97,9 @@ public class MqttBlockingClient implements Mqtt5BlockingClient {
public @NotNull Mqtt5SubAck subscribe(final @Nullable Mqtt5Subscribe subscribe) {
final MqttSubscribe mqttSubscribe = MqttChecks.subscribe(subscribe);
try {
if (!getState().isConnectedOrReconnect()) {
throw MqttClientStateExceptions.notConnected();
}
return handleSubAck(delegate.subscribeUnsafe(mqttSubscribe).blockingGet());
} catch (final RuntimeException e) {
throw AsyncRuntimeException.fillInStackTrace(e);
Expand All @@ -113,6 +117,9 @@ public class MqttBlockingClient implements Mqtt5BlockingClient {
public @NotNull Mqtt5UnsubAck unsubscribe(final @Nullable Mqtt5Unsubscribe unsubscribe) {
final MqttUnsubscribe mqttUnsubscribe = MqttChecks.unsubscribe(unsubscribe);
try {
if (!getState().isConnectedOrReconnect()) {
throw MqttClientStateExceptions.notConnected();
}
return handleUnsubAck(delegate.unsubscribeUnsafe(mqttUnsubscribe).blockingGet());
} catch (final RuntimeException e) {
throw AsyncRuntimeException.fillInStackTrace(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void releaseEventLoop() {
if (--eventLoopAcquires == 0) {
final EventLoop eventLoop = this.eventLoop;
final long eventLoopAcquireCount = this.eventLoopAcquireCount;
assert eventLoop != null;
assert eventLoop != null : "eventLoopAcquires was > 0 -> eventLoop != null";
eventLoop.execute(() -> { // release eventLoop after all tasks are finished
synchronized (state) {
if (eventLoopAcquireCount == this.eventLoopAcquireCount) { // eventLoop has not been reacquired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public class Mqtt5ConnAckDecoder implements MqttMessageDecoder {
throw new MqttDecoderException(Mqtt5DisconnectReasonCode.PROTOCOL_ERROR, "wrong maximum Qos");
}
maximumQos = MqttQos.fromCode(maximumQosCode);
assert maximumQos != null;
assert maximumQos != null : "maximumQosCode = 0 or = 1";
maximumQosPresent = true;
restrictionsPresent |= maximumQos != DEFAULT_MAXIMUM_QOS;
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,7 @@ private void encodePayload(final @NotNull MqttStatefulSubscribe message, final @
final MqttSubscription subscription = subscriptions.get(i);

subscription.getTopicFilter().encode(out);

int subscriptionOptions = 0;
subscriptionOptions |= subscription.getRetainHandling().getCode() << 4;
if (subscription.isRetainAsPublished()) {
subscriptionOptions |= 0b0000_1000;
}
if (subscription.isNoLocal()) {
subscriptionOptions |= 0b0000_0100;
}
subscriptionOptions |= subscription.getQos().getCode();

out.writeByte(subscriptionOptions);
out.writeByte(subscription.encodeSubscriptionOptions());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;

/**
* @author Silvio Giebl
* @see MqttTopicFilter
Expand Down Expand Up @@ -286,6 +288,11 @@ int getFilterByteStart() {
return toString();
}

public @Nullable byte[] getPrefix() {
final int filterByteStart = getFilterByteStart();
return (filterByteStart == 0) ? null : Arrays.copyOfRange(toBinary(), 0, filterByteStart - 1);
}

@Override
public boolean matches(final @Nullable MqttTopic topic) {
return matches(MqttChecks.topic(topic));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
package com.hivemq.client.internal.mqtt.datatypes;

import com.hivemq.client.internal.util.ByteArrayUtil;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.datatypes.MqttTopicFilter;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
Expand Down Expand Up @@ -84,7 +82,7 @@ public boolean hasMultiLevelWildcard() {
throw new NoSuchElementException();
}
start = end + 1;
end = ByteArrayUtil.indexOf(array, start, (byte) MqttTopic.TOPIC_LEVEL_SEPARATOR);
end = ByteArrayUtil.indexOf(array, start, (byte) MqttTopicImpl.TOPIC_LEVEL_SEPARATOR);
return this;
}

Expand All @@ -103,7 +101,7 @@ public boolean forwardIfEqual(final @NotNull MqttTopicLevels levels) {
final byte[] levelsArray = levels.getArray();
final int levelsEnd = levels.getEnd();
final int to = end + levelsArray.length - levelsEnd;
if ((to <= allEnd) && ((to == allEnd) || (array[to] == MqttTopic.TOPIC_LEVEL_SEPARATOR)) &&
if ((to <= allEnd) && ((to == allEnd) || (array[to] == MqttTopicImpl.TOPIC_LEVEL_SEPARATOR)) &&
ByteArrayUtil.equals(array, end + 1, to, levelsArray, levelsEnd + 1, levelsArray.length)) {
start = end = to;
return true;
Expand Down Expand Up @@ -171,7 +169,7 @@ public boolean forwardIfMatch(final @NotNull MqttTopicLevels levels) {
if (array[index] == lb) {
index++;
levelsIndex++;
} else if (lb == MqttTopicFilter.SINGLE_LEVEL_WILDCARD) {
} else if (lb == MqttTopicFilterImpl.SINGLE_LEVEL_WILDCARD) {
while ((index < allEnd) && (array[index] != MqttTopicImpl.TOPIC_LEVEL_SEPARATOR)) {
index++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package com.hivemq.client.internal.mqtt.datatypes;

import com.hivemq.client.internal.util.ByteArray;
import com.hivemq.client.mqtt.datatypes.MqttTopicFilter;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Arrays;

Expand All @@ -32,7 +32,7 @@
public class MqttTopicLevel extends ByteArray {

private static final @NotNull MqttTopicLevel SINGLE_LEVEL_WILDCARD =
new MqttTopicLevel(new byte[]{MqttTopicFilter.SINGLE_LEVEL_WILDCARD});
new MqttTopicLevel(new byte[]{MqttTopicFilterImpl.SINGLE_LEVEL_WILDCARD});

static @NotNull MqttTopicLevel of(final @NotNull byte[] array, final int start, final int end) {
if (isSingleLevelWildcard(array, start, end)) {
Expand All @@ -42,7 +42,7 @@ public class MqttTopicLevel extends ByteArray {
}

private static boolean isSingleLevelWildcard(final @NotNull byte[] array, final int start, final int end) {
return ((end - start) == 1) && (array[start] == MqttTopicFilter.SINGLE_LEVEL_WILDCARD);
return ((end - start) == 1) && (array[start] == MqttTopicFilterImpl.SINGLE_LEVEL_WILDCARD);
}

MqttTopicLevel(final @NotNull byte[] array) {
Expand All @@ -60,4 +60,28 @@ public boolean isSingleLevelWildcard() {
public @NotNull MqttTopicLevel trim() {
return this;
}

public @Nullable MqttTopicFilterImpl toFilter(final @Nullable byte[] prefix, final boolean multiLevelWildcard) {
final byte[] bytes;
if (prefix != null) {
if (multiLevelWildcard) {
bytes = new byte[prefix.length + 1 + array.length + 2];
bytes[bytes.length - 2] = MqttTopicImpl.TOPIC_LEVEL_SEPARATOR;
bytes[bytes.length - 1] = MqttTopicFilterImpl.MULTI_LEVEL_WILDCARD;
} else {
bytes = new byte[prefix.length + 1 + array.length];
}
System.arraycopy(prefix, 0, bytes, 0, prefix.length);
bytes[prefix.length] = MqttTopicImpl.TOPIC_LEVEL_SEPARATOR;
System.arraycopy(array, 0, bytes, prefix.length + 1, array.length);
} else if (multiLevelWildcard) {
bytes = new byte[array.length + 2];
System.arraycopy(array, 0, bytes, 0, array.length);
bytes[bytes.length - 2] = MqttTopicImpl.TOPIC_LEVEL_SEPARATOR;
bytes[bytes.length - 1] = MqttTopicFilterImpl.MULTI_LEVEL_WILDCARD;
} else {
bytes = array;
}
return MqttTopicFilterImpl.of(bytes);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package com.hivemq.client.internal.mqtt.datatypes;

import com.hivemq.client.internal.util.ByteArrayUtil;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import org.jetbrains.annotations.NotNull;

import java.util.Arrays;
Expand All @@ -40,7 +39,7 @@ public class MqttTopicLevels extends MqttTopicLevel {
final byte[] array2 = level2.trim().getArray();
final byte[] array = new byte[array1.length + 1 + array2.length];
System.arraycopy(array1, 0, array, 0, array1.length);
array[array1.length] = MqttTopic.TOPIC_LEVEL_SEPARATOR;
array[array1.length] = MqttTopicImpl.TOPIC_LEVEL_SEPARATOR;
System.arraycopy(array2, 0, array, array1.length + 1, array2.length);
return new MqttTopicLevels(array, level1.length());
}
Expand All @@ -61,17 +60,17 @@ protected int getEnd() {
if (index == array.length) {
return this;
}
assert array[index] == MqttTopic.TOPIC_LEVEL_SEPARATOR;
assert array[index] == MqttTopicImpl.TOPIC_LEVEL_SEPARATOR : "topic levels must only be split on /";
if (index == firstEnd) {
return MqttTopicLevel.of(array, 0, firstEnd);
}
return new MqttTopicLevels(Arrays.copyOfRange(array, 0, index), firstEnd);
}

public @NotNull MqttTopicLevel after(final int index) {
assert array[index] == MqttTopic.TOPIC_LEVEL_SEPARATOR;
assert array[index] == MqttTopicImpl.TOPIC_LEVEL_SEPARATOR : "topic levels must only be split on /";
final int start = index + 1;
final int end = ByteArrayUtil.indexOf(array, start, (byte) MqttTopic.TOPIC_LEVEL_SEPARATOR);
final int end = ByteArrayUtil.indexOf(array, start, (byte) MqttTopicImpl.TOPIC_LEVEL_SEPARATOR);
if (end == array.length) {
return MqttTopicLevel.of(array, start, array.length);
}
Expand Down
Loading