
Add composite metrics for Kubernetes inference gateway metrics protocol #725


Open · wants to merge 6 commits into base: main
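This PR adds two composite metrics, required by the Kubernetes inference gateway metrics protocol, derived from existing executor statistics: num_waiting_requests (active requests minus scheduled requests) and fraction_used_blocks (used KV cache blocks over max KV cache blocks). Assuming the metric family names the backend already documents for its custom metrics (an assumption; only the label wiring appears in this diff), a scrape of Triton's /metrics endpoint would gain lines roughly like:

nv_trt_llm_request_metrics{model="tensorrt_llm",request_type="waiting",version="1"} 3
nv_trt_llm_kv_cache_block_metrics{kv_cache_block_type="fraction",model="tensorrt_llm",version="1"} 0.25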
49 changes: 49 additions & 0 deletions all_models/inflight_batcher_llm/tensorrt_llm/1/model.py
@@ -1149,6 +1149,11 @@ def create_metrics(self, model: str, version: str, is_v1_model: bool):
"request_type": "context",
**common_labels
}),
"num_waiting_requests":
self.request_metric_family.Metric(labels={
"request_type": "waiting",
**common_labels
}),
# Runtime metrics
"cpu_mem_usage":
self.runtime_memory_metric_family.Metric(labels={
@@ -1186,6 +1191,11 @@ def create_metrics(self, model: str, version: str, is_v1_model: bool):
"kv_cache_block_type": "tokens_per",
**common_labels
}),
"fraction_used_blocks":
self.kv_cache_metric_family.Metric(labels={
"kv_cache_block_type": "fraction",
**common_labels
}),
# General metrics
"timestamp":
self.general_metric_family.Metric(labels={
@@ -1493,12 +1503,49 @@ def update_metrics_per_request(self, req_id):
        self.all_metrics[METRIC_TOTAL_OUTPUT_TOKENS].observe(output_tokens)
        self.all_metrics[METRIC_TOTAL_INPUT_TOKENS].observe(input_tokens)

    def get_composite_metric_map(self, stat):

        def get_metric(metric_name, family_stats=None):
            # Look the metric up on the top-level stat object first, then on
            # whichever per-family stats sub-object is populated.
            if family_stats is None:
                if hasattr(stat, metric_name):
                    return getattr(stat, metric_name)
                elif stat.kv_cache_stats is not None and hasattr(
                        stat.kv_cache_stats, metric_name):
                    return getattr(stat.kv_cache_stats, metric_name)
                elif stat.static_batching_stats is not None and hasattr(
                        stat.static_batching_stats, metric_name):
                    return getattr(stat.static_batching_stats, metric_name)
                elif stat.inflight_batching_stats is not None and hasattr(
                        stat.inflight_batching_stats, metric_name):
                    return getattr(stat.inflight_batching_stats, metric_name)
            elif hasattr(family_stats, metric_name):
                return getattr(family_stats, metric_name)
            pb_utils.Logger.log_warn(
                f"Constituent metric \"{metric_name}\" not found.")
            return None

        composite_metrics = {}

        # fraction_used_blocks = used_num_blocks / max_num_blocks
        max_blocks = get_metric("max_num_blocks", stat.kv_cache_stats)
        used_blocks = get_metric("used_num_blocks", stat.kv_cache_stats)
        if max_blocks is not None and used_blocks is not None:
            composite_metrics[
                "fraction_used_blocks"] = 0.0 if max_blocks <= 0 else used_blocks / max_blocks
        else:
            pb_utils.Logger.log_warn(
                "fraction_used_blocks is missing one or more constituent metrics.")

        # num_waiting_requests = num_active_requests - num_scheduled_requests
        active_requests = get_metric("num_active_requests")
        scheduled_requests = get_metric("num_scheduled_requests")
        if active_requests is not None and scheduled_requests is not None:
            composite_metrics[
                "num_waiting_requests"] = active_requests - scheduled_requests
        else:
            pb_utils.Logger.log_warn(
                "num_waiting_requests is missing one or more constituent metrics.")

        return composite_metrics

    def metrics_loop(self):
        """Updates Triton metrics using stats from the executor."""
        while self.running:
            time.sleep(self.stats_check_period_ms / 1000.0)
            for stat in self.executor.get_latest_iteration_stats():
                try:
                    composite_metrics = self.get_composite_metric_map(stat)
                    for key, metric in self.all_metrics.items():
                        # Skip processing for both histogram metrics
                        if isinstance(key, str) and key in [
@@ -1518,6 +1565,8 @@ def metrics_loop(self):
                        elif stat.inflight_batching_stats is not None and hasattr(
                                stat.inflight_batching_stats, key):
                            value = getattr(stat.inflight_batching_stats, key)
                        elif key in composite_metrics:
                            value = composite_metrics[key]
                        if value is not None:
                            if key == "timestamp":
                                value = convert_timestamp_to_seconds(value)
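A standalone sketch of the composite arithmetic introduced above, with SimpleNamespace stand-ins for the executor iteration-stat fields the new code reads (values hypothetical):

from types import SimpleNamespace

kv_cache_stats = SimpleNamespace(max_num_blocks=1024, used_num_blocks=256)
stat = SimpleNamespace(
    num_active_requests=8,
    inflight_batching_stats=SimpleNamespace(num_scheduled_requests=5))

# fraction_used_blocks guards against a zero or unset block budget.
fraction_used_blocks = (0.0 if kv_cache_stats.max_num_blocks <= 0 else
                        kv_cache_stats.used_num_blocks / kv_cache_stats.max_num_blocks)

# Waiting = known to the executor but not scheduled this iteration.
num_waiting_requests = (stat.num_active_requests -
                        stat.inflight_batching_stats.num_scheduled_requests)

print(fraction_used_blocks, num_waiting_requests)  # 0.25 3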
2 changes: 2 additions & 0 deletions ci/L0_backend_trtllm/custom_metrics_verification_tests.py
@@ -37,6 +37,7 @@
"request_type=scheduled": "Scheduled Requests",
"request_type=max": "Max Request Count",
"request_type=active": "Active Request Count",
"request_type=waiting": "Waiting Requests",
"memory_type=pinned": "Runtime Pinned Memory Usage",
"memory_type=gpu": "Runtime GPU Memory Usage",
"memory_type=cpu": "Runtime CPU Memory Usage",
@@ -45,6 +46,7 @@
"kv_cache_block_type=free": "Free KV cache blocks",
"kv_cache_block_type=max": "Max KV cache blocks",
"kv_cache_block_type=reused": "Reused KV cache blocks",
"kv_cache_block_type=fraction": "Fraction used KV cache blocks",
"inflight_batcher_specific_metric=total_context_tokens":
"Total Context Tokens",
"inflight_batcher_specific_metric=micro_batch_id": "MicroBatch ID",
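The two new entries extend the map the CI test uses to pair each Prometheus label with the statistics name it must track. A minimal sketch of the comparison this enables (the harness itself is outside this diff; data hypothetical):

mapping = {
    "request_type=waiting": "Waiting Requests",
    "kv_cache_block_type=fraction": "Fraction used KV cache blocks",
}
scraped = {"request_type=waiting": 3.0, "kv_cache_block_type=fraction": 0.25}  # from /metrics
logged = {"Waiting Requests": 3.0, "Fraction used KV cache blocks": 0.25}  # from the stats log
for label, stat_name in mapping.items():
    assert scraped[label] == logged[stat_name]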
inflight_batcher_llm/src/custom_metrics_reporter/custom_metrics_reporter.cc
@@ -35,16 +35,18 @@ namespace triton::backend::inflight_batcher_llm::custom_metrics_reporter
 {

 const std::vector<std::string> CustomMetricsReporter::request_keys_{
-    "Active Request Count", "Max Request Count", "Scheduled Requests", "Context Requests"};
-const std::vector<std::string> CustomMetricsReporter::request_labels_{"active", "max", "scheduled", "context"};
+    "Active Request Count", "Max Request Count", "Scheduled Requests", "Context Requests", "Waiting Requests"};
+const std::vector<std::string> CustomMetricsReporter::request_labels_{
+    "active", "max", "scheduled", "context", "waiting"};

 const std::vector<std::string> CustomMetricsReporter::runtime_memory_keys_{
     "Runtime CPU Memory Usage", "Runtime GPU Memory Usage", "Runtime Pinned Memory Usage"};
 const std::vector<std::string> CustomMetricsReporter::runtime_memory_labels_{"cpu", "gpu", "pinned"};

 const std::vector<std::string> CustomMetricsReporter::kv_cache_keys_{"Max KV cache blocks", "Free KV cache blocks",
-    "Used KV cache blocks", "Tokens per KV cache block", "Reused KV cache blocks"};
-const std::vector<std::string> CustomMetricsReporter::kv_cache_labels_{"max", "free", "used", "tokens_per", "reused"};
+    "Used KV cache blocks", "Tokens per KV cache block", "Reused KV cache blocks", "Fraction used KV cache blocks"};
+const std::vector<std::string> CustomMetricsReporter::kv_cache_labels_{
+    "max", "free", "used", "tokens_per", "reused", "fraction"};

 const std::vector<std::string> CustomMetricsReporter::dis_serving_keys_{"KV cache transfer time", "Request count"};
 const std::vector<std::string> CustomMetricsReporter::dis_serving_labels_{"kv_cache_transfer_ms", "request_count"};
9 changes: 9 additions & 0 deletions inflight_batcher_llm/src/model_instance_state.cc
@@ -1426,6 +1426,7 @@ void ModelInstanceState::WaitForStats()
statJson.append("\"Paused Requests\":" + std::to_string(modelStats.numPausedRequests) + ",");
statJson.append("\"Scheduled Requests\":" + std::to_string(modelStats.numScheduledRequests) + ",");
statJson.append("\"Total Context Tokens\":" + std::to_string(modelStats.numCtxTokens) + ",");
statJson.append("\"Waiting Requests\":" + std::to_string(stat.numActiveRequests - modelStats.numScheduledRequests) + ",");
}
else if (stat.staticBatchingStats.has_value())
{
@@ -1435,6 +1436,7 @@
statJson.append("\"Total Context Tokens\":" + std::to_string(modelStats.numCtxTokens) + ",");
statJson.append("\"Total Generation Tokens\":" + std::to_string(modelStats.numGenTokens) + ",");
statJson.append("\"Empty Generation Slots\":" + std::to_string(modelStats.emptyGenSlots) + ",");
statJson.append("\"Waiting Requests\":" + std::to_string(stat.numActiveRequests - modelStats.numScheduledRequests) + ",");
}
else
{
@@ -1450,6 +1452,13 @@
statJson.append("\"Tokens per KV cache block\":" + std::to_string(kvStats.tokensPerBlock) + ",");
statJson.append("\"Used KV cache blocks\":" + std::to_string(kvStats.usedNumBlocks) + ",");
statJson.append("\"Reused KV cache blocks\":" + std::to_string(kvStats.reusedBlocks) + ",");
// Calculate and append the used KV cache block fraction.
double fraction = 0.0;
if (static_cast<double>(kvStats.maxNumBlocks) > 0.0)
{
fraction = static_cast<double>(kvStats.usedNumBlocks) / static_cast<double>(kvStats.maxNumBlocks);
}
statJson.append("\"Fraction used KV cache blocks\":" + std::to_string(fraction) + ",");
}

// requestStats is a list where each item is associated with an iteration,
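Taken together, the WaitForStats additions would produce a per-iteration statistics JSON fragment along these lines (values hypothetical; std::to_string renders the fraction with six decimal places):

"Active Request Count":8, "Scheduled Requests":5, "Waiting Requests":3, ..., "Max KV cache blocks":1024, "Used KV cache blocks":256, "Fraction used KV cache blocks":0.250000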