Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/manual ack #403

Merged
merged 26 commits into from
Apr 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
3854d5f
Added manualAcknowledgement to MqttIncomingPublishFlow, Mqtt3/5Subscr…
SgtSilvio Apr 7, 2020
035980b
Fixed Mqtt3RxClientViewExceptionsTest
SgtSilvio Apr 7, 2020
86fe847
Moved default methods of Mqtt3/5Rx/Async/BlockingClient to implementa…
SgtSilvio Apr 7, 2020
f9ef7a4
Added CheckReturnValue annotations to Mqtt3/RxClient
SgtSilvio Apr 7, 2020
2ef1882
Moved default methods of Mqtt3/5ReactorClient to implementation
SgtSilvio Apr 7, 2020
9fbbbb9
Added manualAcknowledgement to Mqtt3/5ReactorClient publishes methods
SgtSilvio Apr 7, 2020
af85357
Added manualAcknowledgement to subscribe(Stream) methods instead of p…
SgtSilvio Apr 7, 2020
c0fbb85
Fixed Mqtt3RxClientViewExceptionsTest
SgtSilvio Apr 7, 2020
b31f4ab
Refactored new IllegalStateException calls with Checks.state
SgtSilvio Apr 8, 2020
cc41b15
Deprecated subscribeStream(With), replaced with subscribePublishes(Wi…
SgtSilvio Apr 8, 2020
a447a16
Readded default toRx/Async/Blocking methods (as they do not require i…
SgtSilvio Apr 8, 2020
c5b7187
Temporarily skip classes from japicc to avoid false positives
SgtSilvio Apr 8, 2020
de94000
Removed unnecessary formatter:off tags
SgtSilvio Apr 8, 2020
2a88446
Added javadoc for Mqtt3/5AsyncClient.Mqtt3/5SubscribeAndCallbackBuilder
SgtSilvio Apr 8, 2020
c01efc3
Implemented manual acknowledgement
SgtSilvio Apr 15, 2020
26cd99b
Added Mqtt3Publish.acknowledge
SgtSilvio Apr 15, 2020
d6e5106
Fixed unit tests
SgtSilvio Apr 15, 2020
0345b2c
Improved exceptional cases for Mqtt3/5Publish.acknowledge
SgtSilvio Apr 15, 2020
a914cb9
Reduced visibility of internal classes
SgtSilvio Apr 15, 2020
eeb76d3
Fixed NotNull/Nullable annotation inspections
SgtSilvio Apr 15, 2020
ce34592
Improved acknowledge drain
SgtSilvio Apr 15, 2020
fece2dd
Improved MqttMatchingPublishFlows.acknowledge
SgtSilvio Apr 15, 2020
efb30da
Fixed missing final inspection
SgtSilvio Apr 15, 2020
2109e50
Added missing javadoc parameter
SgtSilvio Apr 15, 2020
0d9a541
Fixed javadoc typos
SgtSilvio Apr 15, 2020
c2df96c
Fixed javadoc typo
SgtSilvio Apr 15, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ public static void main(final String[] args) {
responder.toBlocking().connect();

responder.toRx()
.publish(responder.toRx()
.subscribeStreamWith()
.publish(responder.toRx().subscribePublishesWith()
.topicFilter("request/topic")
.applySubscribe()
.map(requestPublish -> Mqtt5Publish.builder()
Expand Down
1 change: 1 addition & 0 deletions gradle/japicc.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def addCheck(Jar jarTask) {
def command = ['perl', rootProject.tasks.japiccDownload.executable.getPath(), '-lib', archiveName,
'-skip-internal-packages', 'com.hivemq.client.internal',
'-skip-internal-packages', 'com.hivemq.shaded',
'-skip-internal-types', 'com.hivemq.client.mqtt.mqtt(5|3).Mqtt(5|3)(Rx|Async|Blocking)Client',
'-non-impl', tasks.japiccNonImpl.nonImplFile.getPath(),
'-check-annotations', '-s',
lastJar.getPath(), jarTask.archiveFile.get().getAsFile().getPath()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@

package com.hivemq.client.internal.mqtt.mqtt3.reactor;

import com.hivemq.client.internal.mqtt.message.connect.mqtt3.Mqtt3ConnectView;
import com.hivemq.client.internal.mqtt.message.connect.mqtt3.Mqtt3ConnectViewBuilder;
import com.hivemq.client.internal.mqtt.message.subscribe.mqtt3.Mqtt3SubscribeViewBuilder;
import com.hivemq.client.internal.mqtt.message.unsubscribe.mqtt3.Mqtt3UnsubscribeViewBuilder;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient;
import com.hivemq.client.mqtt.mqtt3.Mqtt3BlockingClient;
Expand Down Expand Up @@ -49,30 +53,78 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) {
this.delegate = delegate;
}

@Override
public @NotNull Mono<Mqtt3ConnAck> connect() {
return connect(Mqtt3ConnectView.DEFAULT);
}

@Override
public @NotNull Mono<Mqtt3ConnAck> connect(final @NotNull Mqtt3Connect connect) {
return RxJava2Adapter.singleToMono(delegate.connect(connect));
}

@Override
public @NotNull Mqtt3ConnectViewBuilder.Nested<Mono<Mqtt3ConnAck>> connectWith() {
return new Mqtt3ConnectViewBuilder.Nested<>(this::connect);
}

@Override
public @NotNull Mono<Mqtt3SubAck> subscribe(final @NotNull Mqtt3Subscribe subscribe) {
return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe));
}

public @NotNull FluxWithSingle<Mqtt3Publish, Mqtt3SubAck> subscribeStream(final @NotNull Mqtt3Subscribe subscribe) {
return FluxWithSingle.from(delegate.subscribeStream(subscribe));
@Override
public @NotNull Mqtt3SubscribeViewBuilder.Nested<Mono<Mqtt3SubAck>> subscribeWith() {
return new Mqtt3SubscribeViewBuilder.Nested<>(this::subscribe);
}

@Override
public @NotNull FluxWithSingle<Mqtt3Publish, Mqtt3SubAck> subscribePublishes(
final @NotNull Mqtt3Subscribe subscribe) {

return subscribePublishes(subscribe, false);
}

@Override
public @NotNull FluxWithSingle<Mqtt3Publish, Mqtt3SubAck> subscribePublishes(
final @NotNull Mqtt3Subscribe subscribe, final boolean manualAcknowledgement) {

return FluxWithSingle.from(delegate.subscribePublishes(subscribe, manualAcknowledgement));
}

@Override
public @NotNull Mqtt3SubscribeViewPublishesBuilder subscribePublishesWith() {
return new Mqtt3SubscribeViewPublishesBuilder();
}

@Override
public @NotNull Flux<Mqtt3Publish> publishes(final @NotNull MqttGlobalPublishFilter filter) {
return RxJava2Adapter.flowableToFlux(delegate.publishes(filter));
return publishes(filter, false);
}

@Override
public @NotNull Flux<Mqtt3Publish> publishes(
final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) {

return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
}

@Override
public @NotNull Mono<Void> unsubscribe(final @NotNull Mqtt3Unsubscribe unsubscribe) {
return RxJava2Adapter.completableToMono(delegate.unsubscribe(unsubscribe));
}

@Override
public @NotNull Mqtt3UnsubscribeViewBuilder.Nested<Mono<Void>> unsubscribeWith() {
return new Mqtt3UnsubscribeViewBuilder.Nested<>(this::unsubscribe);
}

@Override
public @NotNull Flux<Mqtt3PublishResult> publish(final @NotNull Publisher<Mqtt3Publish> publisher) {
return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
}

@Override
public @NotNull Mono<Void> disconnect() {
return RxJava2Adapter.completableToMono(delegate.disconnect());
}
Expand All @@ -96,4 +148,13 @@ public Mqtt3ReactorClientView(final @NotNull Mqtt3RxClient delegate) {
public @NotNull Mqtt3BlockingClient toBlocking() {
return delegate.toBlocking();
}

private class Mqtt3SubscribeViewPublishesBuilder
extends Mqtt3SubscribeViewBuilder.Publishes<FluxWithSingle<Mqtt3Publish, Mqtt3SubAck>> {

@Override
public @NotNull FluxWithSingle<Mqtt3Publish, Mqtt3SubAck> applySubscribe() {
return subscribePublishes(build(), manualAcknowledgement);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@

package com.hivemq.client.internal.mqtt.reactor;

import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
import com.hivemq.client.internal.mqtt.message.connect.MqttConnectBuilder;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnect;
import com.hivemq.client.internal.mqtt.message.disconnect.MqttDisconnectBuilder;
import com.hivemq.client.internal.mqtt.message.subscribe.MqttSubscribeBuilder;
import com.hivemq.client.internal.mqtt.message.unsubscribe.MqttUnsubscribeBuilder;
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
Expand Down Expand Up @@ -51,38 +57,97 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {
this.delegate = delegate;
}

@Override
public @NotNull Mono<Mqtt5ConnAck> connect() {
return connect(MqttConnect.DEFAULT);
}

@Override
public @NotNull Mono<Mqtt5ConnAck> connect(final @NotNull Mqtt5Connect connect) {
return RxJava2Adapter.singleToMono(delegate.connect(connect));
}

@Override
public @NotNull MqttConnectBuilder.Nested<Mono<Mqtt5ConnAck>> connectWith() {
return new MqttConnectBuilder.Nested<>(this::connect);
}

@Override
public @NotNull Mono<Mqtt5SubAck> subscribe(final @NotNull Mqtt5Subscribe subscribe) {
return RxJava2Adapter.singleToMono(delegate.subscribe(subscribe));
}

public @NotNull FluxWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribeStream(final @NotNull Mqtt5Subscribe subscribe) {
return FluxWithSingle.from(delegate.subscribeStream(subscribe));
@Override
public @NotNull MqttSubscribeBuilder.Nested<Mono<Mqtt5SubAck>> subscribeWith() {
return new MqttSubscribeBuilder.Nested<>(this::subscribe);
}

@Override
public @NotNull FluxWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribePublishes(
final @NotNull Mqtt5Subscribe subscribe) {

return subscribePublishes(subscribe, false);
}

@Override
public @NotNull FluxWithSingle<Mqtt5Publish, Mqtt5SubAck> subscribePublishes(
final @NotNull Mqtt5Subscribe subscribe, final boolean manualAcknowledgement) {

return FluxWithSingle.from(delegate.subscribePublishes(subscribe, manualAcknowledgement));
}

@Override
public @NotNull MqttSubscribePublishesBuilder subscribePublishesWith() {
return new MqttSubscribePublishesBuilder();
}

@Override
public @NotNull Flux<Mqtt5Publish> publishes(final @NotNull MqttGlobalPublishFilter filter) {
return RxJava2Adapter.flowableToFlux(delegate.publishes(filter));
return publishes(filter, false);
}

@Override
public @NotNull Flux<Mqtt5Publish> publishes(
final @NotNull MqttGlobalPublishFilter filter, final boolean manualAcknowledgement) {

return RxJava2Adapter.flowableToFlux(delegate.publishes(filter, manualAcknowledgement));
}

@Override
public @NotNull Mono<Mqtt5UnsubAck> unsubscribe(final @NotNull Mqtt5Unsubscribe unsubscribe) {
return RxJava2Adapter.singleToMono(delegate.unsubscribe(unsubscribe));
}

@Override
public @NotNull MqttUnsubscribeBuilder.Nested<Mono<Mqtt5UnsubAck>> unsubscribeWith() {
return new MqttUnsubscribeBuilder.Nested<>(this::unsubscribe);
}

@Override
public @NotNull Flux<Mqtt5PublishResult> publish(final @NotNull Publisher<Mqtt5Publish> publisher) {
return RxJava2Adapter.flowableToFlux(delegate.publish(Flowable.fromPublisher(publisher)));
}

@Override
public @NotNull Mono<Void> reauth() {
return RxJava2Adapter.completableToMono(delegate.reauth());
}

@Override
public @NotNull Mono<Void> disconnect() {
return disconnect(MqttDisconnect.DEFAULT);
}

@Override
public @NotNull Mono<Void> disconnect(final @NotNull Mqtt5Disconnect disconnect) {
return RxJava2Adapter.completableToMono(delegate.disconnect(disconnect));
}

@Override
public @NotNull MqttDisconnectBuilder.Nested<Mono<Void>> disconnectWith() {
return new MqttDisconnectBuilder.Nested<>(this::disconnect);
}

@Override
public @NotNull Mqtt5ClientConfig getConfig() {
return delegate.getConfig();
Expand All @@ -102,4 +167,13 @@ public MqttReactorClient(final @NotNull Mqtt5RxClient delegate) {
public @NotNull Mqtt5BlockingClient toBlocking() {
return delegate.toBlocking();
}

private class MqttSubscribePublishesBuilder
extends MqttSubscribeBuilder.Publishes<FluxWithSingle<Mqtt5Publish, Mqtt5SubAck>> {

@Override
public @NotNull FluxWithSingle<Mqtt5Publish, Mqtt5SubAck> applySubscribe() {
return subscribePublishes(build(), manualAcknowledgement);
}
}
}
Loading