|
12 | 12 | # See the License for the specific language governing permissions and
|
13 | 13 | # limitations under the License.
|
14 | 14 |
|
| 15 | +import logging |
15 | 16 | import re
|
16 | 17 | from typing import Dict, Sequence
|
17 | 18 |
|
| 19 | +import requests |
| 20 | + |
| 21 | +import snappy |
18 | 22 | from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
|
19 | 23 | WriteRequest,
|
20 | 24 | )
|
|
36 | 40 | ValueObserverAggregator,
|
37 | 41 | )
|
38 | 42 |
|
| 43 | +logger = logging.getLogger(__name__) |
| 44 | + |
39 | 45 |
|
40 | 46 | class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
|
41 | 47 | """
|
@@ -108,7 +114,7 @@ def timeout(self, timeout: int):
|
108 | 114 |
|
109 | 115 | @property
|
110 | 116 | def tls_config(self):
|
111 |
| - return self.tls_config |
| 117 | + return self._tls_config |
112 | 118 |
|
113 | 119 | @tls_config.setter
|
114 | 120 | def tls_config(self, tls_config: Dict):
|
@@ -148,10 +154,13 @@ def headers(self, headers: Dict):
|
148 | 154 | def export(
|
149 | 155 | self, export_records: Sequence[ExportRecord]
|
150 | 156 | ) -> MetricsExportResult:
|
151 |
| - raise NotImplementedError() |
| 157 | + timeseries = self.convert_to_timeseries(export_records) |
| 158 | + message = self.build_message(timeseries) |
| 159 | + headers = self.get_headers() |
| 160 | + return self.send_message(message, headers) |
152 | 161 |
|
153 | 162 | def shutdown(self) -> None:
|
154 |
| - raise NotImplementedError() |
| 163 | + pass |
155 | 164 |
|
156 | 165 | def convert_to_timeseries(
|
157 | 166 | self, export_records: Sequence[ExportRecord]
|
@@ -280,12 +289,66 @@ def create_label(self, name: str, value: str) -> Label:
|
280 | 289 | return label
|
281 | 290 |
|
282 | 291 | def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
|
283 |
| - raise NotImplementedError() |
| 292 | + write_request = WriteRequest() |
| 293 | + write_request.timeseries.extend(timeseries) |
| 294 | + serialized_message = write_request.SerializeToString() |
| 295 | + return snappy.compress(serialized_message) |
284 | 296 |
|
285 | 297 | def get_headers(self) -> Dict:
|
286 |
| - raise NotImplementedError() |
| 298 | + headers = { |
| 299 | + "Content-Encoding": "snappy", |
| 300 | + "Content-Type": "application/x-protobuf", |
| 301 | + "X-Prometheus-Remote-Write-Version": "0.1.0", |
| 302 | + } |
| 303 | + if self.headers: |
| 304 | + for header_name, header_value in self.headers.items(): |
| 305 | + headers[header_name] = header_value |
| 306 | + return headers |
287 | 307 |
|
288 | 308 | def send_message(
|
289 | 309 | self, message: bytes, headers: Dict
|
290 | 310 | ) -> MetricsExportResult:
|
291 |
| - raise NotImplementedError() |
| 311 | + auth = None |
| 312 | + if self.basic_auth: |
| 313 | + basic_auth = self.basic_auth |
| 314 | + if "password" in basic_auth: |
| 315 | + auth = (basic_auth.username, basic_auth.password) |
| 316 | + else: |
| 317 | + with open(basic_auth.password_file) as file: |
| 318 | + auth = (basic_auth.username, file.readline()) |
| 319 | + |
| 320 | + cert = None |
| 321 | + verify = True |
| 322 | + if self.tls_config: |
| 323 | + if "ca_file" in self.tls_config: |
| 324 | + verify = self.tls_config["ca_file"] |
| 325 | + elif "insecure_skip_verify" in self.tls_config: |
| 326 | + verify = self.tls_config["insecure_skip_verify"] |
| 327 | + |
| 328 | + if ( |
| 329 | + "cert_file" in self.tls_config |
| 330 | + and "key_file" in self.tls_config |
| 331 | + ): |
| 332 | + cert = ( |
| 333 | + self.tls_config["cert_file"], |
| 334 | + self.tls_config["key_file"], |
| 335 | + ) |
| 336 | + response = requests.post( |
| 337 | + self.endpoint, |
| 338 | + data=message, |
| 339 | + headers=headers, |
| 340 | + auth=auth, |
| 341 | + timeout=self.timeout, |
| 342 | + proxies=self.proxies, |
| 343 | + cert=cert, |
| 344 | + verify=verify, |
| 345 | + ) |
| 346 | + if response.status_code != 200: |
| 347 | + logger.warning( |
| 348 | + "POST request failed with status %s with reason: %s and content: %s", |
| 349 | + str(response.status_code), |
| 350 | + response.reason, |
| 351 | + str(response.content), |
| 352 | + ) |
| 353 | + return MetricsExportResult.FAILURE |
| 354 | + return MetricsExportResult.SUCCESS |
0 commit comments