diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c756d7bcb..77b58df8f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2630](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2630)) - `opentelemetry-instrumentation-system-metrics` Add support for capture open file descriptors ([#2652](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2652)) +- `opentelemetry-instrumentation-httpx` Add support for instrument client with proxy + ([#2664](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2664)) - `opentelemetry-instrumentation-aiohttp-client` Implement new semantic convention opt-in migration ([#2673](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2673)) - `opentelemetry-instrumentation-django` Add `http.target` to Django duration metric attributes diff --git a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py index d2ff0be292..e3ce383d7e 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/src/opentelemetry/instrumentation/httpx/__init__.py @@ -640,6 +640,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._original_transport = self._transport + self._original_mounts = self._mounts.copy() self._is_instrumented_by_opentelemetry = True self._transport = SyncOpenTelemetryTransport( @@ -648,6 +649,21 @@ def __init__(self, *args, **kwargs): request_hook=_InstrumentedClient._request_hook, response_hook=_InstrumentedClient._response_hook, ) + self._mounts.update( + { + url_pattern: ( + SyncOpenTelemetryTransport( + transport, + tracer_provider=_InstrumentedClient._tracer_provider, + request_hook=_InstrumentedClient._request_hook, + response_hook=_InstrumentedClient._response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in self._original_mounts.items() + } + ) class _InstrumentedAsyncClient(httpx.AsyncClient): @@ -659,6 +675,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._original_transport = self._transport + self._original_mounts = self._mounts.copy() self._is_instrumented_by_opentelemetry = True self._transport = AsyncOpenTelemetryTransport( @@ -668,6 +685,22 @@ def __init__(self, *args, **kwargs): response_hook=_InstrumentedAsyncClient._response_hook, ) + self._mounts.update( + { + url_pattern: ( + AsyncOpenTelemetryTransport( + transport, + tracer_provider=_InstrumentedAsyncClient._tracer_provider, + request_hook=_InstrumentedAsyncClient._request_hook, + response_hook=_InstrumentedAsyncClient._response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in self._original_mounts.items() + } + ) + class HTTPXClientInstrumentor(BaseInstrumentor): # pylint: disable=protected-access,attribute-defined-outside-init @@ -752,6 +785,7 @@ def instrument_client( if not client._is_instrumented_by_opentelemetry: if isinstance(client, httpx.Client): client._original_transport = client._transport + client._original_mounts = client._mounts.copy() transport = client._transport or httpx.HTTPTransport() client._transport = SyncOpenTelemetryTransport( transport, @@ -760,8 +794,25 @@ def instrument_client( response_hook=response_hook, ) client._is_instrumented_by_opentelemetry = True + client._mounts.update( + { + url_pattern: ( + SyncOpenTelemetryTransport( + transport, + tracer_provider=tracer_provider, + request_hook=request_hook, + response_hook=response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in client._original_mounts.items() + } + ) + if isinstance(client, httpx.AsyncClient): transport = client._transport or httpx.AsyncHTTPTransport() + client._original_mounts = client._mounts.copy() client._transport = AsyncOpenTelemetryTransport( transport, tracer_provider=tracer_provider, @@ -769,6 +820,21 @@ def instrument_client( response_hook=response_hook, ) client._is_instrumented_by_opentelemetry = True + client._mounts.update( + { + url_pattern: ( + AsyncOpenTelemetryTransport( + transport, + tracer_provider=tracer_provider, + request_hook=request_hook, + response_hook=response_hook, + ) + if transport is not None + else transport + ) + for url_pattern, transport in client._original_mounts.items() + } + ) else: _logger.warning( "Attempting to instrument Httpx client while already instrumented" @@ -787,6 +853,9 @@ def uninstrument_client( client._transport = client._original_transport del client._original_transport client._is_instrumented_by_opentelemetry = False + if hasattr(client, "_original_mounts"): + client._mounts = client._original_mounts.copy() + del client._original_mounts else: _logger.warning( "Attempting to uninstrument Httpx " diff --git a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py index 84bab598e6..03141e61b5 100644 --- a/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py +++ b/instrumentation/opentelemetry-instrumentation-httpx/tests/test_httpx_integration.py @@ -530,6 +530,7 @@ def create_transport( tracer_provider: typing.Optional["TracerProvider"] = None, request_hook: typing.Optional["RequestHook"] = None, response_hook: typing.Optional["ResponseHook"] = None, + **kwargs, ): pass @@ -539,6 +540,7 @@ def create_client( transport: typing.Union[ SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None ] = None, + **kwargs, ): pass @@ -643,6 +645,30 @@ def test_not_recording_not_set_attribute_in_exception_new_semconv( self.assertFalse(mock_span.set_attribute.called) self.assertFalse(mock_span.set_status.called) + @respx.mock + def test_client_mounts_with_instrumented_transport(self): + https_url = "https://mock/status/200" + respx.get(https_url).mock(httpx.Response(200)) + proxy_mounts = { + "http://": self.create_transport( + proxy=httpx.Proxy("http://localhost:8080") + ), + "https://": self.create_transport( + proxy=httpx.Proxy("http://localhost:8443") + ), + } + client1 = self.create_client(mounts=proxy_mounts) + client2 = self.create_client(mounts=proxy_mounts) + self.perform_request(self.URL, client=client1) + self.perform_request(https_url, client=client2) + spans = self.assert_span(num_spans=2) + self.assertEqual( + spans[0].attributes[SpanAttributes.HTTP_URL], self.URL + ) + self.assertEqual( + spans[1].attributes[SpanAttributes.HTTP_URL], https_url + ) + class BaseInstrumentorTest(BaseTest, metaclass=abc.ABCMeta): @abc.abstractmethod def create_client( @@ -650,15 +676,39 @@ def create_client( transport: typing.Union[ SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport, None ] = None, + **kwargs, ): pass + @abc.abstractmethod + def create_proxy_transport(self, url: str): + pass + def setUp(self): super().setUp() HTTPXClientInstrumentor().instrument() self.client = self.create_client() HTTPXClientInstrumentor().uninstrument() + def create_proxy_mounts(self): + return { + "http://": self.create_proxy_transport( + "http://localhost:8080" + ), + "https://": self.create_proxy_transport( + "http://localhost:8080" + ), + } + + def assert_proxy_mounts(self, mounts, num_mounts, transport_type): + self.assertEqual(len(mounts), num_mounts) + for transport in mounts: + with self.subTest(transport): + self.assertIsInstance( + transport, + transport_type, + ) + def test_custom_tracer_provider(self): resource = resources.Resource.create({}) result = self.create_tracer_provider(resource=resource) @@ -855,6 +905,71 @@ def test_uninstrument_new_client(self): self.assertEqual(result.text, "Hello!") self.assert_span() + def test_instrument_proxy(self): + proxy_mounts = self.create_proxy_mounts() + HTTPXClientInstrumentor().instrument() + client = self.create_client(mounts=proxy_mounts) + self.perform_request(self.URL, client=client) + self.assert_span(num_spans=1) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), + ) + HTTPXClientInstrumentor().uninstrument() + + def test_instrument_client_with_proxy(self): + proxy_mounts = self.create_proxy_mounts() + client = self.create_client(mounts=proxy_mounts) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (httpx.HTTPTransport, httpx.AsyncHTTPTransport), + ) + HTTPXClientInstrumentor().instrument_client(client) + result = self.perform_request(self.URL, client=client) + self.assertEqual(result.text, "Hello!") + self.assert_span(num_spans=1) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), + ) + HTTPXClientInstrumentor().uninstrument_client(client) + + def test_uninstrument_client_with_proxy(self): + proxy_mounts = self.create_proxy_mounts() + HTTPXClientInstrumentor().instrument() + client = self.create_client(mounts=proxy_mounts) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (SyncOpenTelemetryTransport, AsyncOpenTelemetryTransport), + ) + + HTTPXClientInstrumentor().uninstrument_client(client) + result = self.perform_request(self.URL, client=client) + + self.assertEqual(result.text, "Hello!") + self.assert_span(num_spans=0) + self.assert_proxy_mounts( + client._mounts.values(), + 2, + (httpx.HTTPTransport, httpx.AsyncHTTPTransport), + ) + # Test that other clients as well as instance client is still + # instrumented + client2 = self.create_client() + result = self.perform_request(self.URL, client=client2) + self.assertEqual(result.text, "Hello!") + self.assert_span() + + self.memory_exporter.clear() + + result = self.perform_request(self.URL) + self.assertEqual(result.text, "Hello!") + self.assert_span() + class TestSyncIntegration(BaseTestCases.BaseManualTest): def setUp(self): @@ -871,8 +986,9 @@ def create_transport( tracer_provider: typing.Optional["TracerProvider"] = None, request_hook: typing.Optional["RequestHook"] = None, response_hook: typing.Optional["ResponseHook"] = None, + **kwargs, ): - transport = httpx.HTTPTransport() + transport = httpx.HTTPTransport(**kwargs) telemetry_transport = SyncOpenTelemetryTransport( transport, tracer_provider=tracer_provider, @@ -884,8 +1000,9 @@ def create_transport( def create_client( self, transport: typing.Optional[SyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.Client(transport=transport) + return httpx.Client(transport=transport, **kwargs) def perform_request( self, @@ -921,8 +1038,9 @@ def create_transport( tracer_provider: typing.Optional["TracerProvider"] = None, request_hook: typing.Optional["AsyncRequestHook"] = None, response_hook: typing.Optional["AsyncResponseHook"] = None, + **kwargs, ): - transport = httpx.AsyncHTTPTransport() + transport = httpx.AsyncHTTPTransport(**kwargs) telemetry_transport = AsyncOpenTelemetryTransport( transport, tracer_provider=tracer_provider, @@ -934,8 +1052,9 @@ def create_transport( def create_client( self, transport: typing.Optional[AsyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.AsyncClient(transport=transport) + return httpx.AsyncClient(transport=transport, **kwargs) def perform_request( self, @@ -977,8 +1096,9 @@ class TestSyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest): def create_client( self, transport: typing.Optional[SyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.Client() + return httpx.Client(**kwargs) def perform_request( self, @@ -991,6 +1111,9 @@ def perform_request( return self.client.request(method, url, headers=headers) return client.request(method, url, headers=headers) + def create_proxy_transport(self, url): + return httpx.HTTPTransport(proxy=httpx.Proxy(url)) + class TestAsyncInstrumentationIntegration(BaseTestCases.BaseInstrumentorTest): response_hook = staticmethod(_async_response_hook) @@ -1007,8 +1130,9 @@ def setUp(self): def create_client( self, transport: typing.Optional[AsyncOpenTelemetryTransport] = None, + **kwargs, ): - return httpx.AsyncClient() + return httpx.AsyncClient(**kwargs) def perform_request( self, @@ -1027,6 +1151,9 @@ async def _perform_request(): return _async_call(_perform_request()) + def create_proxy_transport(self, url): + return httpx.AsyncHTTPTransport(proxy=httpx.Proxy(url)) + def test_basic_multiple(self): # We need to create separate clients because in httpx >= 0.19, # closing the client after "with" means the second http call fails diff --git a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py index c5f19fc736..08337c2d4a 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-redis/src/opentelemetry/instrumentation/redis/__init__.py @@ -203,6 +203,8 @@ def _traced_execute_pipeline(func, instance, args, kwargs): span_name, ) = _build_span_meta_data_for_pipeline(instance) + exception = None + with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: @@ -216,13 +218,17 @@ def _traced_execute_pipeline(func, instance, args, kwargs): response = None try: response = func(*args, **kwargs) - except redis.WatchError: + except redis.WatchError as watch_exception: span.set_status(StatusCode.UNSET) + exception = watch_exception if callable(response_hook): response_hook(span, instance, response) - return response + if exception: + raise exception + + return response pipeline_class = ( "BasePipeline" if redis.VERSION < (3, 0, 0) else "Pipeline" @@ -279,6 +285,8 @@ async def _async_traced_execute_pipeline(func, instance, args, kwargs): span_name, ) = _build_span_meta_data_for_pipeline(instance) + exception = None + with tracer.start_as_current_span( span_name, kind=trace.SpanKind.CLIENT ) as span: @@ -292,12 +300,17 @@ async def _async_traced_execute_pipeline(func, instance, args, kwargs): response = None try: response = await func(*args, **kwargs) - except redis.WatchError: + except redis.WatchError as watch_exception: span.set_status(StatusCode.UNSET) + exception = watch_exception if callable(response_hook): response_hook(span, instance, response) - return response + + if exception: + raise exception + + return response if redis.VERSION >= _REDIS_ASYNCIO_VERSION: wrap_function_wrapper( diff --git a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py index 23d21b6e5a..c436589adb 100644 --- a/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py +++ b/instrumentation/opentelemetry-instrumentation-redis/tests/test_redis.py @@ -359,7 +359,7 @@ def test_response_error(self): def test_watch_error_sync(self): def redis_operations(): - try: + with pytest.raises(WatchError): redis_client = fakeredis.FakeStrictRedis() pipe = redis_client.pipeline(transaction=True) pipe.watch("a") @@ -367,8 +367,6 @@ def redis_operations(): pipe.multi() pipe.set("a", "1") pipe.execute() - except WatchError: - pass redis_operations() @@ -400,7 +398,7 @@ def tearDown(self): @pytest.mark.asyncio async def test_watch_error_async(self): async def redis_operations(): - try: + with pytest.raises(WatchError): redis_client = FakeRedis() async with redis_client.pipeline(transaction=False) as pipe: await pipe.watch("a") @@ -408,8 +406,6 @@ async def redis_operations(): pipe.multi() await pipe.set("a", "1") await pipe.execute() - except WatchError: - pass await redis_operations() diff --git a/scripts/build.sh b/scripts/build.sh index 93dc0edce1..69df4f7748 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -32,7 +32,7 @@ DISTDIR=dist cd $DISTDIR for x in * ; do # FIXME: Remove this logic once these packages are available in Pypi - if echo "$x" | grep -Eq "^opentelemetry_(instrumentation_aiohttp_server|resource_detector_container).*(\.tar\.gz|\.whl)$"; then + if echo "$x" | grep -Eq "^opentelemetry_resource_detector_container.*(\.tar\.gz|\.whl)$"; then echo "Skipping $x because of erroneous uploads. See: https://github.com/open-telemetry/opentelemetry-python-contrib/issues/2053" rm $x # FIXME: Remove this once opentelemetry-resource-detector-azure package goes 1.X