Skip to content

Commit 080bae4

Browse files
authored
feat(tools/perf): support schema message perf (#2227)
Signed-off-by: Robin Han <[email protected]>
1 parent 504761b commit 080bae4

File tree

6 files changed

+79
-8
lines changed

6 files changed

+79
-8
lines changed

Diff for: build.gradle

+4
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ allprojects {
130130

131131
repositories {
132132
mavenCentral()
133+
maven {
134+
url = uri("https://packages.confluent.io/maven/")
135+
}
133136
}
134137

135138
dependencyUpdates {
@@ -2246,6 +2249,7 @@ project(':tools') {
22462249
// AutoMQ inject start
22472250
implementation project(':automq-shell')
22482251
implementation libs.guava
2252+
implementation libs.kafkaAvroSerializer
22492253
// AutoMQ inject end
22502254

22512255
// for SASL/OAUTHBEARER JWT validation

Diff for: gradle/dependencies.gradle

+2
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ versions += [
178178
guava:"32.0.1-jre",
179179
hdrHistogram:"2.1.12",
180180
nettyTcnativeBoringSsl: "2.0.65.Final",
181+
confluentSchema: "7.4.0",
181182
// AutoMQ inject end
182183

183184
// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
@@ -309,5 +310,6 @@ libs += [
309310
jna: "net.java.dev.jna:jna:$versions.jna",
310311
guava: "com.google.guava:guava:$versions.guava",
311312
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
313+
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
312314
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
313315
]

Diff for: tools/src/main/java/org/apache/kafka/tools/automq/PerfCommand.java

+44-1
Original file line numberDiff line numberDiff line change
@@ -24,22 +24,34 @@
2424
import com.automq.stream.s3.metrics.TimerUtil;
2525
import com.fasterxml.jackson.databind.ObjectMapper;
2626
import com.fasterxml.jackson.databind.ObjectWriter;
27+
import com.google.common.base.Strings;
2728

2829
import org.slf4j.Logger;
2930
import org.slf4j.LoggerFactory;
3031

3132
import java.io.File;
33+
import java.io.IOException;
34+
import java.nio.charset.StandardCharsets;
35+
import java.nio.file.Files;
36+
import java.nio.file.Path;
3237
import java.text.SimpleDateFormat;
3338
import java.time.Duration;
3439
import java.util.ArrayList;
3540
import java.util.Collections;
3641
import java.util.Date;
42+
import java.util.HashMap;
3743
import java.util.List;
44+
import java.util.Map;
3845
import java.util.Random;
3946
import java.util.Set;
4047
import java.util.concurrent.ConcurrentHashMap;
4148
import java.util.concurrent.ThreadLocalRandom;
4249
import java.util.concurrent.TimeUnit;
50+
import java.util.function.Function;
51+
52+
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
53+
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
54+
import io.confluent.kafka.serializers.KafkaAvroSerializer;
4355

4456
import static org.apache.kafka.tools.automq.perf.StatsCollector.printAndCollectStats;
4557

@@ -105,7 +117,7 @@ private void run() {
105117
waitTopicsReady(consumerService.consumerCount() > 0);
106118
LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));
107119

108-
List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
120+
Function<String, List<byte[]>> payloads = payloads(config, topics);
109121
producerService.start(payloads, config.sendRate);
110122

111123
preparing = false;
@@ -209,6 +221,21 @@ private void waitTopicsReadyWithoutConsumer() {
209221
}
210222
}
211223

224+
private Function<String, List<byte[]>> payloads(PerfConfig config, List<Topic> topics) {
225+
if (Strings.isNullOrEmpty(config.valueSchema)) {
226+
List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
227+
return topic -> payloads;
228+
} else {
229+
// The producer configs should contain:
230+
// - schema.registry.url: http://localhost:8081
231+
Map<String, List<byte[]>> topic2payloads = new HashMap<>();
232+
topics.forEach(topic -> {
233+
topic2payloads.put(topic.name(), schemaPayloads(topic.name(), config.valueSchema, config.valuesFile, config.producerConfigs));
234+
});
235+
return topic2payloads::get;
236+
}
237+
}
238+
212239
/**
213240
* Generates a list of byte arrays with specified size and mix of random and static content.
214241
*
@@ -271,4 +298,20 @@ public void close() {
271298
producerService.close();
272299
consumerService.close();
273300
}
301+
302+
private static List<byte[]> schemaPayloads(String topic, String schemaJson, String payloadsFile, Map<String, ?> configs) {
303+
try (KafkaAvroSerializer serializer = new KafkaAvroSerializer()) {
304+
List<byte[]> payloads = new ArrayList<>();
305+
AvroSchema schema = new AvroSchema(schemaJson);
306+
serializer.configure(configs, false);
307+
for (String payloadStr : Files.readAllLines(Path.of(payloadsFile), StandardCharsets.UTF_8)) {
308+
Object object = AvroSchemaUtils.toObject(payloadStr, schema);
309+
byte[] payload = serializer.serialize(topic, object);
310+
payloads.add(payload);
311+
}
312+
return payloads;
313+
} catch (IOException ex) {
314+
throw new RuntimeException(ex);
315+
}
316+
}
274317
}

Diff for: tools/src/main/java/org/apache/kafka/tools/automq/perf/PerfConfig.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ public class PerfConfig {
5656
public final int warmupDurationMinutes;
5757
public final int testDurationMinutes;
5858
public final int reportingIntervalSeconds;
59+
public final String valueSchema;
60+
public final String valuesFile;
5961

6062
public PerfConfig(String[] args) {
6163
ArgumentParser parser = parser();
@@ -77,7 +79,7 @@ public PerfConfig(String[] args) {
7779
producerConfigs = parseConfigs(ns.getList("producerConfigs"));
7880
consumerConfigs = parseConfigs(ns.getList("consumerConfigs"));
7981
reset = ns.getBoolean("reset");
80-
topicPrefix = ns.getString("topicPrefix") == null ? "test-topic-" + System.currentTimeMillis() : ns.getString("topicPrefix");
82+
topicPrefix = ns.getString("topicPrefix") == null ? "test_topic_" + System.currentTimeMillis() : ns.getString("topicPrefix");
8183
topics = ns.getInt("topics");
8284
partitionsPerTopic = ns.getInt("partitionsPerTopic");
8385
producersPerTopic = ns.getInt("producersPerTopic");
@@ -93,6 +95,8 @@ public PerfConfig(String[] args) {
9395
warmupDurationMinutes = ns.getInt("warmupDurationMinutes");
9496
testDurationMinutes = ns.getInt("testDurationMinutes");
9597
reportingIntervalSeconds = ns.getInt("reportingIntervalSeconds");
98+
valueSchema = ns.getString("valueSchema");
99+
valuesFile = ns.get("valuesFile");
96100

97101
if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) {
98102
throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)",
@@ -233,6 +237,16 @@ public static ArgumentParser parser() {
233237
.dest("reportingIntervalSeconds")
234238
.metavar("REPORTING_INTERVAL_SECONDS")
235239
.help("The reporting interval in seconds.");
240+
parser.addArgument("--value-schema")
241+
.type(String.class)
242+
.dest("valueSchema")
243+
.metavar("VALUE_SCHEMA")
244+
.help("The schema of the values ex. {\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");
245+
parser.addArgument("--values-file")
246+
.type(String.class)
247+
.dest("valuesFile")
248+
.metavar("VALUES_FILE")
249+
.help("The avro value file. Example file content {\"f1\": \"value1\"}");
236250
return parser;
237251
}
238252

Diff for: tools/src/main/java/org/apache/kafka/tools/automq/perf/ProducerService.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.ScheduledExecutorService;
4242
import java.util.concurrent.ThreadLocalRandom;
4343
import java.util.concurrent.TimeUnit;
44+
import java.util.function.Function;
4445
import java.util.stream.Collectors;
4546
import java.util.stream.IntStream;
4647

@@ -120,7 +121,7 @@ public int probe() {
120121
/**
121122
* Start sending messages using the given payloads at the given rate.
122123
*/
123-
public void start(List<byte[]> payloads, double rate) {
124+
public void start(Function<String, List<byte[]>> payloads, double rate) {
124125
adjustRate(rate);
125126
adjustRateExecutor.scheduleWithFixedDelay(this::adjustRate, 0, ADJUST_RATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
126127
int processors = Runtime.getRuntime().availableProcessors();
@@ -176,7 +177,7 @@ private double calculateY(long x1, double y1, long x2, double y2, long x) {
176177
return y1 + (x - x1) * (y2 - y1) / (x2 - x1);
177178
}
178179

179-
private void start(List<Producer> producers, List<byte[]> payloads) {
180+
private void start(List<Producer> producers, Function<String, List<byte[]>> payloads) {
180181
executor.submit(() -> {
181182
try {
182183
sendMessages(producers, payloads);
@@ -186,11 +187,14 @@ private void start(List<Producer> producers, List<byte[]> payloads) {
186187
});
187188
}
188189

189-
private void sendMessages(List<Producer> producers, List<byte[]> payloads) {
190+
private void sendMessages(List<Producer> producers, Function<String, List<byte[]>> payloadsGet) {
190191
Random random = ThreadLocalRandom.current();
191192
while (!closed) {
192193
producers.forEach(
193-
p -> sendMessage(p, payloads.get(random.nextInt(payloads.size())))
194+
p -> {
195+
List<byte[]> payloads = payloadsGet.apply(p.topic.name);
196+
sendMessage(p, payloads.get(random.nextInt(payloads.size())));
197+
}
194198
);
195199
}
196200
}
@@ -290,7 +294,7 @@ private String nextKey() {
290294
*/
291295
public List<CompletableFuture<Void>> probe() {
292296
return IntStream.range(0, topic.partitions)
293-
.mapToObj(i -> sendAsync("probe", new byte[42], i))
297+
.mapToObj(i -> sendAsync("probe", new byte[] {1}, i))
294298
.collect(Collectors.toList());
295299
}
296300

Diff for: tools/src/main/java/org/apache/kafka/tools/automq/perf/TopicService.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ private void waitTopicCreated(String name, KafkaFuture<Void> future) {
114114
}
115115

116116
private String generateTopicName(String topicPrefix, int partitions, int index) {
117-
return String.format("%s-%04d-%07d", topicPrefix, partitions, index);
117+
return String.format("%s_%04d_%07d", topicPrefix, partitions, index);
118118
}
119119

120120
public static class TopicsConfig {
@@ -140,6 +140,10 @@ public Topic(String name, int partitions) {
140140
this.partitions = partitions;
141141
}
142142

143+
public String name() {
144+
return name;
145+
}
146+
143147
public List<TopicPartition> partitions() {
144148
return IntStream.range(0, partitions)
145149
.mapToObj(i -> new TopicPartition(name, i))

0 commit comments

Comments (0)