Skip to content

Commit a8a6c70

Browse files
committed
test(s3stream): Unit tests for TrafficRateLimiter (#2365)
1 parent d90d73e commit a8a6c70

File tree

1 file changed

+80
-0
lines changed

1 file changed

+80
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package com.automq.stream.s3.operator;
13+
import java.util.concurrent.ScheduledExecutorService;
14+
import java.util.concurrent.Executors;
15+
import java.util.concurrent.ExecutionException;
16+
import java.util.concurrent.ExecutorService;
17+
import java.util.concurrent.CountDownLatch;
18+
import java.util.concurrent.Future;
19+
import org.junit.jupiter.api.AfterEach;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Tag;
22+
import org.junit.jupiter.api.Test;
23+
import static org.junit.jupiter.api.Assertions.assertEquals;
24+
import static org.junit.jupiter.api.Assertions.assertTrue;
25+
26+
@Tag("S3Unit")
27+
public class TrafficRateLimiterTest {
28+
29+
private ScheduledExecutorService scheduler;
30+
31+
@BeforeEach
32+
public void setUp() {
33+
scheduler = Executors.newScheduledThreadPool(1);
34+
}
35+
36+
@AfterEach
37+
public void tearDown() {
38+
scheduler.shutdown();
39+
}
40+
41+
@Test
42+
public void testExceedsBoundary() {
43+
TrafficRateLimiter limiter = new TrafficRateLimiter(scheduler);
44+
long prev = limiter.currentRate();
45+
limiter.update(Long.MAX_VALUE);
46+
assertEquals(prev, limiter.currentRate());
47+
limiter.update(0);
48+
assertEquals(1L << 10, limiter.currentRate());
49+
}
50+
51+
@Test
52+
public void testConsumeBeforeUpdate() {
53+
long rateLimit = 1024 * 1024;
54+
long totalTraffic = 1024 * 1024 * 5;
55+
ExecutorService executor = Executors.newSingleThreadExecutor();
56+
CountDownLatch consumeStarted = new CountDownLatch(1);
57+
TrafficRateLimiter limiter = new TrafficRateLimiter(scheduler, rateLimit);
58+
Future<Long> future = executor.submit(() -> {
59+
long startTime = System.currentTimeMillis();
60+
limiter.consume(totalTraffic).join();
61+
consumeStarted.countDown();
62+
long endTime = System.currentTimeMillis();
63+
return endTime - startTime;
64+
});
65+
66+
try {
67+
consumeStarted.await(); // make sure update after the consume method is called
68+
long prevRate = limiter.currentRate();
69+
limiter.update(0);
70+
long duration = future.get();
71+
double actualRate = ((double) totalTraffic / 1024 / duration) * 1000;
72+
assertTrue(actualRate > limiter.currentRate() && actualRate <= prevRate);
73+
assertTrue(duration / 1000 <= 5);
74+
} catch (InterruptedException | ExecutionException e) {
75+
throw new RuntimeException(e);
76+
} finally {
77+
executor.shutdown();
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)