Skip to content

Commit a057665

Browse files
committed
Reduce flush calls for many subscribe packets (reduces used network bandwidth when restoring subscriptions)
1 parent d1b72ab commit a057665

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

Diff for: src/main/java/com/hivemq/client/internal/mqtt/handler/subscribe/MqttSubscriptionHandler.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ public void run() {
164164
if (ctx == null) {
165165
return;
166166
}
167+
int written = 0;
167168
for (MqttSubOrUnsubWithFlow subOrUnsubWithFlow = sendPending;
168169
(subOrUnsubWithFlow != null) && (pendingIndex.size() < MAX_SUB_PENDING);
169170
sendPending = subOrUnsubWithFlow = subOrUnsubWithFlow.getNext()) {
@@ -183,6 +184,10 @@ public void run() {
183184
} else {
184185
writeUnsubscribe(ctx, (MqttUnsubscribeWithFlow) subOrUnsubWithFlow);
185186
}
187+
written++;
188+
}
189+
if (written > 0) {
190+
ctx.flush();
186191
}
187192
}
188193

@@ -195,7 +200,7 @@ private void writeSubscribe(
195200
subscribeWithFlow.subscribe.createStateful(subscribeWithFlow.packetIdentifier, subscriptionIdentifier);
196201

197202
currentPending = subscribeWithFlow;
198-
ctx.writeAndFlush(statefulSubscribe, ctx.voidPromise());
203+
ctx.write(statefulSubscribe, ctx.voidPromise());
199204
currentPending = null;
200205
}
201206

@@ -206,7 +211,7 @@ private void writeUnsubscribe(
206211
unsubscribeWithFlow.unsubscribe.createStateful(unsubscribeWithFlow.packetIdentifier);
207212

208213
currentPending = unsubscribeWithFlow;
209-
ctx.writeAndFlush(statefulUnsubscribe, ctx.voidPromise());
214+
ctx.write(statefulUnsubscribe, ctx.voidPromise());
210215
currentPending = null;
211216
}
212217

0 commit comments

Comments
 (0)