Skip to content

Commit 8b57f1c

Browse files
✨ Is994/resource tracking service auto populate missing data (#4420)
1 parent bfe4d86 commit 8b57f1c

19 files changed

+329
-70
lines changed

services/docker-compose.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,8 @@ services:
198198
- PROMETHEUS_USERNAME=${RESOURCE_USAGE_TRACKER_PROMETHEUS_USERNAME}
199199
- PROMETHEUS_PASSWORD=${RESOURCE_USAGE_TRACKER_PROMETHEUS_PASSWORD}
200200
- RESOURCE_USAGE_TRACKER_LOGLEVEL=${LOG_LEVEL:-INFO}
201+
- REDIS_HOST=${REDIS_HOST}
202+
- REDIS_PORT=${REDIS_PORT}
201203

202204
static-webserver:
203205
image: ${DOCKER_REGISTRY:-itisfoundation}/static-webserver:${DOCKER_IMAGE_TAG:-latest}

services/resource-usage-tracker/requirements/_test.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ coverage
1616
docker
1717
faker
1818
httpx
19+
fakeredis[lua]
1920
pytest
2021
pytest-asyncio
2122
pytest-cov

services/resource-usage-tracker/requirements/_test.txt

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ anyio==3.6.2
1414
# httpcore
1515
asgi-lifespan==2.1.0
1616
# via -r _test.in
17+
async-timeout==4.0.2
18+
# via
19+
# -c _base.txt
20+
# redis
21+
asyncpg==0.27.0
22+
# via
23+
# -c _base.txt
24+
# sqlalchemy
1725
certifi==2023.5.7
1826
# via
1927
# -c _base.txt
@@ -34,6 +42,8 @@ exceptiongroup==1.1.1
3442
# via pytest
3543
faker==18.10.1
3644
# via -r _test.in
45+
fakeredis[lua]==2.15.0
46+
# via -r _test.in
3747
greenlet==2.0.2
3848
# via
3949
# -c _base.txt
@@ -60,6 +70,8 @@ idna==3.4
6070
# requests
6171
iniconfig==2.0.0
6272
# via pytest
73+
lupa==1.14.1
74+
# via fakeredis
6375
mako==1.2.4
6476
# via
6577
# -c ../../../requirements/constraints.txt
@@ -110,6 +122,11 @@ python-dotenv==1.0.0
110122
# via
111123
# -c _base.txt
112124
# -r _test.in
125+
redis==4.5.5
126+
# via
127+
# -c ../../../requirements/constraints.txt
128+
# -c _base.txt
129+
# fakeredis
113130
requests==2.30.0
114131
# via
115132
# -c _base.txt
@@ -131,7 +148,9 @@ sniffio==1.3.0
131148
# asgi-lifespan
132149
# httpcore
133150
# httpx
134-
sqlalchemy[asyncio,mypy,postgresql_psycopg2binary]==1.4.48
151+
sortedcontainers==2.4.0
152+
# via fakeredis
153+
sqlalchemy[mypy,postgresql_asyncpg,postgresql_psycopg2binary]==1.4.48
135154
# via
136155
# -c ../../../requirements/constraints.txt
137156
# -c _base.txt

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/core/application.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from ..api.routes import setup_api_routes
1515
from ..modules.db import setup as setup_db
1616
from ..modules.prometheus import setup as setup_prometheus_api_client
17+
from ..modules.redis import setup as setup_redis
1718
from ..resource_tracker import setup as setup_background_task
1819
from .settings import ApplicationSettings
1920

@@ -46,6 +47,7 @@ def create_app(settings: ApplicationSettings) -> FastAPI:
4647
setup_prometheus_api_client(app)
4748
if settings.RESOURCE_USAGE_TRACKER_POSTGRES:
4849
setup_db(app)
50+
setup_redis(app)
4951

5052
setup_background_task(app)
5153

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/core/settings.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from settings_library.basic_types import BuildTargetEnum, LogLevel, VersionTag
99
from settings_library.postgres import PostgresSettings
1010
from settings_library.prometheus import PrometheusSettings
11+
from settings_library.redis import RedisSettings
1112
from settings_library.utils_logging import MixinLoggingSettings
1213

1314
from .._meta import API_VERSION, API_VTAG, PROJECT_NAME
@@ -88,6 +89,8 @@ class MinimalApplicationSettings(_BaseApplicationSettings):
8889
auto_default_from_env=True
8990
)
9091

92+
RESOURCE_USAGE_TRACKER_REDIS: RedisSettings = Field(auto_default_from_env=True)
93+
9194

9295
class ApplicationSettings(MinimalApplicationSettings):
9396
"""Web app's environment variables

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/modules/db/repositories/resource_tracker.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import logging
2+
from datetime import datetime
23

4+
import sqlalchemy as sa
35
from simcore_postgres_database.models.resource_tracker import resource_tracker_container
4-
from sqlalchemy import func
56
from sqlalchemy.dialects.postgresql import insert as pg_insert
67

78
from ....models.resource_tracker_container import ContainerResourceUsage
@@ -11,7 +12,16 @@
1112

1213

1314
class ResourceTrackerRepository(BaseRepository):
14-
async def upsert_resource_tracker_container_data_(
15+
async def get_prometheus_last_scraped_timestamp(self) -> datetime | None:
16+
async with self.db_engine.begin() as conn:
17+
max_last_scraped_timestamp: datetime | None = await conn.scalar(
18+
sa.select(
19+
sa.func.max(resource_tracker_container.c.prometheus_last_scraped)
20+
)
21+
)
22+
return max_last_scraped_timestamp
23+
24+
async def upsert_resource_tracker_container_data(
1525
self, data: ContainerResourceUsage
1626
) -> None:
1727
async with self.db_engine.begin() as conn:
@@ -27,27 +37,27 @@ async def upsert_resource_tracker_container_data_(
2737
container_cpu_usage_seconds_total=data.container_cpu_usage_seconds_total,
2838
prometheus_created=data.prometheus_created.datetime,
2939
prometheus_last_scraped=data.prometheus_last_scraped.datetime,
30-
modified=func.now(),
40+
modified=sa.func.now(),
3141
)
3242

3343
on_update_stmt = insert_stmt.on_conflict_do_update(
3444
index_elements=[
3545
resource_tracker_container.c.container_id,
3646
],
3747
set_={
38-
"container_cpu_usage_seconds_total": func.greatest(
48+
"container_cpu_usage_seconds_total": sa.func.greatest(
3949
resource_tracker_container.c.container_cpu_usage_seconds_total,
4050
insert_stmt.excluded.container_cpu_usage_seconds_total,
4151
),
42-
"prometheus_created": func.least(
52+
"prometheus_created": sa.func.least(
4353
resource_tracker_container.c.prometheus_created,
4454
insert_stmt.excluded.prometheus_created,
4555
),
46-
"prometheus_last_scraped": func.greatest(
56+
"prometheus_last_scraped": sa.func.greatest(
4757
resource_tracker_container.c.prometheus_last_scraped,
4858
insert_stmt.excluded.prometheus_last_scraped,
4959
),
50-
"modified": func.now(),
60+
"modified": sa.func.now(),
5161
},
5262
)
5363

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import logging
2+
from typing import cast
3+
4+
from fastapi import FastAPI
5+
from servicelib.redis import RedisClientSDK
6+
from settings_library.redis import RedisDatabase, RedisSettings
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
def setup(app: FastAPI) -> None:
12+
async def on_startup() -> None:
13+
app.state.redis_client_sdk = None
14+
settings: RedisSettings = app.state.settings.RESOURCE_USAGE_TRACKER_REDIS
15+
redis_locks_dsn = settings.build_redis_dsn(RedisDatabase.LOCKS)
16+
app.state.redis_client_sdk = client = RedisClientSDK(redis_locks_dsn)
17+
await client.setup()
18+
19+
async def on_shutdown() -> None:
20+
redis_client_sdk: None | RedisClientSDK = app.state.redis_client_sdk
21+
if redis_client_sdk:
22+
await redis_client_sdk.shutdown()
23+
24+
app.add_event_handler("startup", on_startup)
25+
app.add_event_handler("shutdown", on_shutdown)
26+
27+
28+
def get_redis_client(app: FastAPI) -> RedisClientSDK:
29+
return cast(RedisClientSDK, app.state.redis_client_sdk)

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33

44
from fastapi import FastAPI
55
from servicelib.background_task import start_periodic_task, stop_periodic_task
6+
from servicelib.redis_utils import exclusive
67
from settings_library.prometheus import PrometheusSettings
78

89
from .core.settings import ApplicationSettings
10+
from .modules.redis import get_redis_client
911
from .resource_tracker_core import collect_container_resource_usage_task
1012

1113
_TASK_NAME = "periodic_prometheus_polling"
@@ -23,8 +25,12 @@ async def _startup() -> None:
2325
if not settings:
2426
_logger.warning("Prometheus API client is de-activated in the settings")
2527
return
28+
lock_key = f"{app.title}:collect_container_resource_usage_task_lock"
29+
lock_value = "locked"
2630
app.state.resource_tracker_task = start_periodic_task(
27-
collect_container_resource_usage_task,
31+
exclusive(get_redis_client(app), lock_key=lock_key, lock_value=lock_value)(
32+
collect_container_resource_usage_task
33+
),
2834
interval=app_settings.RESOURCE_USAGE_TRACKER_EVALUATION_INTERVAL_SEC,
2935
task_name=_TASK_NAME,
3036
app=app,

services/resource-usage-tracker/src/simcore_service_resource_usage_tracker/resource_tracker_core.py

Lines changed: 103 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import asyncio
22
import json
33
import logging
4+
from datetime import datetime, timedelta, timezone
45
from typing import Any
56

67
import arrow
78
from fastapi import FastAPI
89
from prometheus_api_client import PrometheusConnect
10+
from pydantic import BaseModel
911
from simcore_service_resource_usage_tracker.modules.prometheus import (
1012
get_prometheus_api_client,
1113
)
@@ -16,30 +18,58 @@
1618
_logger = logging.getLogger(__name__)
1719

1820

21+
_PAST_X_MINUTES = 30 # in promql query the first part: [30m:1m]
22+
_RESOLUTION_MINUTES = 1 # in promql query the second part: [30m:1m]
23+
_PROME_QUERY_PARAMS_TIMEDELTA_MINUTES = (
24+
25 # should be less than _PAST_X_MINUTES (so there is small overlap)
25+
)
26+
27+
28+
class _PromQueryParameters(BaseModel):
29+
image_regex: str
30+
scrape_timestamp: datetime
31+
32+
class Config:
33+
validation = False
34+
35+
1936
def _prometheus_sync_client_custom_query(
20-
prometheus_client: PrometheusConnect, promql_cpu_query: str
37+
prometheus_client: PrometheusConnect,
38+
promql_cpu_query: str,
39+
scrape_timestamp: datetime,
2140
) -> list[dict]:
22-
_logger.info("Querying prometheus with: %s", promql_cpu_query)
23-
data: list[dict] = prometheus_client.custom_query(promql_cpu_query)
41+
rfc3339: str = scrape_timestamp.isoformat("T")
42+
_logger.info("Querying prometheus at <%s> with: <%s>", rfc3339, promql_cpu_query)
43+
data: list[dict] = prometheus_client.custom_query(
44+
promql_cpu_query, params={"time": rfc3339}
45+
)
2446
return data
2547

2648

27-
async def _scrape_and_upload_container_resource_usage(
49+
async def _scrape_container_resource_usage(
2850
prometheus_client: PrometheusConnect,
29-
resource_tracker_repo: ResourceTrackerRepository,
3051
image_regex: str,
31-
) -> None:
52+
scrape_timestamp: datetime = datetime.now(tz=timezone.utc),
53+
) -> list[ContainerResourceUsage]:
3254
# Query CPU seconds
33-
promql_cpu_query = f"sum without (cpu) (container_cpu_usage_seconds_total{{image=~'{image_regex}'}})[30m:1m]"
55+
promql_cpu_query = f"sum without (cpu) (container_cpu_usage_seconds_total{{image=~'{image_regex}'}})[{_PAST_X_MINUTES}m:{_RESOLUTION_MINUTES}m]"
3456
containers_cpu_seconds_usage: list[
3557
dict
3658
] = await asyncio.get_event_loop().run_in_executor(
37-
None, _prometheus_sync_client_custom_query, prometheus_client, promql_cpu_query
59+
None,
60+
_prometheus_sync_client_custom_query,
61+
prometheus_client,
62+
promql_cpu_query,
63+
scrape_timestamp,
3864
)
3965
_logger.info(
40-
"Received %s containers from Prometheus", len(containers_cpu_seconds_usage)
66+
"Received <%s> containers from Prometheus for image <%s> at <%s>",
67+
len(containers_cpu_seconds_usage),
68+
image_regex,
69+
scrape_timestamp,
4170
)
4271

72+
data: list[ContainerResourceUsage] = []
4373
for item in containers_cpu_seconds_usage:
4474
# Prepare metric
4575
metric: dict[str, Any] = item["metric"]
@@ -87,21 +117,79 @@ async def _scrape_and_upload_container_resource_usage(
87117
prometheus_last_scraped=arrow.get(last_value[0]),
88118
)
89119

90-
await resource_tracker_repo.upsert_resource_tracker_container_data_(
91-
container_resource_usage
120+
data.append(container_resource_usage)
121+
122+
return data
123+
124+
125+
def _prepare_prom_query_parameters(
126+
machine_fqdn: str,
127+
prometheus_last_scraped_timestamp: datetime | None,
128+
current_timestamp: datetime = datetime.now(tz=timezone.utc),
129+
) -> list[_PromQueryParameters]:
130+
image_regex = f"registry.{machine_fqdn}/simcore/services/dynamic/jupyter-smash:.*"
131+
132+
# When we start this service for a first time (meaning there is empty database) we start from current timestamp
133+
scrape_timestamp = (
134+
prometheus_last_scraped_timestamp
135+
if prometheus_last_scraped_timestamp
136+
else current_timestamp
137+
)
138+
139+
data = []
140+
while scrape_timestamp < current_timestamp - timedelta(
141+
minutes=_PROME_QUERY_PARAMS_TIMEDELTA_MINUTES
142+
):
143+
data.append(
144+
_PromQueryParameters(
145+
image_regex=image_regex, scrape_timestamp=scrape_timestamp
146+
)
147+
)
148+
scrape_timestamp = scrape_timestamp + timedelta(
149+
minutes=_PROME_QUERY_PARAMS_TIMEDELTA_MINUTES
92150
)
151+
data.append(
152+
_PromQueryParameters(
153+
image_regex=image_regex, scrape_timestamp=current_timestamp
154+
)
155+
)
156+
return data
93157

94158

95159
async def collect_container_resource_usage(
96160
prometheus_client: PrometheusConnect,
97161
resource_tracker_repo: ResourceTrackerRepository,
98162
machine_fqdn: str,
99163
) -> None:
100-
await _scrape_and_upload_container_resource_usage(
101-
prometheus_client=prometheus_client,
102-
resource_tracker_repo=resource_tracker_repo,
103-
image_regex=f"registry.{machine_fqdn}/simcore/services/dynamic/jupyter-smash:.*",
164+
prometheus_last_scraped_timestamp: datetime | None = (
165+
await resource_tracker_repo.get_prometheus_last_scraped_timestamp()
104166
)
167+
prom_query_params: list[_PromQueryParameters] = _prepare_prom_query_parameters(
168+
machine_fqdn, prometheus_last_scraped_timestamp
169+
)
170+
171+
for i, parameter in enumerate(prom_query_params):
172+
_logger.info(
173+
"Collecting %s/%s with parameter <%s>",
174+
i + 1,
175+
len(prom_query_params),
176+
parameter,
177+
)
178+
data: list[ContainerResourceUsage] = await _scrape_container_resource_usage(
179+
prometheus_client=prometheus_client,
180+
image_regex=parameter.image_regex,
181+
scrape_timestamp=parameter.scrape_timestamp,
182+
)
183+
184+
# Upload to the database
185+
await asyncio.gather(
186+
*[
187+
resource_tracker_repo.upsert_resource_tracker_container_data(
188+
container_resource_usage
189+
)
190+
for container_resource_usage in data
191+
]
192+
)
105193

106194

107195
async def collect_container_resource_usage_task(app: FastAPI) -> None:

services/resource-usage-tracker/tests/unit/api/test__oas_spec.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
def test_openapi_json_is_in_sync_with_app_oas(
1212
disabled_database: None,
1313
disabled_prometheus: None,
14+
mocked_redis_server: None,
1415
client: TestClient,
1516
project_slug_dir: Path,
1617
):

0 commit comments

Comments
 (0)