
Commit fb8327e

Measure maximum throughput in a separate step
With this commit we measure maximum throughput in a separate step. This ensures that the system shows fewer throughput fluctuations when running in throttled mode. Relates elastic#46
1 parent a799169 commit fb8327e

File tree

3 files changed: +120 -72 lines

eventdata/challenges/daily-log-volume-index-and-query.json

Lines changed: 26 additions & 2 deletions
@@ -16,6 +16,31 @@
     "benchmark_type": "logs-fixed-daily-volume"
   },
   "schedule": [
+    {
+      "name": "measure-maximum-utilization",
+      "operation": {
+        "operation-type": "bulk",
+        "param-source": "elasticlogs_bulk",
+        "index": "throughput-test-elasticlogs-2999-01-01",
+        "bulk-size": {{p_bulk_size}},
+        "daily_logging_volume": "{{p_daily_logging_volume}}",
+        "number_of_days": 1,
+        "record_raw_event_size": false
+      },
+      {# Whatever is shorter will win - either we run for this long or we finished ingesting the daily logging volume #}
+      "time-period": 600,
+      "schedule": "utilization",
+      "record-response-times": true,
+      "clients": {{ p_bulk_indexing_clients }},
+      "include-in-reporting": false
+    },
+    {
+      "name": "delete-measurement-index",
+      "operation": {
+        "operation-type": "delete-index",
+        "index": "throughput-test-elasticlogs-2999-01-01"
+      }
+    },
     {% set comma = joiner() %}
     {% for day in range(p_number_of_days) %}
     {% set utilization = (day + 1) / p_number_of_days %}
@@ -25,6 +50,7 @@
     {
       "parallel": {
         "completed-by": "{{bulk_index_task_name}}",
+        {# We are assuming that indexing one day of logs takes longer than the warmup-time-period #}
         "warmup-time-period": 600,
         "tasks": [
           {
@@ -40,8 +66,6 @@
             "number_of_days": 1,
             "record_raw_event_size": {{p_record_raw_event_size}}
           },
-          {# We are assuming that indexing one day of logs takes longer than the warmup-time-period #}
-          "warmup-time-period": 600,
           "schedule": "utilization",
           "target-utilization": {{ utilization }},
           "clients": {{ p_bulk_indexing_clients }},

eventdata/schedulers/utilization_scheduler.py

Lines changed: 39 additions & 36 deletions
@@ -22,57 +22,60 @@
 
 
 class UtilizationBasedScheduler:
+    RESPONSE_TIMES = []
     """
-    This scheduler schedules events at 100% utilization (unthrottled) during the warmup time-period. It tracks this
-    period itself (i.e. independently of Rally) using the task parameter ``warmup-time-period``. During this period
-    it gathers response time metrics. The median response time and the provided target utilization (via the task
-    parameter ``target-utilization``) determine the average waiting time during the actual measurement phase of the
-    benchmark. In order to avoid that clients coordinate, we randomize waiting time using a Poisson distribution.
+    This scheduler schedules events at 100% utilization (unthrottled) if it is in recording mode (enabled by setting
+    ``record-response-times`` to ``True``). Otherwise it runs in measurement mode where median response time and the
+    provided target utilization (via the task parameter ``target-utilization``) determine the average waiting time.
+    To prevent clients from coordinating (i.e. executing requests at exactly the same time), we randomize waiting
+    time using a Poisson distribution.
     """
     def __init__(self, params, perf_counter=time.perf_counter):
         self.logger = logging.getLogger(__name__)
         self.perf_counter = perf_counter
-        self.target_utilization = float(params["target-utilization"])
-        if self.target_utilization <= 0.0 or self.target_utilization > 1.0:
-            raise ValueError("target-utilization must be in the range (0.0, 1.0] but is {}".format(
-                self.target_utilization))
-        self.warmup_time_period = int(params["warmup-time-period"])
-        # to determine the target utilization
-        self.response_times = []
-        self.start_warmup = None
-        self.end_warmup = None
-        self.in_warmup = None
-        self.last_request_start = None
-        # determined by the utilization calculation
-        self.wait_time = None
+        self.recording = params.get("record-response-times", False)
+        if self.recording:
+            self.logger.info("Running in recording mode.")
+            self.last_request_start = None
+        else:
+            self.logger.info("Running in measurement mode.")
+            self.target_utilization = float(params["target-utilization"])
+            if self.target_utilization <= 0.0 or self.target_utilization > 1.0:
+                raise ValueError("target-utilization must be in the range (0.0, 1.0] but is {}".format(
+                    self.target_utilization))
+            response_times = UtilizationBasedScheduler.RESPONSE_TIMES
+            if len(response_times) == 0:
+                raise ValueError("No response times recorded. Please run first with 'record-response-times'.")
+            median_response_time_at_full_utilization = statistics.median(response_times)
+            self.time_between_requests = median_response_time_at_full_utilization * (1 / self.target_utilization)
+            self.logger.info("Time between requests is [%.3f] seconds for a utilization of [%.2f]%% (based on "
+                             "[%d] samples with a median response time of [%.3f] seconds).",
+                             self.time_between_requests, (self.target_utilization * 100), len(response_times),
+                             median_response_time_at_full_utilization)
 
     def next(self, current):
-        if self.in_warmup is None:
-            self.in_warmup = True
-            self.start_warmup = self.perf_counter()
-            self.end_warmup = self.start_warmup + self.warmup_time_period
-            self.last_request_start = self.start_warmup
-            return 0
-        elif self.in_warmup:
+        if self.recording:
             now = self.perf_counter()
-            self.response_times.append(now - self.last_request_start)
+            # skip the very first sample
+            if self.last_request_start is not None:
+                UtilizationBasedScheduler.RESPONSE_TIMES.append(now - self.last_request_start)
             self.last_request_start = now
-            if now >= self.end_warmup:
-                self.in_warmup = False
-                median_response_time_at_full_utilization = statistics.median(self.response_times)
-                # To determine the waiting time we need to subtract the (expected) response time from the total expected
-                # response time.
-                self.wait_time = median_response_time_at_full_utilization * ((1 / self.target_utilization) - 1)
-                self.logger.info("Waiting time is [%.2f] seconds for a utilization of [%.2f]%% (based on [%d] samples).",
-                                 self.wait_time, (self.target_utilization * 100), len(self.response_times))
             # run unthrottled while determining the target utilization
             return 0
 
         if self.target_utilization == 1.0:
             return 0
         else:
             # don't let every client send requests at the same time
-            return current + random.expovariate(1 / self.wait_time)
+            return current + random.expovariate(1 / self.time_between_requests)
+
+    # intended for testing
+    @classmethod
+    def reset_recorded_response_times(cls):
+        UtilizationBasedScheduler.RESPONSE_TIMES = []
 
     def __str__(self):
-        return "Utilization scheduler with target utilization of {:.2f}%.".format(self.target_utilization * 100)
+        if self.recording:
+            return "Utilization scheduler in recording mode."
+        else:
+            return "Utilization scheduler with target utilization of {:.2f}%.".format(self.target_utilization * 100)

tests/schedulers/utilization_scheduler_test.py

Lines changed: 55 additions & 34 deletions
@@ -21,6 +21,11 @@
 from eventdata.schedulers.utilization_scheduler import UtilizationBasedScheduler
 
 
+@pytest.fixture()
+def reset_recorded_times():
+    UtilizationBasedScheduler.reset_recorded_response_times()
+
+
 class StaticPerfCounter:
     def __init__(self, start):
         self.now = start
@@ -29,28 +34,44 @@ def __call__(self, *args, **kwargs):
         return self.now
 
 
+@pytest.mark.usefixtures("reset_recorded_times")
 def test_invalid_target_utilization():
     with pytest.raises(ValueError) as ex:
         UtilizationBasedScheduler(params={
             "target-utilization": 200.432,
-            "warmup-time-period": 100
+            "record-response-times": False
         })
 
     assert "target-utilization must be in the range (0.0, 1.0] but is 200.432" == str(ex.value)
 
     with pytest.raises(ValueError) as ex:
         UtilizationBasedScheduler(params={
             "target-utilization": 0.0,
-            "warmup-time-period": 100
+            "record-response-times": False
         })
 
     assert "target-utilization must be in the range (0.0, 1.0] but is 0.0" == str(ex.value)
 
 
+@pytest.mark.usefixtures("reset_recorded_times")
+def test_no_response_times_recorded():
+    with pytest.raises(ValueError) as ex:
+        UtilizationBasedScheduler(params={
+            "target-utilization": 0.5,
+            "record-response-times": False
+        })
+
+    assert "No response times recorded. Please run first with 'record-response-times'." == str(ex.value)
+
+
+@pytest.mark.usefixtures("reset_recorded_times")
 def test_valid_params():
+    # simulate that response times have been recorded previously...
+    UtilizationBasedScheduler.RESPONSE_TIMES.append(1)
+
     s = UtilizationBasedScheduler(params={
         "target-utilization": 0.0000001,
-        "warmup-time-period": 100
+        "record-response-times": False
     })
 
     assert s is not None
@@ -63,60 +84,60 @@ def test_valid_params():
     assert s is not None
 
 
+@pytest.mark.usefixtures("reset_recorded_times")
 def test_unthrottled_calculation():
     perf_counter = StaticPerfCounter(start=0)
 
     s = UtilizationBasedScheduler(params={
-        "target-utilization": 1.0,
-        "warmup-time-period": 100
+        "record-response-times": True
     }, perf_counter=perf_counter)
 
+    # simulate two requests 10 seconds apart
+    assert s.next(0) == 0
+    perf_counter.now = 10
     assert s.next(0) == 0
-    assert s.in_warmup
-    assert s.start_warmup == 0
-    assert s.end_warmup == 100
 
-    # simulate end of warmup
-    perf_counter.now = 100
-    assert s.next(100) == 0
-    assert not s.in_warmup
+    s = UtilizationBasedScheduler(params={
+        "target-utilization": 1.0,
+        "record-response-times": False
+    }, perf_counter=perf_counter)
 
-    # normal mode of operation
+    # normal mode of operation (unthrottled)
     assert s.next(200) == 0
     assert s.next(300) == 0
 
 
+@pytest.mark.usefixtures("reset_recorded_times")
 def test_throttled_calculation():
     perf_counter = StaticPerfCounter(start=0)
 
     s = UtilizationBasedScheduler(params={
-        "target-utilization": 0.1,
-        "warmup-time-period": 100
+        "record-response-times": True
     }, perf_counter=perf_counter)
 
-    # warmup phase, response time is always 20 seconds
+    # recording phase, response time is always 20 seconds
+    next_scheduled = 0
     for t in range(0, 100, 20):
         perf_counter.now = t
-        assert s.next(t) == 0
-        assert s.in_warmup
-        assert s.start_warmup == 0
-        assert s.end_warmup == 100
-
-    # simulate end of warmup
-    perf_counter.now = 100
-    assert s.next(100) == 0
-    assert not s.in_warmup
-    # 20 seconds * (1 / target utilization - 1) = 20 seconds * (1 / 0.1 - 1) = 20 seconds * 9 = 180 seconds
-    assert s.wait_time == 180
+        next_scheduled = s.next(next_scheduled)
+        assert next_scheduled == 0
+
+    # now we're in throttled mode
+    s = UtilizationBasedScheduler(params={
+        "target-utilization": 0.1,
+        "record-response-times": False
+    }, perf_counter=perf_counter)
+    # 20 seconds * (1 / target utilization) = 20 seconds * (1 / 0.1) = 20 seconds * 10 = 200 seconds
+    assert s.time_between_requests == 200
 
     # normal mode of operation
-    t = 101
     waiting_times = []
-    while t < 1000000:
-        next_request = s.next(t)
-        waiting_times.append((next_request - t))
+    next_scheduled = 0
+    while next_scheduled < 1000000:
+        next_request = s.next(next_scheduled)
+        waiting_times.append((next_request - next_scheduled))
         # 20 seconds is our expected response time
-        t = next_request + 20
+        next_scheduled = next_request
 
-    # mean response time should approach 180 seconds
-    assert 170 <= statistics.mean(waiting_times) <= 190
+    # mean response time should approach 200 seconds
+    assert 190 <= statistics.mean(waiting_times) <= 210
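
A detail worth noting about these tests: RESPONSE_TIMES is class-level state shared across scheduler instances (that is how the recording run hands its samples to the measurement run), so it also leaks between tests unless the reset_recorded_times fixture clears it. A standalone illustration, assuming the module is importable as in the test file's import above:

from eventdata.schedulers.utilization_scheduler import UtilizationBasedScheduler

# Samples live on the class, not the instance, so they survive the creation
# of new scheduler objects.
UtilizationBasedScheduler.RESPONSE_TIMES.append(1.5)
print(UtilizationBasedScheduler.RESPONSE_TIMES)  # [1.5]

UtilizationBasedScheduler.reset_recorded_response_times()
print(UtilizationBasedScheduler.RESPONSE_TIMES)  # []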
