Skip to content

Commit fb0a761

Browse files
emdnetoxrmx
authored andcommitted
Add support to instrument httpx when using proxy (open-telemetry#2664)
1 parent bfd47a6 commit fb0a761

File tree

3 files changed

+205
-7
lines changed

3 files changed

+205
-7
lines changed

CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
3131
([#2630](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2630))
3232
- `opentelemetry-instrumentation-system-metrics` Add support for capture open file descriptors
3333
([#2652](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2652))
34+
- `opentelemetry-instrumentation-httpx` Add support for instrument client with proxy
35+
([#2664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2664))
3436
- `opentelemetry-instrumentation-aiohttp-client` Implement new semantic convention opt-in migration
3537
([#2673](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2673))
3638
- `opentelemetry-instrumentation-django` Add `http.target` to Django duration metric attributes
@@ -63,7 +65,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
6365
- `opentelemetry-instrumentation-asgi` Fix generation of `http.target` and `http.url` attributes for ASGI apps
6466
using sub apps
6567
([#2477](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2477))
66-
- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library.
68+
- `opentelemetry-instrumentation-aws-lambda` Bugfix: AWS Lambda event source key incorrect for SNS in instrumentation library.
6769
([#2612](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2612))
6870
- `opentelemetry-instrumentation-asyncio` instrumented `asyncio.wait_for` properly raises `asyncio.TimeoutError` as expected
6971
([#2637](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2637))

instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py

+69
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,7 @@ def __init__(self, *args, **kwargs):
640640
super().__init__(*args, **kwargs)
641641

642642
self._original_transport = self._transport
643+
self._original_mounts = self._mounts.copy()
643644
self._is_instrumented_by_opentelemetry = True
644645

645646
self._transport = SyncOpenTelemetryTransport(
@@ -648,6 +649,21 @@ def __init__(self, *args, **kwargs):
648649
request_hook=_InstrumentedClient._request_hook,
649650
response_hook=_InstrumentedClient._response_hook,
650651
)
652+
self._mounts.update(
653+
{
654+
url_pattern: (
655+
SyncOpenTelemetryTransport(
656+
transport,
657+
tracer_provider=_InstrumentedClient._tracer_provider,
658+
request_hook=_InstrumentedClient._request_hook,
659+
response_hook=_InstrumentedClient._response_hook,
660+
)
661+
if transport is not None
662+
else transport
663+
)
664+
for url_pattern, transport in self._original_mounts.items()
665+
}
666+
)
651667

652668

653669
class _InstrumentedAsyncClient(httpx.AsyncClient):
@@ -659,6 +675,7 @@ def __init__(self, *args, **kwargs):
659675
super().__init__(*args, **kwargs)
660676

661677
self._original_transport = self._transport
678+
self._original_mounts = self._mounts.copy()
662679
self._is_instrumented_by_opentelemetry = True
663680

664681
self._transport = AsyncOpenTelemetryTransport(
@@ -668,6 +685,22 @@ def __init__(self, *args, **kwargs):
668685
response_hook=_InstrumentedAsyncClient._response_hook,
669686
)
670687

688+
self._mounts.update(
689+
{
690+
url_pattern: (
691+
AsyncOpenTelemetryTransport(
692+
transport,
693+
tracer_provider=_InstrumentedAsyncClient._tracer_provider,
694+
request_hook=_InstrumentedAsyncClient._request_hook,
695+
response_hook=_InstrumentedAsyncClient._response_hook,
696+
)
697+
if transport is not None
698+
else transport
699+
)
700+
for url_pattern, transport in self._original_mounts.items()
701+
}
702+
)
703+
671704

672705
class HTTPXClientInstrumentor(BaseInstrumentor):
673706
# pylint: disable=protected-access,attribute-defined-outside-init
@@ -752,6 +785,7 @@ def instrument_client(
752785
if not client._is_instrumented_by_opentelemetry:
753786
if isinstance(client, httpx.Client):
754787
client._original_transport = client._transport
788+
client._original_mounts = client._mounts.copy()
755789
transport = client._transport or httpx.HTTPTransport()
756790
client._transport = SyncOpenTelemetryTransport(
757791
transport,
@@ -760,15 +794,47 @@ def instrument_client(
760794
response_hook=response_hook,
761795
)
762796
client._is_instrumented_by_opentelemetry = True
797+
client._mounts.update(
798+
{
799+
url_pattern: (
800+
SyncOpenTelemetryTransport(
801+
transport,
802+
tracer_provider=tracer_provider,
803+
request_hook=request_hook,
804+
response_hook=response_hook,
805+
)
806+
if transport is not None
807+
else transport
808+
)
809+
for url_pattern, transport in client._original_mounts.items()
810+
}
811+
)
812+
763813
if isinstance(client, httpx.AsyncClient):
764814
transport = client._transport or httpx.AsyncHTTPTransport()
815+
client._original_mounts = client._mounts.copy()
765816
client._transport = AsyncOpenTelemetryTransport(
766817
transport,
767818
tracer_provider=tracer_provider,
768819
request_hook=request_hook,
769820
response_hook=response_hook,
770821
)
771822
client._is_instrumented_by_opentelemetry = True
823+
client._mounts.update(
824+
{
825+
url_pattern: (
826+
AsyncOpenTelemetryTransport(
827+
transport,
828+
tracer_provider=tracer_provider,
829+
request_hook=request_hook,
830+
response_hook=response_hook,
831+
)
832+
if transport is not None
833+
else transport
834+
)
835+
for url_pattern, transport in client._original_mounts.items()
836+
}
837+
)
772838
else:
773839
_logger.warning(
774840
"Attempting to instrument Httpx client while already instrumented"
@@ -787,6 +853,9 @@ def uninstrument_client(
787853
client._transport = client._original_transport
788854
del client._original_transport
789855
client._is_instrumented_by_opentelemetry = False
856+
if hasattr(client, "_original_mounts"):
857+
client._mounts = client._original_mounts.copy()
858+
del client._original_mounts
790859
else:
791860
_logger.warning(
792861
"Attempting to uninstrument Httpx "

instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py

+133-6
Original file line numberDiff line numberDiff line change
@@ -530,6 +530,7 @@ def create_transport(
530530
tracer_provider: typing.Optional["TracerProvider"] = None,
531531
request_hook: typing.Optional["RequestHook"] = None,
532532
response_hook: typing.Optional["ResponseHook"] = None,
533+
**kwargs,
533534
):
534535
pass
535536

@@ -539,6 +540,7 @@ def create_client(
539540
transport: typing.Union[
540541
SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None
541542
] = None,
543+
**kwargs,
542544
):
543545
pass
544546

@@ -643,22 +645,70 @@ def test_not_recording_not_set_attribute_in_exception_new_semconv(
643645
self.assertFalse(mock_span.set_attribute.called)
644646
self.assertFalse(mock_span.set_status.called)
645647

648+
@respx.mock
649+
def test_client_mounts_with_instrumented_transport(self):
650+
https_url = "https://mock/status/200"
651+
respx.get(https_url).mock(httpx.Response(200))
652+
proxy_mounts = {
653+
"http://": self.create_transport(
654+
proxy=httpx.Proxy("http://localhost:8080")
655+
),
656+
"https://": self.create_transport(
657+
proxy=httpx.Proxy("http://localhost:8443")
658+
),
659+
}
660+
client1 = self.create_client(mounts=proxy_mounts)
661+
client2 = self.create_client(mounts=proxy_mounts)
662+
self.perform_request(self.URL, client=client1)
663+
self.perform_request(https_url, client=client2)
664+
spans = self.assert_span(num_spans=2)
665+
self.assertEqual(
666+
spans[0].attributes[SpanAttributes.HTTP_URL], self.URL
667+
)
668+
self.assertEqual(
669+
spans[1].attributes[SpanAttributes.HTTP_URL], https_url
670+
)
671+
646672
class BaseInstrumentorTest(BaseTest, metaclass=abc.ABCMeta):
647673
@abc.abstractmethod
648674
def create_client(
649675
self,
650676
transport: typing.Union[
651677
SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None
652678
] = None,
679+
**kwargs,
653680
):
654681
pass
655682

683+
@abc.abstractmethod
684+
def create_proxy_transport(self, url: str):
685+
pass
686+
656687
def setUp(self):
657688
super().setUp()
658689
HTTPXClientInstrumentor().instrument()
659690
self.client = self.create_client()
660691
HTTPXClientInstrumentor().uninstrument()
661692

693+
def create_proxy_mounts(self):
694+
return {
695+
"http://": self.create_proxy_transport(
696+
"http://localhost:8080"
697+
),
698+
"https://": self.create_proxy_transport(
699+
"http://localhost:8080"
700+
),
701+
}
702+
703+
def assert_proxy_mounts(self, mounts, num_mounts, transport_type):
704+
self.assertEqual(len(mounts), num_mounts)
705+
for transport in mounts:
706+
with self.subTest(transport):
707+
self.assertIsInstance(
708+
transport,
709+
transport_type,
710+
)
711+
662712
def test_custom_tracer_provider(self):
663713
resource = resources.Resource.create({})
664714
result = self.create_tracer_provider(resource=resource)
@@ -855,6 +905,71 @@ def test_uninstrument_new_client(self):
855905
self.assertEqual(result.text, "Hello!")
856906
self.assert_span()
857907

908+
def test_instrument_proxy(self):
909+
proxy_mounts = self.create_proxy_mounts()
910+
HTTPXClientInstrumentor().instrument()
911+
client = self.create_client(mounts=proxy_mounts)
912+
self.perform_request(self.URL, client=client)
913+
self.assert_span(num_spans=1)
914+
self.assert_proxy_mounts(
915+
client._mounts.values(),
916+
2,
917+
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
918+
)
919+
HTTPXClientInstrumentor().uninstrument()
920+
921+
def test_instrument_client_with_proxy(self):
922+
proxy_mounts = self.create_proxy_mounts()
923+
client = self.create_client(mounts=proxy_mounts)
924+
self.assert_proxy_mounts(
925+
client._mounts.values(),
926+
2,
927+
(httpx.HTTPTransport, httpx.AsyncHTTPTransport),
928+
)
929+
HTTPXClientInstrumentor().instrument_client(client)
930+
result = self.perform_request(self.URL, client=client)
931+
self.assertEqual(result.text, "Hello!")
932+
self.assert_span(num_spans=1)
933+
self.assert_proxy_mounts(
934+
client._mounts.values(),
935+
2,
936+
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
937+
)
938+
HTTPXClientInstrumentor().uninstrument_client(client)
939+
940+
def test_uninstrument_client_with_proxy(self):
941+
proxy_mounts = self.create_proxy_mounts()
942+
HTTPXClientInstrumentor().instrument()
943+
client = self.create_client(mounts=proxy_mounts)
944+
self.assert_proxy_mounts(
945+
client._mounts.values(),
946+
2,
947+
(SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport),
948+
)
949+
950+
HTTPXClientInstrumentor().uninstrument_client(client)
951+
result = self.perform_request(self.URL, client=client)
952+
953+
self.assertEqual(result.text, "Hello!")
954+
self.assert_span(num_spans=0)
955+
self.assert_proxy_mounts(
956+
client._mounts.values(),
957+
2,
958+
(httpx.HTTPTransport, httpx.AsyncHTTPTransport),
959+
)
960+
# Test that other clients as well as instance client is still
961+
# instrumented
962+
client2 = self.create_client()
963+
result = self.perform_request(self.URL, client=client2)
964+
self.assertEqual(result.text, "Hello!")
965+
self.assert_span()
966+
967+
self.memory_exporter.clear()
968+
969+
result = self.perform_request(self.URL)
970+
self.assertEqual(result.text, "Hello!")
971+
self.assert_span()
972+
858973

859974
class TestSyncIntegration(BaseTestCases.BaseManualTest):
860975
def setUp(self):
@@ -871,8 +986,9 @@ def create_transport(
871986
tracer_provider: typing.Optional["TracerProvider"] = None,
872987
request_hook: typing.Optional["RequestHook"] = None,
873988
response_hook: typing.Optional["ResponseHook"] = None,
989+
**kwargs,
874990
):
875-
transport = httpx.HTTPTransport()
991+
transport = httpx.HTTPTransport(**kwargs)
876992
telemetry_transport = SyncOpenTelemetryTransport(
877993
transport,
878994
tracer_provider=tracer_provider,
@@ -884,8 +1000,9 @@ def create_transport(
8841000
def create_client(
8851001
self,
8861002
transport: typing.Optional[SyncOpenTelemetryTransport] = None,
1003+
**kwargs,
8871004
):
888-
return httpx.Client(transport=transport)
1005+
return httpx.Client(transport=transport, **kwargs)
8891006

8901007
def perform_request(
8911008
self,
@@ -921,8 +1038,9 @@ def create_transport(
9211038
tracer_provider: typing.Optional["TracerProvider"] = None,
9221039
request_hook: typing.Optional["AsyncRequestHook"] = None,
9231040
response_hook: typing.Optional["AsyncResponseHook"] = None,
1041+
**kwargs,
9241042
):
925-
transport = httpx.AsyncHTTPTransport()
1043+
transport = httpx.AsyncHTTPTransport(**kwargs)
9261044
telemetry_transport = AsyncOpenTelemetryTransport(
9271045
transport,
9281046
tracer_provider=tracer_provider,
@@ -934,8 +1052,9 @@ def create_transport(
9341052
def create_client(
9351053
self,
9361054
transport: typing.Optional[AsyncOpenTelemetryTransport] = None,
1055+
**kwargs,
9371056
):
938-
return httpx.AsyncClient(transport=transport)
1057+
return httpx.AsyncClient(transport=transport, **kwargs)
9391058

9401059
def perform_request(
9411060
self,
@@ -977,8 +1096,9 @@ class TestSyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
9771096
def create_client(
9781097
self,
9791098
transport: typing.Optional[SyncOpenTelemetryTransport] = None,
1099+
**kwargs,
9801100
):
981-
return httpx.Client()
1101+
return httpx.Client(**kwargs)
9821102

9831103
def perform_request(
9841104
self,
@@ -991,6 +1111,9 @@ def perform_request(
9911111
return self.client.request(method, url, headers=headers)
9921112
return client.request(method, url, headers=headers)
9931113

1114+
def create_proxy_transport(self, url):
1115+
return httpx.HTTPTransport(proxy=httpx.Proxy(url))
1116+
9941117

9951118
class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest):
9961119
response_hook = staticmethod(_async_response_hook)
@@ -1007,8 +1130,9 @@ def setUp(self):
10071130
def create_client(
10081131
self,
10091132
transport: typing.Optional[AsyncOpenTelemetryTransport] = None,
1133+
**kwargs,
10101134
):
1011-
return httpx.AsyncClient()
1135+
return httpx.AsyncClient(**kwargs)
10121136

10131137
def perform_request(
10141138
self,
@@ -1027,6 +1151,9 @@ async def _perform_request():
10271151

10281152
return _async_call(_perform_request())
10291153

1154+
def create_proxy_transport(self, url):
1155+
return httpx.AsyncHTTPTransport(proxy=httpx.Proxy(url))
1156+
10301157
def test_basic_multiple(self):
10311158
# We need to create separate clients because in httpx >= 0.19,
10321159
# closing the client after "with" means the second http call fails

0 commit comments

Comments
 (0)