Skip to content

Commit 9bdf080

Browse files
committed
[V1][Metrics] Account for multi-engine DP configuration
In the case of DP, we will have a complete set of metrics for each DP rank. We could make get_metrics_snapshot() take a DP rank parameter to avoid this, but it is possible in future we will add further dimensions that we want to label on. Signed-off-by: Mark McLoughlin <[email protected]>
1 parent 13b02a5 commit 9bdf080

File tree

1 file changed

+124
-37
lines changed

1 file changed

+124
-37
lines changed

vllm/v1/metrics/reader.py

Lines changed: 124 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class Histogram(Metric):
5959
"""
6060
count: int
6161
sum: float
62-
buckets: dict[str, float]
62+
buckets: dict[str, int]
6363

6464

6565
def get_metrics_snapshot() -> list[Metric]:
@@ -83,11 +83,10 @@ def get_metrics_snapshot() -> list[Metric]:
8383
if not metric.name.startswith("vllm:"):
8484
continue
8585
if metric.type == "gauge":
86-
sample = _must_get_sample(metric)
87-
collected.append(
88-
Gauge(name=metric.name,
89-
labels=sample.labels,
90-
value=sample.value))
86+
samples = _get_samples(metric)
87+
for s in samples:
88+
collected.append(
89+
Gauge(name=metric.name, labels=s.labels, value=s.value))
9190
elif metric.type == "counter":
9291
samples = _get_samples(metric, "_total")
9392
if metric.name == "vllm:spec_decode_num_accepted_tokens_per_pos":
@@ -99,20 +98,16 @@ def get_metrics_snapshot() -> list[Metric]:
9998
# accepted tokens using a Counter labeled with 'position'.
10099
# We convert these into a vector of integer values.
101100
#
102-
values: list[int] = [0] * len(samples)
101+
for labels, values in _digest_num_accepted_by_pos_samples(
102+
samples):
103+
collected.append(
104+
Vector(name=metric.name, labels=labels, values=values))
105+
else:
103106
for s in samples:
104-
values[int(s.labels["position"])] = int(s.value)
105-
collected.append(
106-
Vector(name=metric.name,
107-
labels=_strip_label(s.labels, "position"),
108-
values=values))
109-
continue
110-
111-
for s in samples:
112-
collected.append(
113-
Counter(name=metric.name,
114-
labels=samples[0].labels,
115-
value=int(samples[0].value)))
107+
collected.append(
108+
Counter(name=metric.name,
109+
labels=s.labels,
110+
value=int(s.value)))
116111

117112
elif metric.type == "histogram":
118113
#
@@ -122,17 +117,17 @@ def get_metrics_snapshot() -> list[Metric]:
122117
# indexed by the value of the 'le' label. The 'le=+Inf'
123118
# label is a special case, catching all values observed.
124119
#
125-
count_sample = int(_must_get_sample(metric, "_count").value)
126-
sum_sample = _must_get_sample(metric, "_sum").value
127-
buckets: dict[str, float] = dict()
128-
for s in _get_samples(metric, "_bucket"):
129-
buckets[s.labels["le"]] = s.value
130-
collected.append(
131-
Histogram(name=metric.name,
132-
labels=_strip_label(s.labels, "le"),
133-
buckets=buckets,
134-
count=count_sample,
135-
sum=sum_sample))
120+
bucket_samples = _get_samples(metric, "_bucket")
121+
count_samples = _get_samples(metric, "_count")
122+
sum_samples = _get_samples(metric, "_sum")
123+
for labels, buckets, count_value, sum_value in _digest_histogram(
124+
bucket_samples, count_samples, sum_samples):
125+
collected.append(
126+
Histogram(name=metric.name,
127+
labels=labels,
128+
buckets=buckets,
129+
count=count_value,
130+
sum=sum_value))
136131
else:
137132
raise AssertionError(f"Unknown metric type {metric.type}")
138133

@@ -145,14 +140,106 @@ def _get_samples(metric: PromMetric,
145140
return [s for s in metric.samples if s.name == name]
146141

147142

148-
def _must_get_sample(metric: PromMetric,
149-
suffix: Optional[str] = None) -> Sample:
150-
samples = _get_samples(metric, suffix)
151-
assert len(samples) == 1
152-
return samples[0]
153-
154-
155143
def _strip_label(labels: dict[str, str], key_to_remove: str) -> dict[str, str]:
156144
labels_copy = labels.copy()
157145
labels_copy.pop(key_to_remove)
158146
return labels_copy
147+
148+
149+
def _digest_histogram(
150+
bucket_samples: list[Sample], count_samples: list[Sample],
151+
sum_samples: list[Sample]
152+
) -> list[tuple[dict[str, str], dict[str, int], int, float]]:
153+
#
154+
# In the case of DP, we have an indigestable
155+
# per-bucket-per-engine count as a list of labelled
156+
# samples, along with total and sum samples
157+
#
158+
# bucket_samples (in):
159+
# labels = {bucket: 100, idx: 0}, value = 2
160+
# labels = {bucket: 200, idx: 0}, value = 4
161+
# labels = {bucket: Inf, idx: 0}, value = 10
162+
# labels = {bucket: 100, idx: 1}, value = 1
163+
# labels = {bucket: 200, idx: 2}, value = 5
164+
# labels = {bucket: Inf, idx: 3}, value = 7
165+
# count_samples (in):
166+
# labels = {idx: 0}, value = 10
167+
# labels = {idx: 1}, value = 7
168+
# sum_samples (in):
169+
# labels = {idx: 0}, value = 2000
170+
# labels = {idx: 1}, value = 1200
171+
#
172+
# output: [
173+
# {idx: 0}, {"100": 2, "200": 4, "Inf": 10}, 10, 2000
174+
# {idx: 1}, {"100": 1, "200": 5, "Inf": 7}, 7, 1200
175+
# ]
176+
buckets_by_labels: dict[frozenset[tuple[str, str]], dict[str, int]] = {}
177+
for s in bucket_samples:
178+
bucket = s.labels["le"]
179+
labels_key = frozenset(_strip_label(s.labels, "le").items())
180+
if labels_key not in buckets_by_labels:
181+
buckets_by_labels[labels_key] = {}
182+
buckets_by_labels[labels_key][bucket] = int(s.value)
183+
184+
counts_by_labels: dict[frozenset[tuple[str, str]], int] = {}
185+
for s in count_samples:
186+
labels_key = frozenset(s.labels.items())
187+
counts_by_labels[labels_key] = int(s.value)
188+
189+
sums_by_labels: dict[frozenset[tuple[str, str]], float] = {}
190+
for s in sum_samples:
191+
labels_key = frozenset(s.labels.items())
192+
sums_by_labels[labels_key] = s.value
193+
194+
assert set(buckets_by_labels.keys()) == set(
195+
counts_by_labels.keys()) == set(sums_by_labels.keys())
196+
197+
output = []
198+
label_keys = list(buckets_by_labels.keys())
199+
for k in label_keys:
200+
labels = dict(k)
201+
output.append((labels, buckets_by_labels[k], counts_by_labels[k],
202+
sums_by_labels[k]))
203+
return output
204+
205+
206+
def _digest_num_accepted_by_pos_samples(
207+
samples: list[Sample]) -> list[tuple[dict[str, str], list[int]]]:
208+
#
209+
# In the case of DP, we have an indigestable
210+
# per-position-per-engine count as a list of
211+
# labelled samples
212+
#
213+
# samples (in):
214+
# labels = {pos: 0, idx: 0}, value = 10
215+
# labels = {pos: 1, idx: 0}, value = 7
216+
# labels = {pos: 2, idx: 0}, value = 2
217+
# labels = {pos: 0, idx: 1}, value = 5
218+
# labels = {pos: 1, idx: 1}, value = 3
219+
# labels = {pos: 2, idx: 1}, value = 1
220+
#
221+
# output: [
222+
# {idx: 0}, [10, 7, 2]
223+
# {idx: 1}, [5, 3, 1]
224+
# ]
225+
#
226+
max_pos = 0
227+
values_by_labels: dict[frozenset[tuple[str, str]], dict[int, int]] = {}
228+
229+
for s in samples:
230+
position = int(s.labels["position"])
231+
max_pos = max(max_pos, position)
232+
233+
labels_key = frozenset(_strip_label(s.labels, "position").items())
234+
if labels_key not in values_by_labels:
235+
values_by_labels[labels_key] = {}
236+
values_by_labels[labels_key][position] = int(s.value)
237+
238+
output = []
239+
for labels_key, values_by_position in values_by_labels.items():
240+
labels = dict(labels_key)
241+
values = [0] * (max_pos + 1)
242+
for pos, val in values_by_position.items():
243+
values[pos] = val
244+
output.append((labels, values))
245+
return output

0 commit comments

Comments
 (0)