@@ -94,7 +94,7 @@ def response_hook(span, response):
94
94
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
95
95
from opentelemetry .instrumentation .utils import unwrap
96
96
from opentelemetry .semconv .trace import SpanAttributes
97
- from opentelemetry .trace import SpanKind , get_tracer
97
+ from opentelemetry .trace import SpanKind , Status , StatusCode , get_tracer
98
98
99
99
from .utils import sanitize_body
100
100
@@ -103,6 +103,7 @@ def response_hook(span, response):
103
103
es_transport_split = elasticsearch .VERSION [0 ] > 7
104
104
if es_transport_split :
105
105
import elastic_transport
106
+ from elastic_transport ._models import DefaultType
106
107
107
108
logger = getLogger (__name__ )
108
109
@@ -173,7 +174,12 @@ def _instrument(self, **kwargs):
173
174
174
175
def _uninstrument (self , ** kwargs ):
175
176
# pylint: disable=no-member
176
- unwrap (elasticsearch .Transport , "perform_request" )
177
+ transport_class = (
178
+ elastic_transport .Transport
179
+ if es_transport_split
180
+ else elasticsearch .Transport
181
+ )
182
+ unwrap (transport_class , "perform_request" )
177
183
178
184
179
185
_regex_doc_url = re .compile (r"/_doc/([^/]+)" )
@@ -234,7 +240,21 @@ def wrapper(wrapped, _, args, kwargs):
234
240
kind = SpanKind .CLIENT ,
235
241
) as span :
236
242
if callable (request_hook ):
237
- request_hook (span , method , url , kwargs )
243
+ if es_transport_split :
244
+
245
+ def normalize_kwargs (k , v ):
246
+ if isinstance (v , DefaultType ):
247
+ v = str (v )
248
+ elif isinstance (v , elastic_transport .HttpHeaders ):
249
+ v = dict (v )
250
+ return (k , v )
251
+
252
+ hook_kwargs = dict (
253
+ normalize_kwargs (k , v ) for k , v in kwargs .items ()
254
+ )
255
+ else :
256
+ hook_kwargs = kwargs
257
+ request_hook (span , method , url , hook_kwargs )
238
258
239
259
if span .is_recording ():
240
260
attributes = {
@@ -260,16 +280,41 @@ def wrapper(wrapped, _, args, kwargs):
260
280
span .set_attribute (key , value )
261
281
262
282
rv = wrapped (* args , ** kwargs )
263
- if isinstance (rv , dict ) and span .is_recording ():
283
+
284
+ body = rv .body if es_transport_split else rv
285
+ if isinstance (body , dict ) and span .is_recording ():
264
286
for member in _ATTRIBUTES_FROM_RESULT :
265
- if member in rv :
287
+ if member in body :
266
288
span .set_attribute (
267
289
f"elasticsearch.{ member } " ,
268
- str (rv [member ]),
290
+ str (body [member ]),
291
+ )
292
+
293
+ # since the transport split the raising of exceptions that set the error status
294
+ # are called after this code so need to set error status manually
295
+ if es_transport_split and span .is_recording ():
296
+ if not (method == "HEAD" and rv .meta .status == 404 ) and (
297
+ not 200 <= rv .meta .status < 299
298
+ ):
299
+ exception = elasticsearch .exceptions .HTTP_EXCEPTIONS .get (
300
+ rv .meta .status , elasticsearch .exceptions .ApiError
301
+ )
302
+ message = str (body )
303
+ if isinstance (body , dict ):
304
+ error = body .get ("error" , message )
305
+ if isinstance (error , dict ) and "type" in error :
306
+ error = error ["type" ]
307
+ message = error
308
+
309
+ span .set_status (
310
+ Status (
311
+ status_code = StatusCode .ERROR ,
312
+ description = f"{ exception .__name__ } : { message } " ,
269
313
)
314
+ )
270
315
271
316
if callable (response_hook ):
272
- response_hook (span , rv )
317
+ response_hook (span , body )
273
318
return rv
274
319
275
320
return wrapper
0 commit comments