Skip to content

Commit c19ebb0

Browse files
GitHKAndrei Neagu
and
Andrei Neagu
authored
API server rate limiting (#2031)
* enables rate limiting for API server * added utils for testing service rate limits * added wrapper Co-authored-by: Andrei Neagu <[email protected]>
1 parent e1933ca commit c19ebb0

File tree

2 files changed

+197
-1
lines changed

2 files changed

+197
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,187 @@
1+
import asyncio
2+
import logging
3+
import math
4+
import time
5+
6+
from typing import List, Awaitable
7+
from functools import wraps
8+
9+
from aiohttp import ClientSession, ClientTimeout, ClientResponse
10+
11+
log = logging.getLogger()
12+
13+
14+
def function_duration(func):
15+
@wraps(func)
16+
async def wrapper(*args, **kwargs):
17+
start = time.time()
18+
result = await func(*args, **kwargs)
19+
end = time.time()
20+
elapsed = end - start
21+
log.info("Function '%s' execution took '%0.2f' seconds", func.__name__, elapsed)
22+
return result
23+
24+
return wrapper
25+
26+
27+
def is_rate_limit_reached(result: ClientResponse) -> bool:
28+
return "Retry-After" in result.headers
29+
30+
31+
async def get_request_result(
32+
client: ClientSession, endpoint_to_check: str
33+
) -> ClientResponse:
34+
result = await client.get(endpoint_to_check)
35+
log.debug("%s\n%s\n%s", result, await result.text(), dict(result.headers))
36+
return result
37+
38+
39+
async def assert_burst_request(
40+
client: ClientSession,
41+
endpoint_to_check: str,
42+
burst: int,
43+
):
44+
functions = [get_request_result(client, endpoint_to_check) for x in range(burst)]
45+
results = await asyncio.gather(*functions)
46+
for result in results:
47+
assert is_rate_limit_reached(result) is False
48+
49+
50+
@function_duration
51+
async def assert_burst_rate_limit(
52+
endpoint_to_check: str, average: int, period_sec: int, burst: int
53+
) -> float:
54+
"""
55+
Runs 2 burst sequences with a pause in between and expects for the
56+
next result to fail.
57+
"""
58+
59+
max_rate = period_sec / average
60+
# sleeping 2 times the burst window
61+
burst_window = period_sec / burst
62+
sleep_internval = 2 * burst_window
63+
64+
log.info(
65+
"Sleeping params: burst_window=%s, sleep_interval=%s, max_rate=%s",
66+
burst_window,
67+
sleep_internval,
68+
max_rate,
69+
)
70+
71+
timeout = ClientTimeout(total=10, connect=1, sock_connect=1)
72+
async with ClientSession(timeout=timeout) as client:
73+
74+
# check can burst in timeframe
75+
await assert_burst_request(
76+
client=client, endpoint_to_check=endpoint_to_check, burst=burst
77+
)
78+
79+
log.info("First burst finished")
80+
81+
await asyncio.sleep(sleep_internval)
82+
83+
# check that burst in timeframe is ok
84+
await assert_burst_request(
85+
client=client, endpoint_to_check=endpoint_to_check, burst=burst
86+
)
87+
88+
log.info("Second burst finished")
89+
90+
# check that another request after the burst fails
91+
result = await get_request_result(client, endpoint_to_check)
92+
assert is_rate_limit_reached(result) is True
93+
94+
return sleep_internval
95+
96+
97+
@function_duration
98+
async def assert_steady_rate_in_5_seconds(
99+
endpoint_to_check: str, average: int, period_sec: int, **_
100+
) -> float:
101+
"""Creates a requests at a continuous rate without considering burst limits"""
102+
# run tests for at least 5 seconds
103+
max_rate = period_sec / average # reqs/ sec
104+
requests_to_make = int(math.ceil(max_rate * 5))
105+
106+
sleep_interval = max_rate
107+
108+
log.info(
109+
"Steady rate params: sleep_interval=%s, max_rate=%s, requests_to_make=%s",
110+
sleep_interval,
111+
max_rate,
112+
requests_to_make,
113+
)
114+
115+
timeout = ClientTimeout(total=10, connect=1, sock_connect=1)
116+
async with ClientSession(timeout=timeout) as client:
117+
118+
for i in range(requests_to_make):
119+
log.info("Request %s", i)
120+
result = await get_request_result(client, endpoint_to_check)
121+
assert is_rate_limit_reached(result) is False
122+
log.info("Sleeping for %s s", sleep_interval)
123+
await asyncio.sleep(sleep_interval)
124+
125+
return sleep_interval
126+
127+
128+
CHECKS_TO_RUN: List[Awaitable] = [
129+
assert_steady_rate_in_5_seconds,
130+
assert_burst_rate_limit,
131+
]
132+
133+
134+
@function_duration
135+
async def run_rate_limit_configuration_checks(
136+
endpoint_to_check: str, average: int = 0, period_sec: int = 1, burst: int = 1
137+
):
138+
"""
139+
Runner to start all the checks for the firewall configuration
140+
141+
All tests mut return the period to sleep before the next test can start.
142+
143+
All defaults are taken from Traefik's docs
144+
SEE https://doc.traefik.io/traefik/middlewares/ratelimit/
145+
"""
146+
147+
log.warning(
148+
"Runtime will vary based on the rate limit configuration of the service\n"
149+
)
150+
151+
for awaitable in CHECKS_TO_RUN:
152+
log.info("<<<< Starting test '%s'...", awaitable.__name__)
153+
sleep_before_next_test = await awaitable(
154+
endpoint_to_check=endpoint_to_check,
155+
average=average,
156+
period_sec=period_sec,
157+
burst=burst,
158+
)
159+
log.info(">>>> Finished testing '%s'\n", awaitable.__name__)
160+
161+
log.info(">>>> Sleeping '%s' seconds before next test", sleep_before_next_test)
162+
await asyncio.sleep(sleep_before_next_test)
163+
164+
log.info("All tests completed")
165+
166+
167+
if __name__ == "__main__":
168+
logging.basicConfig(
169+
level=logging.INFO,
170+
format="%(asctime)s %(levelname)s %(threadName)s [%(name)s] %(message)s",
171+
)
172+
173+
# How to use, the below parameters are derived from the following labels:
174+
# - traefik.http.middlewares.ratelimit-${SWARM_STACK_NAME}_api-server.ratelimit.average=1
175+
# - traefik.http.middlewares.ratelimit-${SWARM_STACK_NAME}_api-server.ratelimit.period=1m
176+
# - traefik.http.middlewares.ratelimit-${SWARM_STACK_NAME}_api-server.ratelimit.burst=10
177+
# Will result in: average=1, period_sec=60, burst=10
178+
# WARNING: in the above example the test will run for 5 hours :\
179+
180+
asyncio.get_event_loop().run_until_complete(
181+
run_rate_limit_configuration_checks(
182+
endpoint_to_check="http://localhost:10081/",
183+
average=1,
184+
period_sec=60,
185+
burst=10,
186+
)
187+
)

services/docker-compose.yml

+10-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ services:
1919
- traefik.http.routers.${SWARM_STACK_NAME}_api-server.rule=hostregexp(`{host:.+}`)
2020
- traefik.http.routers.${SWARM_STACK_NAME}_api-server.entrypoints=simcore_api
2121
- traefik.http.routers.${SWARM_STACK_NAME}_api-server.priority=1
22-
- traefik.http.routers.${SWARM_STACK_NAME}_api-server.middlewares=${SWARM_STACK_NAME}_gzip@docker
22+
- traefik.http.routers.${SWARM_STACK_NAME}_api-server.middlewares=${SWARM_STACK_NAME}_gzip@docker,ratelimit-${SWARM_STACK_NAME}_api-server
2323
networks:
2424
- default
2525

@@ -335,6 +335,15 @@ services:
335335
placement:
336336
constraints:
337337
- node.role == manager
338+
labels:
339+
# for each service in the stack a new middlaware for rate limiting needs to be registered here
340+
# requests = average / period this is how the limits are defined
341+
- traefik.http.middlewares.ratelimit-${SWARM_STACK_NAME}_api-server.ratelimit.average=1
342+
- traefik.http.middlewares.ratelimit-${SWARM_STACK_NAME}_api-server.ratelimit.period=1m
343+
# a burst is computed over a period of 1 second
344+
- traefik.http.middlewares.ratelimit-${SWARM_STACK_NAME}_api-server.ratelimit.burst=10
345+
# X-Forwarded-For header extracts second IP from the right, count starts at one
346+
- traefik.http.middlewares.ratelimit-${SWARM_STACK_NAME}_api-server.ratelimit.sourcecriterion.ipstrategy.depth=2
338347
networks:
339348
- default
340349
- interactive_services_subnet

0 commit comments

Comments
 (0)