Skip to content

Commit b3c8e81

Browse files
author
Azfaar Qureshi
committed
Adding exporter methods
1 parent a6732c4 commit b3c8e81

File tree

3 files changed

+108
-17
lines changed

3 files changed

+108
-17
lines changed

Diff for: dev-requirements.txt

+2
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,5 @@ readme-renderer~=24.0
1212
grpcio-tools==1.29.0
1313
mypy-protobuf>=1.23
1414
protobuf>=3.13.0
15+
snappy==0.5.4
16+
requests==2.25.0

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/src/opentelemetry/exporter/prometheus_remote_write/__init__.py

+51-5
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,13 @@
1111
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
14+
import logging
1415
import re
1516
from typing import Dict, Sequence
1617

18+
import requests
19+
20+
import snappy
1721
from opentelemetry.exporter.prometheus_remote_write.gen.remote_pb2 import (
1822
WriteRequest,
1923
)
@@ -35,6 +39,8 @@
3539
ValueObserverAggregator,
3640
)
3741

42+
logger = logging.getLogger(__name__)
43+
3844

3945
class PrometheusRemoteWriteMetricsExporter(MetricsExporter):
4046
"""
@@ -133,10 +139,13 @@ def headers(self, headers: Dict):
133139
def export(
134140
self, export_records: Sequence[ExportRecord]
135141
) -> MetricsExportResult:
136-
raise NotImplementedError()
142+
timeseries = self.convert_to_timeseries(export_records)
143+
message = self.build_message(timeseries)
144+
headers = self.get_headers()
145+
return self.send_message(message, headers)
137146

138147
def shutdown(self) -> None:
139-
raise NotImplementedError()
148+
pass
140149

141150
def convert_to_timeseries(
142151
self, export_records: Sequence[ExportRecord]
@@ -265,12 +274,49 @@ def create_label(self, name: str, value: str) -> Label:
265274
return label
266275

267276
def build_message(self, timeseries: Sequence[TimeSeries]) -> bytes:
268-
raise NotImplementedError()
277+
write_request = WriteRequest()
278+
write_request.timeseries.extend(timeseries)
279+
serialized_message = write_request.SerializeToString()
280+
return snappy.compress(serialized_message)
269281

270282
def get_headers(self) -> Dict:
271-
raise NotImplementedError()
283+
headers = {
284+
"Content-Encoding": "snappy",
285+
"Content-Type": "application/x-protobuf",
286+
"X-Prometheus-Remote-Write-Version": "0.1.0",
287+
}
288+
if hasattr(self, "headers"):
289+
for header_name, header_value in self.headers.items():
290+
headers[header_name] = header_value
291+
292+
if "Authorization" not in headers:
293+
if hasattr(self, "bearer_token"):
294+
headers["Authorization"] = "Bearer " + self.bearer_token
295+
elif hasattr(self, "bearer_token_file"):
296+
with open(self.bearer_token_file) as file:
297+
headers["Authorization"] = "Bearer " + file.readline()
298+
return headers
272299

273300
def send_message(
274301
self, message: bytes, headers: Dict
275302
) -> MetricsExportResult:
276-
raise NotImplementedError()
303+
auth = None
304+
if hasattr(self, "basic_auth"):
305+
basic_auth = self.basic_auth
306+
if "password" in basic_auth:
307+
auth = (basic_auth.username, basic_auth.password)
308+
else:
309+
with open(basic_auth.password_file) as file:
310+
auth = (basic_auth.username, file.readline())
311+
response = requests.post(
312+
self.endpoint, data=message, headers=headers, auth=auth
313+
)
314+
if response.status_code != 200:
315+
logger.warning(
316+
"POST request failed with status %s with reason: %s and content: %s",
317+
str(response.status_code),
318+
response.reason,
319+
str(response.content),
320+
)
321+
return MetricsExportResult.FAILURE
322+
return MetricsExportResult.SUCCESS

Diff for: exporter/opentelemetry-exporter-prometheus-remote-write/tests/test_prometheus_remote_write_exporter.py

+55-12
Original file line numberDiff line numberDiff line change
@@ -288,25 +288,68 @@ def test_create_timeseries(self):
288288
self.assertEqual(timeseries, expected_timeseries)
289289

290290

291+
class ResponseStub:
292+
def __init__(self, status_code):
293+
self.status_code = status_code
294+
self.reason = "dummy_reason"
295+
self.content = "dummy_content"
296+
297+
291298
class TestExport(unittest.TestCase):
292299
# Initializes test data that is reused across tests
293300
def setUp(self):
294-
pass
301+
self._exporter = PrometheusRemoteWriteMetricsExporter(
302+
endpoint="/prom/test_endpoint"
303+
)
295304

296305
# Ensures export is successful with valid export_records and config
297-
def test_export(self):
298-
pass
299-
300-
def test_valid_send_message(self):
301-
pass
302-
303-
def test_invalid_send_message(self):
304-
pass
306+
@mock.patch("requests.post", return_value=ResponseStub(200))
307+
def test_export(self, mock_post):
308+
test_metric = Counter("testname", "testdesc", "testunit", int, None)
309+
labels = {"environment": "testing"}
310+
record = ExportRecord(
311+
test_metric, labels, SumAggregator(), Resource({}),
312+
)
313+
result = self._exporter.export([record])
314+
self.assertIs(result, MetricsExportResult.SUCCESS)
315+
self.assertEqual(mock_post.call_count, 1)
316+
317+
@mock.patch("requests.post", return_value=ResponseStub(200))
318+
def test_valid_send_message(self, mock_post):
319+
result = self._exporter.send_message(bytes(), {})
320+
self.assertEqual(mock_post.call_count, 1)
321+
self.assertEqual(result, MetricsExportResult.SUCCESS)
322+
323+
@mock.patch("requests.post", return_value=ResponseStub(404))
324+
def test_invalid_send_message(self, mock_post):
325+
result = self._exporter.send_message(bytes(), {})
326+
self.assertEqual(mock_post.call_count, 1)
327+
self.assertEqual(result, MetricsExportResult.FAILURE)
305328

306329
# Verifies that build_message calls snappy.compress and returns SerializedString
307-
def test_build_message(self):
308-
pass
330+
@mock.patch("snappy.compress", return_value=bytes())
331+
def test_build_message(self, mock_compress):
332+
test_timeseries = [
333+
TimeSeries(),
334+
TimeSeries(),
335+
]
336+
message = self._exporter.build_message(test_timeseries)
337+
self.assertEqual(mock_compress.call_count, 1)
338+
self.assertIsInstance(message, bytes)
309339

310340
# Ensure correct headers are added when valid config is provided
311341
def test_get_headers(self):
312-
pass
342+
self._exporter.headers = {"Custom Header": "test_header"}
343+
self._exporter.headers = {"Custom Header": "test_header"}
344+
self._exporter.bearer_token = "test_token"
345+
346+
headers = self._exporter.get_headers()
347+
self.assertEqual(headers.get("Content-Encoding", ""), "snappy")
348+
self.assertEqual(
349+
headers.get("Content-Type", ""), "application/x-protobuf"
350+
)
351+
self.assertEqual(
352+
headers.get("X-Prometheus-Remote-Write-Version", ""), "0.1.0"
353+
)
354+
self.assertEqual(headers.get("Authorization", ""), "Bearer test_token")
355+
self.assertEqual(headers.get("Custom Header", ""), "test_header")

0 commit comments

Comments
 (0)