|
16 | 16 | import re
|
17 | 17 | from typing import Dict, Sequence
|
18 | 18 |
|
| 19 | +import requests |
| 20 | + |
| 21 | +import snappy |
19 | 22 | from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
|
20 | 23 | WriteRequest,
|
21 | 24 | )
|
@@ -96,15 +99,15 @@ def basic_auth(self, basic_auth: Dict):
|
96 | 99 | if basic_auth:
|
97 | 100 | if "username" not in basic_auth:
|
98 | 101 | raise ValueError("username required in basic_auth")
|
99 |
| - if ( |
100 |
| - "password" not in basic_auth |
101 |
| - and "password_file" not in basic_auth |
102 |
| - ): |
| 102 | + if "password_file" in basic_auth: |
| 103 | + if "password" in basic_auth: |
| 104 | + raise ValueError( |
| 105 | + "basic_auth cannot contain password and password_file" |
| 106 | + ) |
| 107 | + with open(basic_auth["password_file"]) as file: |
| 108 | + basic_auth["password"] = file.readline().strip() |
| 109 | + elif "password" not in basic_auth: |
103 | 110 | raise ValueError("password required in basic_auth")
|
104 |
| - if "password" in basic_auth and "password_file" in basic_auth: |
105 |
| - raise ValueError( |
106 |
| - "basic_auth cannot contain password and password_file" |
107 |
| - ) |
108 | 111 | self._basic_auth = basic_auth
|
109 | 112 |
|
110 | 113 | @property
|
@@ -159,10 +162,16 @@ def headers(self, headers: Dict):
|
159 | 162 | def export(
|
160 | 163 | self, export_records: Sequence[ExportRecord]
|
161 | 164 | ) -> MetricsExportResult:
|
162 |
| - raise NotImplementedError() |
| 165 | + timeseries = self._convert_to_timeseries(export_records) |
| 166 | + if not timeseries: |
| 167 | + logger.warning("No valid records found, export aborted") |
| 168 | + return MetricsExportResult.FAILURE |
| 169 | + message = self._build_message(timeseries) |
| 170 | + headers = self._build_headers() |
| 171 | + return self._send_message(message, headers) |
163 | 172 |
|
164 | 173 | def shutdown(self) -> None:
|
165 |
| - raise NotImplementedError() |
| 174 | + pass |
166 | 175 |
|
167 | 176 | def _convert_to_timeseries(
|
168 | 177 | self, export_records: Sequence[ExportRecord]
|
@@ -304,13 +313,62 @@ def add_label(label_name: str, label_value: str):
|
304 | 313 | timeseries.samples.append(sample)
|
305 | 314 | return timeseries
|
306 | 315 |
|
307 |
| - def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: |
308 |
| - raise NotImplementedError() |
309 |
| - |
310 |
| - def get_headers(self) -> Dict: |
311 |
| - raise NotImplementedError() |
| 316 | + def _build_message(self, timeseries: Sequence[TimeSeries]) -> bytes: |
| 317 | + write_request = WriteRequest() |
| 318 | + write_request.timeseries.extend(timeseries) |
| 319 | + serialized_message = write_request.SerializeToString() |
| 320 | + return snappy.compress(serialized_message) |
| 321 | + |
| 322 | + def _build_headers(self) -> Dict: |
| 323 | + headers = { |
| 324 | + "Content-Encoding": "snappy", |
| 325 | + "Content-Type": "application/x-protobuf", |
| 326 | + "X-Prometheus-Remote-Write-Version": "0.1.0", |
| 327 | + } |
| 328 | + if self.headers: |
| 329 | + for header_name, header_value in self.headers.items(): |
| 330 | + headers[header_name] = header_value |
| 331 | + return headers |
312 | 332 |
|
313 |
| - def send_message( |
| 333 | + def _send_message( |
314 | 334 | self, message: bytes, headers: Dict
|
315 | 335 | ) -> MetricsExportResult:
|
316 |
| - raise NotImplementedError() |
| 336 | + auth = None |
| 337 | + if self.basic_auth: |
| 338 | + auth = (self.basic_auth["username"], self.basic_auth["password"]) |
| 339 | + |
| 340 | + cert = None |
| 341 | + verify = True |
| 342 | + if self.tls_config: |
| 343 | + if "ca_file" in self.tls_config: |
| 344 | + verify = self.tls_config["ca_file"] |
| 345 | + elif "insecure_skip_verify" in self.tls_config: |
| 346 | + verify = self.tls_config["insecure_skip_verify"] |
| 347 | + |
| 348 | + if ( |
| 349 | + "cert_file" in self.tls_config |
| 350 | + and "key_file" in self.tls_config |
| 351 | + ): |
| 352 | + cert = ( |
| 353 | + self.tls_config["cert_file"], |
| 354 | + self.tls_config["key_file"], |
| 355 | + ) |
| 356 | + response = requests.post( |
| 357 | + self.endpoint, |
| 358 | + data=message, |
| 359 | + headers=headers, |
| 360 | + auth=auth, |
| 361 | + timeout=self.timeout, |
| 362 | + proxies=self.proxies, |
| 363 | + cert=cert, |
| 364 | + verify=verify, |
| 365 | + ) |
| 366 | + if response.status_code != 200: |
| 367 | + logger.warning( |
| 368 | + "POST request failed with status %s with reason: %s and content: %s", |
| 369 | + str(response.status_code), |
| 370 | + response.reason, |
| 371 | + str(response.content), |
| 372 | + ) |
| 373 | + return MetricsExportResult.FAILURE |
| 374 | + return MetricsExportResult.SUCCESS |
0 commit comments