|
16 | 16 | import re
|
17 | 17 | from typing import Dict, Sequence
|
18 | 18 |
|
| 19 | +import requests |
| 20 | + |
| 21 | +import snappy |
19 | 22 | from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
|
20 | 23 | WriteRequest,
|
21 | 24 | )
|
@@ -48,7 +51,7 @@ class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
|
48 | 51 | endpoint: url where data will be sent (Required)
|
49 | 52 | basic_auth: username and password for authentication (Optional)
|
50 | 53 | headers: additional headers for remote write request (Optional)
|
51 |
| - timeout: timeout for requests to the remote write endpoint in seconds (Optional) |
| 54 | + timeout: timeout for remote write requests in seconds, defaults to 30 (Optional) |
52 | 55 | proxies: dict mapping request proxy protocols to proxy urls (Optional)
|
53 | 56 | tls_config: configuration for remote write TLS settings (Optional)
|
54 | 57 | """
|
@@ -96,15 +99,15 @@ def basic_auth(self, basic_auth: Dict):
|
96 | 99 | if basic_auth:
|
97 | 100 | if "username" not in basic_auth:
|
98 | 101 | raise ValueError("username required in basic_auth")
|
99 |
| - if ( |
100 |
| - "password" not in basic_auth |
101 |
| - and "password_file" not in basic_auth |
102 |
| - ): |
| 102 | + if "password_file" in basic_auth: |
| 103 | + if "password" in basic_auth: |
| 104 | + raise ValueError( |
| 105 | + "basic_auth cannot contain password and password_file" |
| 106 | + ) |
| 107 | + with open(basic_auth["password_file"]) as file: |
| 108 | + basic_auth["password"] = file.readline().strip() |
| 109 | + elif "password" not in basic_auth: |
103 | 110 | raise ValueError("password required in basic_auth")
|
104 |
| - if "password" in basic_auth and "password_file" in basic_auth: |
105 |
| - raise ValueError( |
106 |
| - "basic_auth cannot contain password and password_file" |
107 |
| - ) |
108 | 111 | self._basic_auth = basic_auth
|
109 | 112 |
|
110 | 113 | @property
|
@@ -159,10 +162,20 @@ def headers(self, headers: Dict):
|
159 | 162 | def export(
|
160 | 163 | self, export_records: Sequence[ExportRecord]
|
161 | 164 | ) -> MetricsExportResult:
|
162 |
| - raise NotImplementedError() |
| 165 | + if not export_records: |
| 166 | + return MetricsExportResult.SUCCESS |
| 167 | + timeseries = self._convert_to_timeseries(export_records) |
| 168 | + if not timeseries: |
| 169 | + logger.error( |
| 170 | + "All records contain unsupported aggregators, export aborted" |
| 171 | + ) |
| 172 | + return MetricsExportResult.FAILURE |
| 173 | + message = self._build_message(timeseries) |
| 174 | + headers = self._build_headers() |
| 175 | + return self._send_message(message, headers) |
163 | 176 |
|
164 | 177 | def shutdown(self) -> None:
|
165 |
| - raise NotImplementedError() |
| 178 | + pass |
166 | 179 |
|
167 | 180 | def _convert_to_timeseries(
|
168 | 181 | self, export_records: Sequence[ExportRecord]
|
@@ -304,13 +317,60 @@ def add_label(label_name: str, label_value: str):
|
304 | 317 | timeseries.samples.append(sample)
|
305 | 318 | return timeseries
|
306 | 319 |
|
307 |
| - def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: |
308 |
| - raise NotImplementedError() |
309 |
| - |
310 |
| - def get_headers(self) -> Dict: |
311 |
| - raise NotImplementedError() |
| 320 | + def _build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: |
| 321 | + write_request = WriteRequest() |
| 322 | + write_request.timeseries.extend(timeseries) |
| 323 | + serialized_message = write_request.SerializeToString() |
| 324 | + return snappy.compress(serialized_message) |
| 325 | + |
| 326 | + def _build_headers(self) -> Dict: |
| 327 | + headers = { |
| 328 | + "Content-Encoding": "snappy", |
| 329 | + "Content-Type": "application/x-protobuf", |
| 330 | + "X-Prometheus-Remote-Write-Version": "0.1.0", |
| 331 | + } |
| 332 | + if self.headers: |
| 333 | + for header_name, header_value in self.headers.items(): |
| 334 | + headers[header_name] = header_value |
| 335 | + return headers |
312 | 336 |
|
313 |
| - def send_message( |
| 337 | + def _send_message( |
314 | 338 | self, message: bytes, headers: Dict
|
315 | 339 | ) -> MetricsExportResult:
|
316 |
| - raise NotImplementedError() |
| 340 | + auth = None |
| 341 | + if self.basic_auth: |
| 342 | + auth = (self.basic_auth["username"], self.basic_auth["password"]) |
| 343 | + |
| 344 | + cert = None |
| 345 | + verify = True |
| 346 | + if self.tls_config: |
| 347 | + if "ca_file" in self.tls_config: |
| 348 | + verify = self.tls_config["ca_file"] |
| 349 | + elif "insecure_skip_verify" in self.tls_config: |
| 350 | + verify = self.tls_config["insecure_skip_verify"] |
| 351 | + |
| 352 | + if ( |
| 353 | + "cert_file" in self.tls_config |
| 354 | + and "key_file" in self.tls_config |
| 355 | + ): |
| 356 | + cert = ( |
| 357 | + self.tls_config["cert_file"], |
| 358 | + self.tls_config["key_file"], |
| 359 | + ) |
| 360 | + try: |
| 361 | + response = requests.post( |
| 362 | + self.endpoint, |
| 363 | + data=message, |
| 364 | + headers=headers, |
| 365 | + auth=auth, |
| 366 | + timeout=self.timeout, |
| 367 | + proxies=self.proxies, |
| 368 | + cert=cert, |
| 369 | + verify=verify, |
| 370 | + ) |
| 371 | + if not response.ok: |
| 372 | + response.raise_for_status() |
| 373 | + except requests.exceptions.RequestException as e: |
| 374 | + logger.error("Export POST request failed with reason: %s", e) |
| 375 | + return MetricsExportResult.FAILURE |
| 376 | + return MetricsExportResult.SUCCESS |
0 commit comments