Skip to content

Commit 62abf70

Browse files
authored
perf(produce): fix validate compressed records alloc too many memory (#2257)
Signed-off-by: Robin Han <[email protected]>
1 parent cd7d337 commit 62abf70

File tree

3 files changed

+16
-5
lines changed

3 files changed

+16
-5
lines changed

Diff for: core/src/main/java/kafka/automq/zonerouter/NoopProduceRouter.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package kafka.automq.zonerouter;
1313

1414
import kafka.server.MetadataCache;
15+
import kafka.server.RequestLocal;
1516
import kafka.server.streamaspect.ElasticKafkaApis;
1617

1718
import org.apache.kafka.common.Node;
@@ -44,7 +45,9 @@ public NoopProduceRouter(ElasticKafkaApis kafkaApis, MetadataCache metadataCache
4445
public void handleProduceRequest(short apiVersion, ClientIdMetadata clientId, int timeout, short requiredAcks,
4546
boolean internalTopicsAllowed, String transactionId, Map<TopicPartition, MemoryRecords> entriesPerPartition,
4647
Consumer<Map<TopicPartition, ProduceResponse.PartitionResponse>> responseCallback,
47-
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback) {
48+
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback,
49+
RequestLocal requestLocal
50+
) {
4851
kafkaApis.handleProduceAppendJavaCompatible(
4952
timeout,
5053
requiredAcks,
@@ -59,7 +62,8 @@ public void handleProduceRequest(short apiVersion, ClientIdMetadata clientId, in
5962
recordValidationStatsCallback.accept(rst);
6063
return null;
6164
},
62-
apiVersion
65+
apiVersion,
66+
requestLocal
6367
);
6468
}
6569

Diff for: core/src/main/java/kafka/automq/zonerouter/ProduceRouter.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
package kafka.automq.zonerouter;
1313

14+
import kafka.server.RequestLocal;
15+
1416
import org.apache.kafka.common.Node;
1517
import org.apache.kafka.common.TopicPartition;
1618
import org.apache.kafka.common.message.MetadataResponseData;
@@ -36,7 +38,8 @@ void handleProduceRequest(
3638
String transactionId,
3739
Map<TopicPartition, MemoryRecords> entriesPerPartition,
3840
Consumer<Map<TopicPartition, ProduceResponse.PartitionResponse>> responseCallback,
39-
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback
41+
Consumer<Map<TopicPartition, RecordValidationStats>> recordValidationStatsCallback,
42+
RequestLocal requestLocal
4043
);
4144

4245
CompletableFuture<AutomqZoneRouterResponse> handleZoneRouterRequest(byte[] metadata);

Diff for: core/src/main/scala/kafka/server/streamaspect/ElasticKafkaApis.scala

+6-2
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,7 @@ class ElasticKafkaApis(
395395
authorizedRequestInfo.asJava,
396396
sendResponseCallbackJava,
397397
processingStatsCallbackJava,
398+
requestLocal,
398399
)
399400

400401
// if the request is put into the purgatory, it will have a held reference and hence cannot be garbage collected;
@@ -425,7 +426,9 @@ class ElasticKafkaApis(
425426
entriesPerPartition: util.Map[TopicPartition, MemoryRecords],
426427
responseCallback: util.Map[TopicPartition, PartitionResponse] => Unit,
427428
recordValidationStatsCallback: util.Map[TopicPartition, RecordValidationStats] => Unit = _ => (),
428-
apiVersion: Short): Unit = {
429+
apiVersion: Short,
430+
requestLocal: RequestLocal
431+
): Unit = {
429432
val transactionSupportedOperation = if (apiVersion > 10) genericError else defaultError
430433
replicaManager.handleProduceAppend(
431434
timeout = timeout,
@@ -435,7 +438,8 @@ class ElasticKafkaApis(
435438
entriesPerPartition = entriesPerPartition.asScala,
436439
responseCallback = rst => responseCallback.apply(rst.asJava),
437440
recordValidationStatsCallback = rst => recordValidationStatsCallback.apply(rst.asJava),
438-
transactionSupportedOperation = transactionSupportedOperation
441+
transactionSupportedOperation = transactionSupportedOperation,
442+
requestLocal = requestLocal,
439443
)
440444
}
441445

0 commit comments

Comments
 (0)