Skip to content

feat(tools/perf): support schema message perf #2227

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ allprojects {

repositories {
mavenCentral()
maven {
url = uri("https://packages.confluent.io/maven/")
}
}

dependencyUpdates {
Expand Down Expand Up @@ -2246,6 +2249,7 @@ project(':tools') {
// AutoMQ inject start
implementation project(':automq-shell')
implementation libs.guava
implementation libs.kafkaAvroSerializer
// AutoMQ inject end

// for SASL/OAUTHBEARER JWT validation
Expand Down
2 changes: 2 additions & 0 deletions gradle/dependencies.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ versions += [
guava:"32.0.1-jre",
hdrHistogram:"2.1.12",
nettyTcnativeBoringSsl: "2.0.65.Final",
confluentSchema: "7.4.0",
// AutoMQ inject end

// When updating the zstd version, please do as well in docker/native/native-image-configs/resource-config.json
Expand Down Expand Up @@ -309,5 +310,6 @@ libs += [
jna: "net.java.dev.jna:jna:$versions.jna",
guava: "com.google.guava:guava:$versions.guava",
hdrHistogram: "org.hdrhistogram:HdrHistogram:$versions.hdrHistogram",
kafkaAvroSerializer: "io.confluent:kafka-avro-serializer:$versions.confluentSchema",
spotbugsAnnotations: "com.github.spotbugs:spotbugs-annotations:$versions.spotbugs",
]
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,34 @@
import com.automq.stream.s3.metrics.TimerUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.base.Strings;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

import static org.apache.kafka.tools.automq.perf.StatsCollector.printAndCollectStats;

Expand Down Expand Up @@ -105,7 +117,7 @@ private void run() {
waitTopicsReady(consumerService.consumerCount() > 0);
LOGGER.info("Topics are ready, took {} ms", timer.elapsedAndResetAs(TimeUnit.MILLISECONDS));

List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
Function<String, List<byte[]>> payloads = payloads(config, topics);
producerService.start(payloads, config.sendRate);

preparing = false;
Expand Down Expand Up @@ -209,6 +221,21 @@ private void waitTopicsReadyWithoutConsumer() {
}
}

private Function<String, List<byte[]>> payloads(PerfConfig config, List<Topic> topics) {
if (Strings.isNullOrEmpty(config.valueSchema)) {
List<byte[]> payloads = randomPayloads(config.recordSize, config.randomRatio, config.randomPoolSize);
return topic -> payloads;
} else {
// The producer configs should contain:
// - schema.registry.url: http://localhost:8081
Map<String, List<byte[]>> topic2payloads = new HashMap<>();
topics.forEach(topic -> {
topic2payloads.put(topic.name(), schemaPayloads(topic.name(), config.valueSchema, config.valuesFile, config.producerConfigs));
});
return topic2payloads::get;
}
}

/**
* Generates a list of byte arrays with specified size and mix of random and static content.
*
Expand Down Expand Up @@ -271,4 +298,20 @@ public void close() {
producerService.close();
consumerService.close();
}

private static List<byte[]> schemaPayloads(String topic, String schemaJson, String payloadsFile, Map<String, ?> configs) {
try (KafkaAvroSerializer serializer = new KafkaAvroSerializer()) {
List<byte[]> payloads = new ArrayList<>();
AvroSchema schema = new AvroSchema(schemaJson);
serializer.configure(configs, false);
for (String payloadStr : Files.readAllLines(Path.of(payloadsFile), StandardCharsets.UTF_8)) {
Object object = AvroSchemaUtils.toObject(payloadStr, schema);
byte[] payload = serializer.serialize(topic, object);
payloads.add(payload);
}
return payloads;
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class PerfConfig {
public final int warmupDurationMinutes;
public final int testDurationMinutes;
public final int reportingIntervalSeconds;
public final String valueSchema;
public final String valuesFile;

public PerfConfig(String[] args) {
ArgumentParser parser = parser();
Expand All @@ -77,7 +79,7 @@ public PerfConfig(String[] args) {
producerConfigs = parseConfigs(ns.getList("producerConfigs"));
consumerConfigs = parseConfigs(ns.getList("consumerConfigs"));
reset = ns.getBoolean("reset");
topicPrefix = ns.getString("topicPrefix") == null ? "test-topic-" + System.currentTimeMillis() : ns.getString("topicPrefix");
topicPrefix = ns.getString("topicPrefix") == null ? "test_topic_" + System.currentTimeMillis() : ns.getString("topicPrefix");
topics = ns.getInt("topics");
partitionsPerTopic = ns.getInt("partitionsPerTopic");
producersPerTopic = ns.getInt("producersPerTopic");
Expand All @@ -93,6 +95,8 @@ public PerfConfig(String[] args) {
warmupDurationMinutes = ns.getInt("warmupDurationMinutes");
testDurationMinutes = ns.getInt("testDurationMinutes");
reportingIntervalSeconds = ns.getInt("reportingIntervalSeconds");
valueSchema = ns.getString("valueSchema");
valuesFile = ns.get("valuesFile");

if (backlogDurationSeconds < groupsPerTopic * groupStartDelaySeconds) {
throw new IllegalArgumentException(String.format("BACKLOG_DURATION_SECONDS(%d) should not be less than GROUPS_PER_TOPIC(%d) * GROUP_START_DELAY_SECONDS(%d)",
Expand Down Expand Up @@ -233,6 +237,16 @@ public static ArgumentParser parser() {
.dest("reportingIntervalSeconds")
.metavar("REPORTING_INTERVAL_SECONDS")
.help("The reporting interval in seconds.");
parser.addArgument("--value-schema")
.type(String.class)
.dest("valueSchema")
.metavar("VALUE_SCHEMA")
.help("The schema of the values ex. {\"type\":\"record\",\"name\":\"myrecord\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");
parser.addArgument("--values-file")
.type(String.class)
.dest("valuesFile")
.metavar("VALUES_FILE")
.help("The avro value file. Example file content {\"f1\": \"value1\"}");
return parser;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

Expand Down Expand Up @@ -120,7 +121,7 @@ public int probe() {
/**
* Start sending messages using the given payloads at the given rate.
*/
public void start(List<byte[]> payloads, double rate) {
public void start(Function<String, List<byte[]>> payloads, double rate) {
adjustRate(rate);
adjustRateExecutor.scheduleWithFixedDelay(this::adjustRate, 0, ADJUST_RATE_INTERVAL_SECONDS, TimeUnit.SECONDS);
int processors = Runtime.getRuntime().availableProcessors();
Expand Down Expand Up @@ -176,7 +177,7 @@ private double calculateY(long x1, double y1, long x2, double y2, long x) {
return y1 + (x - x1) * (y2 - y1) / (x2 - x1);
}

private void start(List<Producer> producers, List<byte[]> payloads) {
private void start(List<Producer> producers, Function<String, List<byte[]>> payloads) {
executor.submit(() -> {
try {
sendMessages(producers, payloads);
Expand All @@ -186,11 +187,14 @@ private void start(List<Producer> producers, List<byte[]> payloads) {
});
}

private void sendMessages(List<Producer> producers, List<byte[]> payloads) {
private void sendMessages(List<Producer> producers, Function<String, List<byte[]>> payloadsGet) {
Random random = ThreadLocalRandom.current();
while (!closed) {
producers.forEach(
p -> sendMessage(p, payloads.get(random.nextInt(payloads.size())))
p -> {
List<byte[]> payloads = payloadsGet.apply(p.topic.name);
sendMessage(p, payloads.get(random.nextInt(payloads.size())));
}
);
}
}
Expand Down Expand Up @@ -290,7 +294,7 @@ private String nextKey() {
*/
public List<CompletableFuture<Void>> probe() {
return IntStream.range(0, topic.partitions)
.mapToObj(i -> sendAsync("probe", new byte[42], i))
.mapToObj(i -> sendAsync("probe", new byte[] {1}, i))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private void waitTopicCreated(String name, KafkaFuture<Void> future) {
}

private String generateTopicName(String topicPrefix, int partitions, int index) {
return String.format("%s-%04d-%07d", topicPrefix, partitions, index);
return String.format("%s_%04d_%07d", topicPrefix, partitions, index);
}

public static class TopicsConfig {
Expand All @@ -140,6 +140,10 @@ public Topic(String name, int partitions) {
this.partitions = partitions;
}

        /**
         * Returns the topic's name; used as the key when looking up the
         * per-topic payload pool.
         */
        public String name() {
            return name;
        }

public List<TopicPartition> partitions() {
return IntStream.range(0, partitions)
.mapToObj(i -> new TopicPartition(name, i))
Expand Down
Loading