Commit ed2db58

sanderegg and mrnicegyu11 authored and committed
🐛⚗️Prometheus instrumentation incorrectly setup (ITISFoundation#6398)
1 parent b44eb75 · commit ed2db58

File tree: 10 files changed (+126, -76 lines)

packages/service-library/src/servicelib/fastapi/prometheus_instrumentation.py

Lines changed: 15 additions & 11 deletions
@@ -2,23 +2,27 @@
 
 
 from fastapi import FastAPI
+from prometheus_client import CollectorRegistry
 from prometheus_fastapi_instrumentator import Instrumentator
 
 
 def setup_prometheus_instrumentation(app: FastAPI) -> Instrumentator:
+    # NOTE: use that registry to prevent having a global one
+    app.state.prometheus_registry = registry = CollectorRegistry(auto_describe=True)
+    instrumentator = Instrumentator(
+        should_instrument_requests_inprogress=False,  # bug in https://github.com/trallnag/prometheus-fastapi-instrumentator/issues/317
+        inprogress_labels=False,
+        registry=registry,
+    ).instrument(app)
 
-    instrumentator = (
-        Instrumentator(
-            should_instrument_requests_inprogress=True, inprogress_labels=False
-        )
-        .instrument(app)
-        .expose(app, include_in_schema=False)
-    )
+    async def _on_startup() -> None:
+        instrumentator.expose(app, include_in_schema=False)
 
-    def _unregister():
-        for collector in list(instrumentator.registry._collector_to_names.keys()):
-            instrumentator.registry.unregister(collector)
+    def _unregister() -> None:
+        # NOTE: avoid registering collectors multiple times when running unittests consecutively (https://stackoverflow.com/a/62489287)
+        for collector in list(registry._collector_to_names.keys()):  # noqa: SLF001
+            registry.unregister(collector)
 
-    # avoid registering collectors multiple times when running unittests consecutively (https://stackoverflow.com/a/62489287)
+    app.add_event_handler("startup", _on_startup)
     app.add_event_handler("shutdown", _unregister)
     return instrumentator
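
For context, a minimal sketch of what this buys in tests (assuming the service-library package and FastAPI's TestClient are importable): because each app now owns its registry and unregisters its collectors on shutdown, building the app twice in a row no longer trips prometheus_client's duplicated-timeseries check.

from fastapi import FastAPI
from fastapi.testclient import TestClient
from servicelib.fastapi.prometheus_instrumentation import setup_prometheus_instrumentation


def create_app() -> FastAPI:
    app = FastAPI()
    setup_prometheus_instrumentation(app)
    return app


# consecutive app lifecycles, as happens across unit tests
for _ in range(2):
    with TestClient(create_app()) as client:  # the context manager runs startup/shutdown
        # /metrics is exposed by the startup handler added in this commit
        assert client.get("/metrics").status_code == 200
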
Lines changed: 9 additions & 16 deletions
@@ -1,16 +1,15 @@
 """ Adds fastapi middleware for tracing using opentelemetry instrumentation.
 
 """
+
 import logging
 
 from fastapi import FastAPI
 from opentelemetry import trace
 from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
     OTLPSpanExporter as OTLPSpanExporterHTTP,
 )
-from opentelemetry.instrumentation.fastapi import (
-    FastAPIInstrumentor,  # pylint: disable=no-name-in-module
-)
+from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
 from opentelemetry.sdk.resources import Resource
 from opentelemetry.sdk.trace import TracerProvider
 from opentelemetry.sdk.trace.export import BatchSpanProcessor
@@ -21,24 +20,19 @@
 
 def setup_tracing(
     app: FastAPI, tracing_settings: TracingSettings, service_name: str
-) -> FastAPIInstrumentor | None:
+) -> None:
     if (
         not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT
         and not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT
     ):
         log.warning("Skipping opentelemetry tracing setup")
-        return None
-    if (
-        not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT
-        or not tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT
-    ):
-        raise RuntimeError(
-            f"Variable opentelemetry_collector_endpoint [{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}] or opentelemetry_collector_port [{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT}] unset. Tracing options incomplete."
-        )
+        return
+
     # Set up the tracer provider
     resource = Resource(attributes={"service.name": service_name})
     trace.set_tracer_provider(TracerProvider(resource=resource))
-    tracer_provider = trace.get_tracer_provider()
+    global_tracer_provider = trace.get_tracer_provider()
+    assert isinstance(global_tracer_provider, TracerProvider)  # nosec
     tracing_destination: str = f"{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_ENDPOINT}:{tracing_settings.TRACING_OPENTELEMETRY_COLLECTOR_PORT}/v1/traces"
     log.info(
         "Trying to connect service %s to tracing collector at %s.",
@@ -48,7 +42,6 @@ def setup_tracing(
     # Configure OTLP exporter to send spans to the collector
     otlp_exporter = OTLPSpanExporterHTTP(endpoint=tracing_destination)
    span_processor = BatchSpanProcessor(otlp_exporter)
-    # Mypy bug --> https://github.com/open-telemetry/opentelemetry-python/issues/3713
-    tracer_provider.add_span_processor(span_processor)  # type: ignore[attr-defined]
+    global_tracer_provider.add_span_processor(span_processor)
     # Instrument FastAPI
-    return FastAPIInstrumentor().instrument_app(app)  # type: ignore[no-any-return]
+    FastAPIInstrumentor().instrument_app(app)
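
As a quick way to try the same wiring outside the platform, the sketch below swaps the OTLP exporter for opentelemetry's ConsoleSpanExporter so spans are printed locally; the service name and the bare FastAPI app are placeholders, not part of this commit.

from fastapi import FastAPI
from opentelemetry import trace
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter

app = FastAPI()

# same provider/processor wiring as setup_tracing, with a console exporter for local inspection
trace.set_tracer_provider(
    TracerProvider(resource=Resource(attributes={"service.name": "example-service"}))
)
provider = trace.get_tracer_provider()
assert isinstance(provider, TracerProvider)  # narrows the type, as in the diff above
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))

FastAPIInstrumentor().instrument_app(app)  # every handled request now emits a span
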
Lines changed: 11 additions & 0 deletions
@@ -1,2 +1,13 @@
+from dataclasses import dataclass
+
+from prometheus_client import CollectorRegistry
+
+
+@dataclass(slots=True, kw_only=True)
+class MetricsBase:
+    subsystem: str
+    registry: CollectorRegistry
+
+
 def get_metrics_namespace(application_name: str) -> str:
     return application_name.replace("-", "_")
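
A hedged sketch of how a service-side metrics class might build on this base; the ExampleMetrics class and its counter are illustrative, not part of the commit.

from dataclasses import dataclass, field

from prometheus_client import CollectorRegistry, Counter

from servicelib.instrumentation import MetricsBase


@dataclass(slots=True, kw_only=True)
class ExampleMetrics(MetricsBase):
    jobs_processed: Counter = field(init=False)

    def __post_init__(self) -> None:
        self.jobs_processed = Counter(
            "jobs_processed_total",
            "Number of jobs processed",
            subsystem=self.subsystem,
            registry=self.registry,  # bound to the app registry, not the global one
        )


metrics = ExampleMetrics(subsystem="example", registry=CollectorRegistry())
metrics.jobs_processed.inc()
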

services/agent/tests/unit/test_core_routes.py

Lines changed: 4 additions & 4 deletions
@@ -29,15 +29,15 @@ def test_client(initialized_app: FastAPI) -> TestClient:
 def test_health_ok(env: None, test_client: TestClient):
     response = test_client.get("/health")
     assert response.status_code == status.HTTP_200_OK
-    assert response.json() == None
+    assert response.json() is None
 
 
 def test_health_fails_not_started(
     env: None, initialized_app: FastAPI, test_client: TestClient
 ):
     task_monitor: TaskMonitor = initialized_app.state.task_monitor
     # emulate monitor not being started
-    task_monitor._was_started = False
+    task_monitor._was_started = False  # noqa: SLF001
 
     response = test_client.get("/health")
     assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
@@ -50,8 +50,8 @@ def test_health_fails_hanging_tasks(
     task_monitor: TaskMonitor = initialized_app.state.task_monitor
 
     # emulate tasks hanging
-    for task_data in task_monitor._to_start.values():
-        task_data._start_time = time() - 1e6
+    for task_data in task_monitor._to_start.values():  # noqa: SLF001
+        task_data._start_time = time() - 1e6  # noqa: SLF001
 
     response = test_client.get("/health")
     assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE

services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_core.py

Lines changed: 4 additions & 2 deletions
@@ -22,8 +22,10 @@ async def on_startup() -> None:
         metrics_subsystem = (
             "dynamic" if app_settings.AUTOSCALING_NODES_MONITORING else "computational"
         )
-        app.state.instrumentation = AutoscalingInstrumentation(
-            registry=instrumentator.registry, subsystem=metrics_subsystem
+        app.state.instrumentation = (
+            AutoscalingInstrumentation(  # pylint: disable=unexpected-keyword-arg
+                registry=instrumentator.registry, subsystem=metrics_subsystem
+            )
         )
 
     async def on_shutdown() -> None:

services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_models.py

Lines changed: 31 additions & 11 deletions
@@ -2,6 +2,7 @@
 from typing import Final
 
 from prometheus_client import CollectorRegistry, Counter, Histogram
+from servicelib.instrumentation import MetricsBase
 
 from ...models import BufferPoolManager, Cluster
 from ._constants import (
@@ -13,11 +14,6 @@
 from ._utils import TrackedGauge, create_gauge
 
 
-@dataclass(slots=True, kw_only=True)
-class MetricsBase:
-    subsystem: str
-
-
 @dataclass(slots=True, kw_only=True)
 class ClusterMetrics(MetricsBase):  # pylint: disable=too-many-instance-attributes
     active_nodes: TrackedGauge = field(init=False)
@@ -36,7 +32,12 @@ def __post_init__(self) -> None:
         cluster_subsystem = f"{self.subsystem}_cluster"
         # Creating and assigning gauges using the field names and the metric definitions
         for field_name, definition in CLUSTER_METRICS_DEFINITIONS.items():
-            gauge = create_gauge(field_name, definition, cluster_subsystem)
+            gauge = create_gauge(
+                field_name=field_name,
+                definition=definition,
+                subsystem=cluster_subsystem,
+                registry=self.registry,
+            )
             setattr(self, field_name, gauge)
 
     def update_from_cluster(self, cluster: Cluster) -> None:
@@ -65,27 +66,31 @@ def __post_init__(self) -> None:
             labelnames=EC2_INSTANCE_LABELS,
             namespace=METRICS_NAMESPACE,
             subsystem=self.subsystem,
+            registry=self.registry,
         )
         self.started_instances = Counter(
             "started_instances_total",
             "Number of EC2 instances that were started",
             labelnames=EC2_INSTANCE_LABELS,
             namespace=METRICS_NAMESPACE,
             subsystem=self.subsystem,
+            registry=self.registry,
         )
         self.stopped_instances = Counter(
             "stopped_instances_total",
             "Number of EC2 instances that were stopped",
             labelnames=EC2_INSTANCE_LABELS,
             namespace=METRICS_NAMESPACE,
             subsystem=self.subsystem,
+            registry=self.registry,
         )
         self.terminated_instances = Counter(
             "terminated_instances_total",
             "Number of EC2 instances that were terminated",
             labelnames=EC2_INSTANCE_LABELS,
             namespace=METRICS_NAMESPACE,
             subsystem=self.subsystem,
+            registry=self.registry,
         )
 
     def instance_started(self, instance_type: str) -> None:
@@ -123,7 +128,12 @@ def __post_init__(self) -> None:
             setattr(
                 self,
                 field_name,
-                create_gauge(field_name, definition, buffer_pools_subsystem),
+                create_gauge(
+                    field_name=field_name,
+                    definition=definition,
+                    subsystem=buffer_pools_subsystem,
+                    registry=self.registry,
+                ),
             )
         self.instances_ready_to_pull_seconds = Histogram(
             "instances_ready_to_pull_duration_seconds",
@@ -132,6 +142,7 @@ def __post_init__(self) -> None:
             namespace=METRICS_NAMESPACE,
             subsystem=buffer_pools_subsystem,
             buckets=(10, 20, 30, 40, 50, 60, 120),
+            registry=self.registry,
         )
         self.instances_completed_pulling_seconds = Histogram(
             "instances_completed_pulling_duration_seconds",
@@ -150,6 +161,7 @@ def __post_init__(self) -> None:
                 30 * _MINUTE,
                 40 * _MINUTE,
             ),
+            registry=self.registry,
         )
 
     def update_from_buffer_pool_manager(
@@ -174,8 +186,16 @@ class AutoscalingInstrumentation(MetricsBase):
     buffer_machines_pools_metrics: BufferPoolsMetrics = field(init=False)
 
     def __post_init__(self) -> None:
-        self.cluster_metrics = ClusterMetrics(subsystem=self.subsystem)
-        self.ec2_client_metrics = EC2ClientMetrics(subsystem=self.subsystem)
-        self.buffer_machines_pools_metrics = BufferPoolsMetrics(
-            subsystem=self.subsystem
+        self.cluster_metrics = ClusterMetrics(  # pylint: disable=unexpected-keyword-arg
+            subsystem=self.subsystem, registry=self.registry
+        )
+        self.ec2_client_metrics = (
+            EC2ClientMetrics(  # pylint: disable=unexpected-keyword-arg
+                subsystem=self.subsystem, registry=self.registry
+            )
+        )
+        self.buffer_machines_pools_metrics = (
+            BufferPoolsMetrics(  # pylint: disable=unexpected-keyword-arg
+                subsystem=self.subsystem, registry=self.registry
+            )
         )
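
The net effect of the registry= arguments is that none of these metrics land in prometheus_client's process-wide REGISTRY. A small sketch of the behaviour (metric and label names below are illustrative, not the module's constants):

from prometheus_client import REGISTRY, CollectorRegistry, Counter

registry = CollectorRegistry(auto_describe=True)
started = Counter(
    "started_instances_total",
    "Number of EC2 instances that were started",
    labelnames=("instance_type",),
    namespace="autoscaling",
    subsystem="computational",
    registry=registry,
)
started.labels(instance_type="t2.micro").inc()

sample = "autoscaling_computational_started_instances_total"
assert registry.get_sample_value(sample, {"instance_type": "t2.micro"}) == 1.0
assert REGISTRY.get_sample_value(sample, {"instance_type": "t2.micro"}) is None  # not registered globally
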

services/autoscaling/src/simcore_service_autoscaling/modules/instrumentation/_utils.py

Lines changed: 4 additions & 1 deletion
@@ -3,7 +3,7 @@
 from dataclasses import dataclass, field
 
 from aws_library.ec2._models import EC2InstanceData
-from prometheus_client import Gauge
+from prometheus_client import CollectorRegistry, Gauge
 
 from ._constants import METRICS_NAMESPACE
 
@@ -27,9 +27,11 @@ def update_from_instances(self, instances: Iterable[EC2InstanceData]) -> None:
 
 
 def create_gauge(
+    *,
     field_name: str,
     definition: tuple[str, tuple[str, ...]],
     subsystem: str,
+    registry: CollectorRegistry,
 ) -> TrackedGauge:
     description, labelnames = definition
     return TrackedGauge(
@@ -39,5 +41,6 @@ def create_gauge(
             labelnames=labelnames,
             namespace=METRICS_NAMESPACE,
             subsystem=subsystem,
+            registry=registry,
         )
     )

services/autoscaling/tests/unit/conftest.py

Lines changed: 7 additions & 1 deletion
@@ -378,11 +378,17 @@ def enabled_rabbitmq(
     return rabbit_service
 
 
+_LIFESPAN_TIMEOUT: Final[int] = 10
+
+
 @pytest.fixture
 async def initialized_app(app_environment: EnvVarsDict) -> AsyncIterator[FastAPI]:
     settings = ApplicationSettings.create_from_envs()
     app = create_app(settings)
-    async with LifespanManager(app):
+    # NOTE: the timeout is sometime too small for CI machines, and even larger machines
+    async with LifespanManager(
+        app, startup_timeout=_LIFESPAN_TIMEOUT, shutdown_timeout=_LIFESPAN_TIMEOUT
+    ):
         yield app

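A self-contained version of the fixture pattern, assuming asgi-lifespan and an async-capable pytest plugin such as pytest-asyncio; the inline FastAPI app stands in for create_app(settings).

from collections.abc import AsyncIterator
from typing import Final

import pytest
from asgi_lifespan import LifespanManager
from fastapi import FastAPI

_LIFESPAN_TIMEOUT: Final[int] = 10


@pytest.fixture
async def initialized_app() -> AsyncIterator[FastAPI]:
    app = FastAPI()  # placeholder for create_app(settings)
    # generous startup/shutdown timeouts so slower CI machines do not abort the lifespan
    async with LifespanManager(
        app, startup_timeout=_LIFESPAN_TIMEOUT, shutdown_timeout=_LIFESPAN_TIMEOUT
    ):
        yield app
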
services/autoscaling/tests/unit/test_modules_instrumentation_utils.py

Lines changed: 4 additions & 1 deletion
@@ -2,6 +2,7 @@
 from typing import TypedDict
 
 from aws_library.ec2._models import EC2InstanceData
+from prometheus_client import CollectorRegistry
 from prometheus_client.metrics import MetricWrapperBase
 from simcore_service_autoscaling.modules.instrumentation._constants import (
     EC2_INSTANCE_LABELS,
@@ -40,10 +41,12 @@ def test_update_gauge_sets_old_entries_to_0(
     fake_ec2_instance_data: Callable[..., EC2InstanceData]
 ):
     # Create a Gauge with example labels
+    registry = CollectorRegistry()
     tracked_gauge = create_gauge(
-        "example_gauge",
+        field_name="example_gauge",
         definition=("An example gauge", EC2_INSTANCE_LABELS),
         subsystem="whatever",
+        registry=registry,
    )
 
     ec2_instance_type_1 = fake_ec2_instance_data()
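
Because the test now builds its gauge in a throwaway CollectorRegistry, its samples can be inspected directly without touching global state. A sketch with a plain prometheus_client Gauge (the project's TrackedGauge wrapper is not reproduced here):

from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()
gauge = Gauge(
    "example_gauge",
    "An example gauge",
    labelnames=("instance_type",),
    registry=registry,
)
gauge.labels(instance_type="t2.micro").set(1)
gauge.labels(instance_type="t2.large").set(0)

# registry.collect() exposes every sample, so a test can assert which label
# combinations exist and that stale ones were reset to 0
samples = {
    sample.labels["instance_type"]: sample.value
    for metric in registry.collect()
    for sample in metric.samples
}
assert samples == {"t2.micro": 1.0, "t2.large": 0.0}
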
