Skip to content

Commit 2c2b30d

Browse files
jolshancmccabe
authored andcommitted
MINOR: Add RandomComponentPayloadGenerator and update Trogdor documentation (apache#7103)
Add a new RandomComponentPayloadGenerator that gives a payload based on random selection of another PayloadGenerator. Additionally, add an example that uses a non-default PayloadGenerator configuration to TROGDOR.md. Reviewers: Colin P. McCabe <[email protected]>
1 parent 6fbac3c commit 2c2b30d

File tree

5 files changed

+290
-3
lines changed

5 files changed

+290
-3
lines changed

TROGDOR.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,36 @@ The task specification is usually written as JSON. For example, this task speci
8787
"durationMs": 30000,
8888
"partitions": [["node1", "node2"], ["node3"]]
8989
}
90+
91+
This task runs a simple ProduceBench test on a cluster with one producer node, 5 topics, and 10,000 messages per second.
92+
The keys are generated sequentially and the configured partitioner (DefaultPartitioner) is used.
93+
94+
{
95+
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
96+
"durationMs": 10000000,
97+
"producerNode": "node0",
98+
"bootstrapServers": "localhost:9092",
99+
"targetMessagesPerSec": 10000,
100+
"maxMessages": 50000,
101+
"activeTopics": {
102+
"foo[1-3]": {
103+
"numPartitions": 10,
104+
"replicationFactor": 1
105+
}
106+
},
107+
"inactiveTopics": {
108+
"foo[4-5]": {
109+
"numPartitions": 10,
110+
"replicationFactor": 1
111+
}
112+
},
113+
"keyGenerator": {
114+
"type": "sequential",
115+
"size": 8,
116+
"offset": 1
117+
},
118+
"useConfiguredPartitioner": true
119+
}
90120

91121
Tasks are submitted to the coordinator. Once the coordinator determines that it is time for the task to start, it creates workers on agent processes. The workers run until the task is done.
92122

tools/src/main/java/org/apache/kafka/trogdor/workload/PayloadGenerator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@
3434
@JsonSubTypes.Type(value = ConstantPayloadGenerator.class, name = "constant"),
3535
@JsonSubTypes.Type(value = SequentialPayloadGenerator.class, name = "sequential"),
3636
@JsonSubTypes.Type(value = UniformRandomPayloadGenerator.class, name = "uniformRandom"),
37-
@JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null")
37+
@JsonSubTypes.Type(value = NullPayloadGenerator.class, name = "null"),
38+
@JsonSubTypes.Type(value = RandomComponentPayloadGenerator.class, name = "randomComponent")
3839
})
3940
public interface PayloadGenerator {
4041
/**
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.trogdor.workload;
19+
20+
import com.fasterxml.jackson.annotation.JsonCreator;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
23+
/**
24+
* Contains a percent value represented as an integer between 1 and 100 and a PayloadGenerator to specify
25+
* how often that PayloadGenerator should be used.
26+
*/
27+
public class RandomComponent {
28+
private final int percent;
29+
private final PayloadGenerator component;
30+
31+
32+
@JsonCreator
33+
public RandomComponent(@JsonProperty("percent") int percent,
34+
@JsonProperty("component") PayloadGenerator component) {
35+
this.percent = percent;
36+
this.component = component;
37+
}
38+
39+
@JsonProperty
40+
public int percent() {
41+
return percent;
42+
}
43+
44+
@JsonProperty
45+
public PayloadGenerator component() {
46+
return component;
47+
}
48+
}
49+
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kafka.trogdor.workload;
19+
20+
import com.fasterxml.jackson.annotation.JsonCreator;
21+
import com.fasterxml.jackson.annotation.JsonProperty;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Random;
26+
27+
28+
/**
29+
* A PayloadGenerator which generates pseudo-random payloads based on other PayloadGenerators.
30+
*
31+
* Given a seed and non-null list of RandomComponents, RandomComponentPayloadGenerator
32+
* will use any given generator in its list of components a percentage of the time based on the
33+
* percent field in the RandomComponent. These percent fields must be integers greater than 0
34+
* and together add up to 100. The payloads generated can be reproduced from run to run.
35+
*
36+
* An example of how to include this generator in a Trogdor taskSpec is shown below.
37+
* #{@code
38+
* "keyGenerator": {
39+
* "type": "randomComponent",
40+
* "seed": 456,
41+
* "components": [
42+
* {
43+
* "percent": 50,
44+
* "component": {
45+
* "type": "null"
46+
* }
47+
* },
48+
* {
49+
* "percent": 50,
50+
* "component": {
51+
* "type": "uniformRandom",
52+
* "size": 4,
53+
* "seed": 123,
54+
* "padding": 0
55+
* }
56+
* }
57+
* ]
58+
* }
59+
* }
60+
*/
61+
public class RandomComponentPayloadGenerator implements PayloadGenerator {
62+
private final long seed;
63+
private final List<RandomComponent> components;
64+
private final Random random = new Random();
65+
66+
@JsonCreator
67+
public RandomComponentPayloadGenerator(@JsonProperty("seed") long seed,
68+
@JsonProperty("components") List<RandomComponent> components) {
69+
this.seed = seed;
70+
if (components == null || components.isEmpty()) {
71+
throw new IllegalArgumentException("Components must be a specified, non-empty list of RandomComponents.");
72+
}
73+
int sum = 0;
74+
for (RandomComponent component : components) {
75+
if (component.percent() < 1) {
76+
throw new IllegalArgumentException("Percent value must be greater than zero.");
77+
}
78+
sum += component.percent();
79+
}
80+
if (sum != 100) {
81+
throw new IllegalArgumentException("Components must be a list of RandomComponents such that the percent fields sum to 100");
82+
}
83+
this.components = new ArrayList<>(components);
84+
}
85+
86+
@JsonProperty
87+
public long seed() {
88+
return seed;
89+
}
90+
91+
@JsonProperty
92+
public List<RandomComponent> components() {
93+
return components;
94+
}
95+
96+
@Override
97+
public byte[] generate(long position) {
98+
int randPercent;
99+
synchronized (random) {
100+
random.setSeed(seed + position);
101+
randPercent = random.nextInt(100);
102+
}
103+
int curPercent = 0;
104+
RandomComponent com = components.get(0);
105+
for (RandomComponent component : components) {
106+
curPercent += component.percent();
107+
if (curPercent > randPercent) {
108+
com = component;
109+
break;
110+
}
111+
}
112+
return com.component().generate(position);
113+
}
114+
}

tools/src/test/java/org/apache/kafka/trogdor/workload/PayloadGeneratorTest.java

Lines changed: 95 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,13 @@
2424
import java.nio.ByteBuffer;
2525
import java.nio.ByteOrder;
2626
import java.util.Arrays;
27+
import java.util.ArrayList;
28+
import java.util.List;
2729

2830
import static org.junit.Assert.assertArrayEquals;
2931
import static org.junit.Assert.assertEquals;
30-
32+
import static org.junit.Assert.assertNull;
33+
import static org.junit.Assert.assertThrows;
3134

3235
public class PayloadGeneratorTest {
3336
@Rule
@@ -104,7 +107,11 @@ private static void testReproducible(PayloadGenerator generator) {
104107
byte[] val = generator.generate(123);
105108
generator.generate(456);
106109
byte[] val2 = generator.generate(123);
107-
assertArrayEquals(val, val2);
110+
if (val == null) {
111+
assertNull(val2);
112+
} else {
113+
assertArrayEquals(val, val2);
114+
}
108115
}
109116

110117
@Test
@@ -123,6 +130,92 @@ public void testUniformRandomPayloadGeneratorPaddingBytes() {
123130
assertArrayEquals(val1End, val2End);
124131
assertArrayEquals(val1End, val3End);
125132
}
133+
134+
@Test
135+
public void testRandomComponentPayloadGenerator() {
136+
NullPayloadGenerator nullGenerator = new NullPayloadGenerator();
137+
RandomComponent nullConfig = new RandomComponent(50, nullGenerator);
138+
139+
UniformRandomPayloadGenerator uniformGenerator =
140+
new UniformRandomPayloadGenerator(5, 123, 0);
141+
RandomComponent uniformConfig = new RandomComponent(50, uniformGenerator);
142+
143+
SequentialPayloadGenerator sequentialGenerator =
144+
new SequentialPayloadGenerator(4, 10);
145+
RandomComponent sequentialConfig = new RandomComponent(75, sequentialGenerator);
146+
147+
ConstantPayloadGenerator constantGenerator =
148+
new ConstantPayloadGenerator(4, new byte[0]);
149+
RandomComponent constantConfig = new RandomComponent(25, constantGenerator);
150+
151+
List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
152+
List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(sequentialConfig, constantConfig));
153+
byte[] expected = new byte[4];
154+
155+
PayloadIterator iter = new PayloadIterator(
156+
new RandomComponentPayloadGenerator(4, components1));
157+
int notNull = 0;
158+
int isNull = 0;
159+
while (notNull < 1000 || isNull < 1000) {
160+
byte[] cur = iter.next();
161+
if (cur == null) {
162+
isNull++;
163+
} else {
164+
notNull++;
165+
}
166+
}
167+
168+
iter = new PayloadIterator(
169+
new RandomComponentPayloadGenerator(123, components2));
170+
int isZeroBytes = 0;
171+
int isNotZeroBytes = 0;
172+
while (isZeroBytes < 500 || isNotZeroBytes < 1500) {
173+
byte[] cur = iter.next();
174+
if (Arrays.equals(expected, cur)) {
175+
isZeroBytes++;
176+
} else {
177+
isNotZeroBytes++;
178+
}
179+
}
180+
181+
RandomComponent uniformConfig2 = new RandomComponent(25, uniformGenerator);
182+
RandomComponent sequentialConfig2 = new RandomComponent(25, sequentialGenerator);
183+
RandomComponent nullConfig2 = new RandomComponent(25, nullGenerator);
184+
185+
List<RandomComponent> components3 = new ArrayList<>(Arrays.asList(sequentialConfig2, uniformConfig2, nullConfig));
186+
List<RandomComponent> components4 = new ArrayList<>(Arrays.asList(uniformConfig2, sequentialConfig2, constantConfig, nullConfig2));
187+
188+
testReproducible(new RandomComponentPayloadGenerator(4, components1));
189+
testReproducible(new RandomComponentPayloadGenerator(123, components2));
190+
testReproducible(new RandomComponentPayloadGenerator(50, components3));
191+
testReproducible(new RandomComponentPayloadGenerator(0, components4));
192+
}
193+
194+
@Test
195+
public void testRandomComponentPayloadGeneratorErrors() {
196+
NullPayloadGenerator nullGenerator = new NullPayloadGenerator();
197+
RandomComponent nullConfig = new RandomComponent(25, nullGenerator);
198+
UniformRandomPayloadGenerator uniformGenerator =
199+
new UniformRandomPayloadGenerator(5, 123, 0);
200+
RandomComponent uniformConfig = new RandomComponent(25, uniformGenerator);
201+
ConstantPayloadGenerator constantGenerator =
202+
new ConstantPayloadGenerator(4, new byte[0]);
203+
RandomComponent constantConfig = new RandomComponent(-25, constantGenerator);
204+
205+
List<RandomComponent> components1 = new ArrayList<>(Arrays.asList(nullConfig, uniformConfig));
206+
List<RandomComponent> components2 = new ArrayList<>(Arrays.asList(
207+
nullConfig, constantConfig, uniformConfig, nullConfig, uniformConfig, uniformConfig));
208+
209+
assertThrows(IllegalArgumentException.class, () -> {
210+
new PayloadIterator(new RandomComponentPayloadGenerator(1, new ArrayList<>()));
211+
});
212+
assertThrows(IllegalArgumentException.class, () -> {
213+
new PayloadIterator(new RandomComponentPayloadGenerator(13, components2));
214+
});
215+
assertThrows(IllegalArgumentException.class, () -> {
216+
new PayloadIterator(new RandomComponentPayloadGenerator(123, components1));
217+
});
218+
}
126219

127220
@Test
128221
public void testPayloadIterator() {

0 commit comments

Comments
 (0)