Skip to content

Commit 8e0f694

Browse files
authored
fix(s3stream): Add unit tests for TrafficRegulator (#2395)
* Add unit tests for TrafficRegulator (#2366) * Add unit tests for TrafficRegulator (#2366) * Add unit tests for TrafficRegulator (#2366)
1 parent aa43105 commit 8e0f694

File tree

1 file changed

+262
-0
lines changed

1 file changed

+262
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package com.automq.stream.s3.operator;
13+
14+
import com.google.common.collect.EvictingQueue;
15+
16+
import org.junit.jupiter.api.BeforeEach;
17+
import org.junit.jupiter.api.Tag;
18+
import org.junit.jupiter.api.Test;
19+
import org.slf4j.Logger;
20+
21+
import java.lang.reflect.Field;
22+
23+
import static org.junit.jupiter.api.Assertions.assertTrue;
24+
import static org.mockito.Mockito.mock;
25+
import static org.mockito.Mockito.verify;
26+
import static org.mockito.Mockito.when;
27+
28+
@Tag("S3Unit")
29+
class TrafficRegulatorTest {
30+
31+
private static final long MIN_RATE_LIMITER_RATE = kbpsToBps(1);
32+
private static final long MAX_RATE_LIMITER_RATE = TrafficRateLimiter.MAX_BUCKET_TOKENS_PER_SECOND << 10;
33+
private static final long NEAR_MAX_RATE_LIMITER_RATE = (long) (MAX_RATE_LIMITER_RATE * 0.95);
34+
35+
private static final long FAILURE_RATE = mbpsToBps(10);
36+
37+
private TrafficMonitor successMonitor;
38+
private TrafficMonitor failureMonitor;
39+
private TrafficRateLimiter rateLimiter;
40+
private TrafficVolumeLimiter volumeLimiter;
41+
private Logger logger;
42+
private TrafficRegulator regulator;
43+
44+
/**
45+
* Converts a rate given in MB/s to bytes/s.
46+
*/
47+
private static long mbpsToBps(double mbRate) {
48+
return (long) (mbRate * (1 << 20));
49+
}
50+
51+
/**
52+
* Converts a rate given in KB/s to bytes/s.
53+
*/
54+
private static long kbpsToBps(double kbRate) {
55+
return (long) (kbRate * (1 << 10));
56+
}
57+
58+
@BeforeEach
59+
void setUp() {
60+
successMonitor = mock(TrafficMonitor.class);
61+
failureMonitor = mock(TrafficMonitor.class);
62+
rateLimiter = mock(TrafficRateLimiter.class);
63+
volumeLimiter = mock(TrafficVolumeLimiter.class);
64+
logger = mock(Logger.class);
65+
regulator = new TrafficRegulator("testOperation", successMonitor, failureMonitor, rateLimiter, volumeLimiter, logger);
66+
}
67+
68+
// ---------------- Decrease tests (failure rate value does not affect if greater than 0) ----------------
69+
70+
@Test
71+
void testRegulateDecreaseSuccessAboveMinWithFailure() {
72+
long successRate = mbpsToBps(100);
73+
setRegulatorDecreaseEnv(successRate, MIN_RATE_LIMITER_RATE);
74+
regulator.regulate();
75+
checkRegulate(successRate);
76+
}
77+
78+
@Test
79+
void testRegulateDecreaseSuccessBelowMinWithFailure() {
80+
long successRate = mbpsToBps(5);
81+
setRegulatorDecreaseEnv(successRate, MIN_RATE_LIMITER_RATE);
82+
regulator.regulate();
83+
checkRegulate(getMinRateFromRegulator());
84+
}
85+
86+
// ---------------- Increase tests ----------------
87+
88+
@Test
89+
void testRegulateIncreaseWithMaxRateLimiter() {
90+
setRegulatorIncreaseEnv(0, MAX_RATE_LIMITER_RATE);
91+
regulator.regulate();
92+
checkRegulate(MAX_RATE_LIMITER_RATE);
93+
}
94+
95+
@Test
96+
void testRegulateIncreaseWithSuccessBelowMinAndRateLimiterMinNoHistory() {
97+
long successRate = mbpsToBps(5);
98+
setRegulatorIncreaseEnv(successRate, MIN_RATE_LIMITER_RATE);
99+
regulator.regulate();
100+
checkRegulate((long) (MIN_RATE_LIMITER_RATE + successRate * getFastIncrementRatio()));
101+
}
102+
103+
@Test
104+
void testRegulateIncreaseWithSuccessBelowMinAndRateLimiterNearMaxNoHistory() {
105+
long successRate = mbpsToBps(5);
106+
setRegulatorIncreaseEnv(successRate, NEAR_MAX_RATE_LIMITER_RATE);
107+
regulator.regulate();
108+
checkRegulate(MAX_RATE_LIMITER_RATE);
109+
}
110+
111+
@Test
112+
void testRegulateIncreaseWithSuccessNearMaxAndRateLimiterNearMaxNoHistory() {
113+
long successRate = (long) (getMaxRateFromRegulator() * 0.95);
114+
setRegulatorIncreaseEnv(successRate, NEAR_MAX_RATE_LIMITER_RATE);
115+
regulator.regulate();
116+
checkRegulate((long) (NEAR_MAX_RATE_LIMITER_RATE + successRate * getSlowIncrementRatio()));
117+
}
118+
119+
// ---------------- Tests involving success history ----------------
120+
121+
@Test
122+
void testRegulateIncreaseWithHistoryNotFull() {
123+
// Populate the success history queue with 5 entries
124+
EvictingQueue<Double> queue = getSuccessRateQueue();
125+
queue.add((double) mbpsToBps(10.0));
126+
queue.add((double) mbpsToBps(20.0));
127+
queue.add((double) mbpsToBps(30.0));
128+
queue.add((double) mbpsToBps(40.0));
129+
queue.add((double) mbpsToBps(50.0));
130+
// Setup current rate to 60 MB/s, failure rate 0, success rate 0
131+
when(rateLimiter.currentRate()).thenReturn(mbpsToBps(60.0));
132+
when(successMonitor.getRateAndReset()).thenReturn(0.0);
133+
when(failureMonitor.getRateAndReset()).thenReturn(0.0);
134+
regulator.regulate();
135+
// Expected new rate: second largest of [60 + 50*0.5=85, 60 +50*0.05=62.5, 50] → 62.5 MB/s
136+
long expectedNewRate = mbpsToBps(62.5);
137+
verify(rateLimiter).update(expectedNewRate);
138+
verify(volumeLimiter).update(expectedNewRate * getWindowSize());
139+
}
140+
141+
@Test
142+
void testRegulateIncreaseWithFullHistory() {
143+
// Inject 64 entries into the queue (top 4: 100, 90, 80, 70)
144+
EvictingQueue<Double> queue = getSuccessRateQueue();
145+
queue.add((double) mbpsToBps(100.0));
146+
queue.add((double) mbpsToBps(90.0));
147+
queue.add((double) mbpsToBps(80.0));
148+
queue.add((double) mbpsToBps(70.0));
149+
for (int i = 0; i < 60; i++) {
150+
queue.add((double) mbpsToBps(60.0));
151+
}
152+
// Setup current rate 80 MB/s, failure rate 0, success rate 0
153+
when(rateLimiter.currentRate()).thenReturn(mbpsToBps(80.0));
154+
when(successMonitor.getRateAndReset()).thenReturn(0.0);
155+
when(failureMonitor.getRateAndReset()).thenReturn(0.0);
156+
regulator.regulate();
157+
// Expected new rate: 85 MB/s (mean of top 4 entries)
158+
long expectedNewRate = mbpsToBps(85.0);
159+
verify(rateLimiter).update(expectedNewRate);
160+
verify(volumeLimiter).update(expectedNewRate * getWindowSize());
161+
}
162+
163+
@Test
164+
void testRegulateIncreaseJumpsToMaxWhenCurrentLimitExceedsThreshold() {
165+
// Setup history with mean 100 MB/s
166+
EvictingQueue<Double> queue = getSuccessRateQueue();
167+
for (int i = 0; i < 4; i++) {
168+
queue.add((double) mbpsToBps(100.0));
169+
}
170+
for (int i = 0; i < 60; i++) {
171+
queue.add((double) mbpsToBps(50.0));
172+
}
173+
// Current rate is 701 MB/s (exceeds 7x history rate)
174+
when(rateLimiter.currentRate()).thenReturn(mbpsToBps(701.0));
175+
when(successMonitor.getRateAndReset()).thenReturn(0.0);
176+
when(failureMonitor.getRateAndReset()).thenReturn(0.0);
177+
regulator.regulate();
178+
// Verify rate jumps to MAX
179+
verify(rateLimiter).update(MAX_RATE_LIMITER_RATE);
180+
verify(volumeLimiter).update(MAX_RATE_LIMITER_RATE * getWindowSize());
181+
}
182+
183+
@Test
184+
void testRegulateFailureDoesNotRecordSuccess() {
185+
when(successMonitor.getRateAndReset()).thenReturn((double) mbpsToBps(100.0));
186+
when(failureMonitor.getRateAndReset()).thenReturn((double) mbpsToBps(10.0));
187+
regulator.regulate();
188+
EvictingQueue<Double> queue = getSuccessRateQueue();
189+
assertTrue(queue.isEmpty(), "Queue should be empty as failure occurred");
190+
}
191+
192+
@SuppressWarnings("unchecked")
193+
private EvictingQueue<Double> getSuccessRateQueue() {
194+
Object field = getField(regulator, "successRateQueue");
195+
if (field instanceof EvictingQueue) {
196+
return (EvictingQueue<Double>) field;
197+
}
198+
throw new IllegalStateException("Field 'successRateQueue' is not of expected type EvictingQueue<Double>");
199+
}
200+
201+
/**
202+
* Retrieves a static field value via reflection.
203+
*/
204+
private Object getStaticField(String fieldName) {
205+
try {
206+
Field field = TrafficRegulator.class.getDeclaredField(fieldName);
207+
field.setAccessible(true);
208+
return field.get(null);
209+
} catch (Exception e) {
210+
throw new RuntimeException("Unable to access " + fieldName + " field", e);
211+
}
212+
}
213+
214+
private Object getField(Object instance, String fieldName) {
215+
try {
216+
Field field = TrafficRegulator.class.getDeclaredField(fieldName);
217+
field.setAccessible(true);
218+
return field.get(instance);
219+
} catch (Exception e) {
220+
throw new RuntimeException(e);
221+
}
222+
}
223+
224+
private int getWindowSize() {
225+
return (int) getStaticField("WINDOW_SIZE");
226+
}
227+
228+
private long getMinRateFromRegulator() {
229+
return (long) getStaticField("MIN");
230+
}
231+
232+
private long getMaxRateFromRegulator() {
233+
return (long) getStaticField("MAX");
234+
}
235+
236+
private double getFastIncrementRatio() {
237+
return (double) getStaticField("FAST_INCREMENT_RATIO");
238+
}
239+
240+
private double getSlowIncrementRatio() {
241+
return (double) getStaticField("SLOW_INCREMENT_RATIO");
242+
}
243+
244+
private void setRegulatorDecreaseEnv(double successRate, long limiterRate) {
245+
setRegulatorEnv(successRate, FAILURE_RATE, limiterRate);
246+
}
247+
248+
private void setRegulatorIncreaseEnv(double successRate, long limiterRate) {
249+
setRegulatorEnv(successRate, 0, limiterRate);
250+
}
251+
252+
private void setRegulatorEnv(double successRate, double failureRate, long limiterRate) {
253+
when(successMonitor.getRateAndReset()).thenReturn(successRate);
254+
when(failureMonitor.getRateAndReset()).thenReturn(failureRate);
255+
when(rateLimiter.currentRate()).thenReturn(limiterRate);
256+
}
257+
258+
private void checkRegulate(long expectedNewRate) {
259+
verify(rateLimiter).update(expectedNewRate);
260+
verify(volumeLimiter).update(expectedNewRate * getWindowSize());
261+
}
262+
}

0 commit comments

Comments
 (0)