Skip to content

Commit b6be650

Browse files
authored
Add basic IN/OUT bytes metrics (#1187)
### Motivation We need a way to measure the network traffic originating directly from KOP clients ### Modifications Add two simple metrics KOP_NETWORK_BYTES_IN and KOP_NETWORK_BYTES_OUT. Please note that this metrics are useful to track user usages of resources. Especially on the Consumer side, Kafka model is a pull model so even if a topic is empty, an active Consumer is continues to send FETCH requests (and this is a network cost)
1 parent f1abfe9 commit b6be650

File tree

5 files changed

+65
-4
lines changed

5 files changed

+65
-4
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaCommandDecoder.java

+17-4
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ protected Boolean channelReady() {
185185
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
186186
// Get a buffer that contains the full frame
187187
ByteBuf buffer = (ByteBuf) msg;
188+
requestStats.getNetworkTotalBytesIn().add(buffer.readableBytes());
188189

189190
// Update parse request latency metrics
190191
final BiConsumer<Long, Throwable> registerRequestParseLatency = (timeBeforeParse, throwable) -> {
@@ -405,7 +406,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
405406
if (responseFuture.isCompletedExceptionally()) {
406407
responseFuture.exceptionally(e -> {
407408
log.error("[{}] request {} completed exceptionally", channel, request.getHeader(), e);
408-
channel.writeAndFlush(request.createErrorResponse(e));
409+
sendErrorResponse(request, channel, e);
409410

410411
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY)
411412
.registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS);
@@ -421,7 +422,7 @@ protected void writeAndFlushResponseToClient(Channel channel) {
421422
// It should not be null, just check it for safety
422423
log.error("[{}] Unexpected null completed future for request {}",
423424
ctx.channel(), request.getHeader());
424-
channel.writeAndFlush(request.createErrorResponse(new ApiException("response is null")));
425+
sendErrorResponse(request, channel, new ApiException("response is null"));
425426
return;
426427
}
427428
if (log.isDebugEnabled()) {
@@ -432,12 +433,15 @@ protected void writeAndFlushResponseToClient(Channel channel) {
432433
}
433434

434435
final ByteBuf result = responseToByteBuf(response, request);
436+
final int resultSize = result.readableBytes();
435437
channel.writeAndFlush(result).addListener(future -> {
436438
if (response instanceof ResponseCallbackWrapper) {
437439
((ResponseCallbackWrapper) response).responseComplete();
438440
}
439441
if (!future.isSuccess()) {
440442
log.error("[{}] Failed to write {}", channel, request.getHeader(), future.cause());
443+
} else {
444+
requestStats.getNetworkTotalBytesOut().add(resultSize);
441445
}
442446
});
443447
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY)
@@ -451,14 +455,23 @@ protected void writeAndFlushResponseToClient(Channel channel) {
451455
log.error("[{}] request {} is not completed for {} ns (> {} ms)",
452456
channel, request.getHeader(), nanoSecondsSinceCreated, kafkaConfig.getRequestTimeoutMs());
453457
responseFuture.cancel(true);
454-
channel.writeAndFlush(
455-
request.createErrorResponse(new ApiException("request is expired from server side")));
458+
sendErrorResponse(request, channel, new ApiException("request is expired from server side"));
456459
requestStats.getRequestStatsLogger(apiKey, KopServerStats.REQUEST_QUEUED_LATENCY)
457460
.registerFailedEvent(nanoSecondsSinceCreated, TimeUnit.NANOSECONDS);
458461
}
459462
}
460463
}
461464

465+
private void sendErrorResponse(KafkaHeaderAndRequest request, Channel channel, Throwable customError) {
466+
ByteBuf result = request.createErrorResponse(customError);
467+
final int resultSize = result.readableBytes();
468+
channel.writeAndFlush(result).addListener(future -> {
469+
if (future.isSuccess()) {
470+
requestStats.getNetworkTotalBytesOut().add(resultSize);
471+
}
472+
});
473+
}
474+
462475
protected abstract boolean hasAuthenticated();
463476

464477
protected abstract void channelPrepare(ChannelHandlerContext ctx,

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KopServerStats.java

+6
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,10 @@ public interface KopServerStats {
9292
String KOP_EVENT_QUEUE_SIZE = "KOP_EVENT_QUEUE_SIZE";
9393
String KOP_EVENT_QUEUED_LATENCY = "KOP_EVENT_QUEUED_LATENCY";
9494
String KOP_EVENT_LATENCY = "KOP_EVENT_LATENCY";
95+
96+
/**
97+
* Network stats.
98+
*/
99+
String NETWORK_TOTAL_BYTES_IN = "NETWORK_TOTAL_BYTES_IN";
100+
String NETWORK_TOTAL_BYTES_OUT = "NETWORK_TOTAL_BYTES_OUT";
95101
}

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/RequestStats.java

+16
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_PUBLISH;
2222
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_QUEUED_LATENCY;
2323
import static io.streamnative.pulsar.handlers.kop.KopServerStats.MESSAGE_READ;
24+
import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_IN;
25+
import static io.streamnative.pulsar.handlers.kop.KopServerStats.NETWORK_TOTAL_BYTES_OUT;
2426
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PREPARE_METADATA;
2527
import static io.streamnative.pulsar.handlers.kop.KopServerStats.PRODUCE_ENCODE;
2628
import static io.streamnative.pulsar.handlers.kop.KopServerStats.REQUEST_PARSE_LATENCY;
@@ -121,6 +123,18 @@ public class RequestStats {
121123
)
122124
private final OpStatsLogger fetchDecodeStats;
123125

126+
@StatsDoc(
127+
name = NETWORK_TOTAL_BYTES_IN,
128+
help = "total bytes received"
129+
)
130+
private final Counter networkTotalBytesIn;
131+
132+
@StatsDoc(
133+
name = NETWORK_TOTAL_BYTES_OUT,
134+
help = "total bytes sent out"
135+
)
136+
private final Counter networkTotalBytesOut;
137+
124138
private final Map<ApiKeys, StatsLogger> apiKeysToStatsLogger = new ConcurrentHashMap<>();
125139

126140
public RequestStats(StatsLogger statsLogger) {
@@ -138,6 +152,8 @@ public RequestStats(StatsLogger statsLogger) {
138152
this.prepareMetadataStats = statsLogger.getOpStatsLogger(PREPARE_METADATA);
139153
this.messageReadStats = statsLogger.getOpStatsLogger(MESSAGE_READ);
140154
this.fetchDecodeStats = statsLogger.getOpStatsLogger(FETCH_DECODE);
155+
this.networkTotalBytesIn = statsLogger.getCounter(NETWORK_TOTAL_BYTES_IN);
156+
this.networkTotalBytesOut = statsLogger.getCounter(NETWORK_TOTAL_BYTES_OUT);
141157

142158
statsLogger.registerGauge(REQUEST_QUEUE_SIZE, new Gauge<Number>() {
143159
@Override

tests/src/test/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandlerTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -1093,6 +1093,28 @@ public void testMaxMessageSize() throws PulsarAdminException {
10931093
assertTrue(causeException instanceof RecordTooLargeException);
10941094
}
10951095

1096+
@Test
1097+
public void testNetworkMetrics() throws Exception {
1098+
String topicName = "testNetworkMetrics";
1099+
1100+
// create partitioned topic.
1101+
admin.topics().createPartitionedTopic(topicName, 1);
1102+
Properties props = new Properties();
1103+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost" + ":" + getKafkaBrokerPort());
1104+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
1105+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
1106+
@Cleanup
1107+
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
1108+
producer.send(new ProducerRecord<>(topicName, "key", "value")).get();
1109+
1110+
KafkaProtocolHandler protocolHandler = getProtocolHandler();
1111+
long bytesIn = protocolHandler.getRequestStats().getNetworkTotalBytesIn().get();
1112+
long bytesOut = protocolHandler.getRequestStats().getNetworkTotalBytesOut().get();
1113+
1114+
assertTrue(bytesIn > 0);
1115+
assertTrue(bytesOut > 0);
1116+
}
1117+
10961118

10971119
@DataProvider(name = "allowAutoTopicCreation")
10981120
public static Object[][] allowAutoTopicCreation() {

tests/src/test/java/io/streamnative/pulsar/handlers/kop/MetricsProviderTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -230,6 +230,10 @@ public void testMetricsProvider() throws Exception {
230230
Assert.assertTrue(sb.toString().contains("kop_server_PRODUCE_MESSAGE_CONVERSIONS{partition=\"0\","
231231
+ "topic=\"kopKafkaProducePulsarMetrics1\"} 10"));
232232
Assert.assertTrue(sb.toString().contains("kop_server_PRODUCE_MESSAGE_CONVERSIONS_TIME_NANOS"));
233+
234+
// network stats
235+
Assert.assertTrue(sb.toString().contains("NETWORK_TOTAL_BYTES_IN"));
236+
Assert.assertTrue(sb.toString().contains("NETWORK_TOTAL_BYTES_OUT"));
233237
}
234238

235239
@Test(timeOut = 20000)

0 commit comments

Comments
 (0)