Skip to content

Commit dce7fdb

Browse files
lzchenl0lawrence
authored andcommitted
Implement live metrics filtering for charts (part 1) (Azure#37998)
1 parent 55ec9d9 commit dce7fdb

File tree

13 files changed

+1067
-115
lines changed

13 files changed

+1067
-115
lines changed

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_constants.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Copyright (c) Microsoft Corporation. All rights reserved.
22
# Licensed under the MIT License.
3-
from enum import Enum
3+
import sys
44

55
# cSpell:disable
66

@@ -47,21 +47,20 @@
4747
_LONG_PING_INTERVAL_SECONDS = 60
4848
_POST_CANCEL_INTERVAL_SECONDS = 20
4949

50-
51-
# Live metrics data types
52-
class _DocumentIngressDocumentType(Enum):
53-
Request = "Request"
54-
RemoteDependency = "RemoteDependency"
55-
Exception = "Exception"
56-
Event = "Event"
57-
Trace = "Trace"
58-
59-
6050
# Response Headers
6151

6252
_QUICKPULSE_ETAG_HEADER_NAME = "x-ms-qps-configuration-etag"
6353
_QUICKPULSE_POLLING_HEADER_NAME = "x-ms-qps-service-polling-interval-hint"
6454
_QUICKPULSE_REDIRECT_HEADER_NAME = "x-ms-qps-service-endpoint-redirect-v2"
6555
_QUICKPULSE_SUBSCRIBED_HEADER_NAME = "x-ms-qps-subscribed"
6656

57+
# Projections (filtering)
58+
59+
_QUICKPULSE_PROJECTION_COUNT = "Count()"
60+
_QUICKPULSE_PROJECTION_DURATION = "Duration"
61+
_QUICKPULSE_PROJECTION_CUSTOM = "CustomDimensions."
62+
63+
_QUICKPULSE_PROJECTION_MAX_VALUE = sys.maxsize
64+
_QUICKPULSE_PROJECTION_MIN_VALUE = -sys.maxsize - 1
65+
6766
# cSpell:enable

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_exporter.py

+59-22
Original file line numberDiff line numberDiff line change
@@ -23,27 +23,30 @@
2323
MetricReader,
2424
)
2525

26-
from azure.core.exceptions import HttpResponseError
2726
from azure.core.pipeline.policies import ContentDecodePolicy
2827
from azure.monitor.opentelemetry.exporter._quickpulse._constants import (
2928
_LONG_PING_INTERVAL_SECONDS,
3029
_POST_CANCEL_INTERVAL_SECONDS,
3130
_POST_INTERVAL_SECONDS,
31+
_QUICKPULSE_ETAG_HEADER_NAME,
3232
_QUICKPULSE_SUBSCRIBED_HEADER_NAME,
3333
)
3434
from azure.monitor.opentelemetry.exporter._quickpulse._generated._configuration import QuickpulseClientConfiguration
3535
from azure.monitor.opentelemetry.exporter._quickpulse._generated._client import QuickpulseClient
3636
from azure.monitor.opentelemetry.exporter._quickpulse._generated.models import MonitoringDataPoint
3737
from azure.monitor.opentelemetry.exporter._quickpulse._policy import _QuickpulseRedirectPolicy
3838
from azure.monitor.opentelemetry.exporter._quickpulse._state import (
39+
_get_and_clear_quickpulse_documents,
3940
_get_global_quickpulse_state,
41+
_get_quickpulse_etag,
4042
_is_ping_state,
4143
_set_global_quickpulse_state,
42-
_get_and_clear_quickpulse_documents,
44+
_set_quickpulse_etag,
4345
_QuickpulseState,
4446
)
4547
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
4648
_metric_to_quick_pulse_data_points,
49+
_update_filter_configuration,
4750
)
4851
from azure.monitor.opentelemetry.exporter._connection_string_parser import ConnectionStringParser
4952
from azure.monitor.opentelemetry.exporter._utils import (
@@ -143,13 +146,14 @@ def export(
143146
base_monitoring_data_point=base_monitoring_data_point,
144147
documents=_get_and_clear_quickpulse_documents(),
145148
)
146-
149+
configuration_etag = _get_quickpulse_etag() or ""
147150
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
148151
try:
149152
post_response = self._client.publish( # type: ignore
150153
endpoint=self._live_endpoint,
151154
monitoring_data_points=data_points,
152-
ikey=self._instrumentation_key,
155+
ikey=self._instrumentation_key, # type: ignore
156+
configuration_etag=configuration_etag,
153157
transmission_time=_ticks_since_dot_net_epoch(),
154158
cls=_Response,
155159
)
@@ -163,6 +167,19 @@ def export(
163167
if header != "true":
164168
# User leaving the live metrics page will be treated as an unsuccessful
165169
result = MetricExportResult.FAILURE
170+
else:
171+
# Check if etag has changed
172+
etag = post_response._response_headers.get( # pylint: disable=protected-access
173+
_QUICKPULSE_ETAG_HEADER_NAME # pylint: disable=protected-access
174+
)
175+
if etag and etag != configuration_etag:
176+
config = (
177+
post_response._pipeline_response.http_response.content # pylint: disable=protected-access
178+
)
179+
# Content will only be populated if configuration has changed (etag is different)
180+
if config:
181+
# Update and apply configuration changes
182+
_update_filter_configuration(etag, config)
166183
except Exception: # pylint: disable=broad-except,invalid-name
167184
_logger.exception("Exception occurred while publishing live metrics.")
168185
result = MetricExportResult.FAILURE
@@ -201,21 +218,23 @@ def shutdown(
201218
def _ping(self, monitoring_data_point: MonitoringDataPoint) -> Optional[_Response]:
202219
ping_response = None
203220
token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
221+
etag = _get_quickpulse_etag() or ""
204222
try:
205223
ping_response = self._client.is_subscribed( # type: ignore
206224
endpoint=self._live_endpoint,
207225
monitoring_data_point=monitoring_data_point,
208-
ikey=self._instrumentation_key,
226+
ikey=self._instrumentation_key, # type: ignore
209227
transmission_time=_ticks_since_dot_net_epoch(),
210228
machine_name=monitoring_data_point.machine_name,
211229
instance_name=monitoring_data_point.instance,
212230
stream_id=monitoring_data_point.stream_id,
213231
role_name=monitoring_data_point.role_name,
214-
invariant_version=monitoring_data_point.invariant_version,
232+
invariant_version=monitoring_data_point.invariant_version, # type: ignore
233+
configuration_etag=etag,
215234
cls=_Response,
216235
)
217236
return ping_response # type: ignore
218-
except HttpResponseError:
237+
except Exception: # pylint: disable=broad-except,invalid-name
219238
_logger.exception("Exception occurred while pinging live metrics.")
220239
detach(token)
221240
return ping_response
@@ -243,28 +262,42 @@ def __init__(
243262
)
244263
self._worker.start()
245264

265+
# pylint: disable=protected-access
266+
# pylint: disable=too-many-nested-blocks
246267
def _ticker(self) -> None:
247268
if _is_ping_state():
248269
# Send a ping if elapsed number of request meets the threshold
249270
if self._elapsed_num_seconds % _get_global_quickpulse_state().value == 0:
250-
ping_response = self._exporter._ping( # pylint: disable=protected-access
271+
ping_response = self._exporter._ping(
251272
self._base_monitoring_data_point,
252273
)
253274
if ping_response:
254-
header = ping_response._response_headers.get( # pylint: disable=protected-access
255-
_QUICKPULSE_SUBSCRIBED_HEADER_NAME
256-
)
257-
if header and header == "true":
258-
# Switch state to post if subscribed
259-
_set_global_quickpulse_state(_QuickpulseState.POST_SHORT)
260-
self._elapsed_num_seconds = 0
261-
else:
262-
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
263-
if (
264-
_get_global_quickpulse_state() is _QuickpulseState.PING_SHORT
265-
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
266-
):
267-
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
275+
try:
276+
subscribed = ping_response._response_headers.get(_QUICKPULSE_SUBSCRIBED_HEADER_NAME)
277+
if subscribed and subscribed == "true":
278+
# Switch state to post if subscribed
279+
_set_global_quickpulse_state(_QuickpulseState.POST_SHORT)
280+
self._elapsed_num_seconds = 0
281+
# Update config etag
282+
etag = ping_response._response_headers.get(_QUICKPULSE_ETAG_HEADER_NAME)
283+
if etag is None:
284+
etag = ""
285+
if _get_quickpulse_etag() != etag:
286+
_set_quickpulse_etag(etag)
287+
# TODO: Set default document filter config from response body
288+
# config = ping_response._pipeline_response.http_response.content
289+
else:
290+
# Backoff after _LONG_PING_INTERVAL_SECONDS (60s) of no successful requests
291+
if (
292+
_get_global_quickpulse_state() is _QuickpulseState.PING_SHORT
293+
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
294+
):
295+
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
296+
# Reset etag to default if not subscribed
297+
_set_quickpulse_etag("")
298+
except Exception: # pylint: disable=broad-except,invalid-name
299+
_logger.exception("Exception occurred while pinging live metrics.")
300+
_set_quickpulse_etag("")
268301
# TODO: Implement redirect
269302
else:
270303
# Erroneous ping responses instigate backoff logic
@@ -274,6 +307,8 @@ def _ticker(self) -> None:
274307
and self._elapsed_num_seconds >= _LONG_PING_INTERVAL_SECONDS
275308
):
276309
_set_global_quickpulse_state(_QuickpulseState.PING_LONG)
310+
# Reset etag to default if error
311+
_set_quickpulse_etag("")
277312
else:
278313
try:
279314
self.collect()
@@ -283,6 +318,8 @@ def _ticker(self) -> None:
283318
# And resume pinging
284319
if self._elapsed_num_seconds >= _POST_CANCEL_INTERVAL_SECONDS:
285320
_set_global_quickpulse_state(_QuickpulseState.PING_SHORT)
321+
# Reset etag to default
322+
_set_quickpulse_etag("")
286323
self._elapsed_num_seconds = 0
287324

288325
self._elapsed_num_seconds += 1

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_live_metrics.py

+57-29
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from datetime import datetime
55
from typing import Any, Iterable
66

7+
import logging
78
import platform
89
import psutil
910

@@ -39,6 +40,7 @@
3940
_QuickpulseState,
4041
_is_post_state,
4142
_append_quickpulse_document,
43+
_get_quickpulse_derived_metric_infos,
4244
_get_quickpulse_last_process_cpu,
4345
_get_quickpulse_last_process_time,
4446
_get_quickpulse_process_elapsed_time,
@@ -47,7 +49,9 @@
4749
_set_quickpulse_last_process_time,
4850
_set_quickpulse_process_elapsed_time,
4951
)
52+
from azure.monitor.opentelemetry.exporter._quickpulse._types import _TelemetryData
5053
from azure.monitor.opentelemetry.exporter._quickpulse._utils import (
54+
_derive_metrics_from_telemetry_data,
5155
_get_log_record_document,
5256
_get_span_document,
5357
)
@@ -61,6 +65,8 @@
6165
Singleton,
6266
)
6367

68+
_logger = logging.getLogger(__name__)
69+
6470

6571
PROCESS = psutil.Process()
6672
NUM_CPUS = psutil.cpu_count()
@@ -93,7 +99,8 @@ def __init__(self, **kwargs: Any) -> None:
9399
id_generator = RandomIdGenerator()
94100
self._base_monitoring_data_point = MonitoringDataPoint(
95101
version=_get_sdk_version(),
96-
invariant_version=1,
102+
# Invariant version 5 indicates filtering is supported
103+
invariant_version=5,
97104
instance=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE_INSTANCE, ""),
98105
role_name=part_a_fields.get(ContextTagKeys.AI_CLOUD_ROLE, ""),
99106
machine_name=platform.node(),
@@ -152,39 +159,60 @@ def __init__(self, **kwargs: Any) -> None:
152159
def _record_span(self, span: ReadableSpan) -> None:
153160
# Only record if in post state
154161
if _is_post_state():
155-
document = _get_span_document(span)
156-
_append_quickpulse_document(document)
157-
duration_ms = 0
158-
if span.end_time and span.start_time:
159-
duration_ms = (span.end_time - span.start_time) / 1e9 # type: ignore
160-
# TODO: Spec out what "success" is
161-
success = span.status.is_ok
162-
163-
if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER):
164-
if success:
165-
self._request_rate_counter.add(1)
166-
else:
167-
self._request_failed_rate_counter.add(1)
168-
self._request_duration.record(duration_ms)
169-
else:
170-
if success:
171-
self._dependency_rate_counter.add(1)
162+
try:
163+
document = _get_span_document(span)
164+
_append_quickpulse_document(document)
165+
duration_ms = 0
166+
if span.end_time and span.start_time:
167+
duration_ms = (span.end_time - span.start_time) / 1e9 # type: ignore
168+
# TODO: Spec out what "success" is
169+
success = span.status.is_ok
170+
171+
if span.kind in (SpanKind.SERVER, SpanKind.CONSUMER):
172+
if success:
173+
self._request_rate_counter.add(1)
174+
else:
175+
self._request_failed_rate_counter.add(1)
176+
self._request_duration.record(duration_ms)
172177
else:
173-
self._dependency_failure_rate_counter.add(1)
174-
self._dependency_duration.record(duration_ms)
178+
if success:
179+
self._dependency_rate_counter.add(1)
180+
else:
181+
self._dependency_failure_rate_counter.add(1)
182+
self._dependency_duration.record(duration_ms)
183+
184+
metric_infos_dict = _get_quickpulse_derived_metric_infos()
185+
# check if filtering is enabled
186+
if metric_infos_dict:
187+
# Derive metrics for quickpulse filtering
188+
data = _TelemetryData._from_span(span)
189+
_derive_metrics_from_telemetry_data(data)
190+
# TODO: derive exception metrics from span events
191+
except Exception: # pylint: disable=broad-except
192+
_logger.exception("Exception occurred while recording span.")
175193

176194
def _record_log_record(self, log_data: LogData) -> None:
177195
# Only record if in post state
178196
if _is_post_state():
179-
if log_data.log_record:
180-
log_record = log_data.log_record
181-
if log_record.attributes:
182-
document = _get_log_record_document(log_data)
183-
_append_quickpulse_document(document)
184-
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
185-
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
186-
if exc_type is not None or exc_message is not None:
187-
self._exception_rate_counter.add(1)
197+
try:
198+
if log_data.log_record:
199+
log_record = log_data.log_record
200+
if log_record.attributes:
201+
document = _get_log_record_document(log_data)
202+
_append_quickpulse_document(document)
203+
exc_type = log_record.attributes.get(SpanAttributes.EXCEPTION_TYPE)
204+
exc_message = log_record.attributes.get(SpanAttributes.EXCEPTION_MESSAGE)
205+
if exc_type is not None or exc_message is not None:
206+
self._exception_rate_counter.add(1)
207+
208+
metric_infos_dict = _get_quickpulse_derived_metric_infos()
209+
# check if filtering is enabled
210+
if metric_infos_dict:
211+
# Derive metrics for quickpulse filtering
212+
data = _TelemetryData._from_log_record(log_record)
213+
_derive_metrics_from_telemetry_data(data)
214+
except Exception: # pylint: disable=broad-except
215+
_logger.exception("Exception occurred while recording log record.")
188216

189217

190218
# pylint: disable=unused-argument

sdk/monitor/azure-monitor-opentelemetry-exporter/azure/monitor/opentelemetry/exporter/_quickpulse/_processor.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
# pylint: disable=protected-access
1111
class _QuickpulseLogRecordProcessor(LogRecordProcessor):
1212

13-
def emit(self, log_data: LogData) -> None:
13+
def emit(self, log_data: LogData) -> None: # type: ignore
1414
qpm = _QuickpulseManager._instance
1515
if qpm:
1616
qpm._record_log_record(log_data)

0 commit comments

Comments
 (0)