@@ -94,7 +94,7 @@ def response_hook(span, response):
94
94
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
95
95
from opentelemetry .instrumentation .utils import unwrap
96
96
from opentelemetry .semconv .trace import SpanAttributes
97
- from opentelemetry .trace import SpanKind , get_tracer
97
+ from opentelemetry .trace import SpanKind , Status , StatusCode , get_tracer
98
98
99
99
from .utils import sanitize_body
100
100
@@ -103,6 +103,7 @@ def response_hook(span, response):
103
103
es_transport_split = elasticsearch .VERSION [0 ] > 7
104
104
if es_transport_split :
105
105
import elastic_transport
106
+ from elastic_transport ._models import DefaultType
106
107
107
108
logger = getLogger (__name__ )
108
109
@@ -173,7 +174,12 @@ def _instrument(self, **kwargs):
173
174
174
175
def _uninstrument (self , ** kwargs ):
175
176
# pylint: disable=no-member
176
- unwrap (elasticsearch .Transport , "perform_request" )
177
+ transport_class = (
178
+ elastic_transport .Transport
179
+ if es_transport_split
180
+ else elasticsearch .Transport
181
+ )
182
+ unwrap (transport_class , "perform_request" )
177
183
178
184
179
185
_regex_doc_url = re .compile (r"/_doc/([^/]+)" )
@@ -182,6 +188,7 @@ def _uninstrument(self, **kwargs):
182
188
_regex_search_url = re .compile (r"/([^/]+)/_search[/]?" )
183
189
184
190
191
+ # pylint: disable=too-many-statements
185
192
def _wrap_perform_request (
186
193
tracer ,
187
194
span_name_prefix ,
@@ -234,7 +241,22 @@ def wrapper(wrapped, _, args, kwargs):
234
241
kind = SpanKind .CLIENT ,
235
242
) as span :
236
243
if callable (request_hook ):
237
- request_hook (span , method , url , kwargs )
244
+ # elasticsearch 8 changed the parameters quite a bit
245
+ if es_transport_split :
246
+
247
+ def normalize_kwargs (k , v ):
248
+ if isinstance (v , DefaultType ):
249
+ v = str (v )
250
+ elif isinstance (v , elastic_transport .HttpHeaders ):
251
+ v = dict (v )
252
+ return (k , v )
253
+
254
+ hook_kwargs = dict (
255
+ normalize_kwargs (k , v ) for k , v in kwargs .items ()
256
+ )
257
+ else :
258
+ hook_kwargs = kwargs
259
+ request_hook (span , method , url , hook_kwargs )
238
260
239
261
if span .is_recording ():
240
262
attributes = {
@@ -260,16 +282,41 @@ def wrapper(wrapped, _, args, kwargs):
260
282
span .set_attribute (key , value )
261
283
262
284
rv = wrapped (* args , ** kwargs )
263
- if isinstance (rv , dict ) and span .is_recording ():
285
+
286
+ body = rv .body if es_transport_split else rv
287
+ if isinstance (body , dict ) and span .is_recording ():
264
288
for member in _ATTRIBUTES_FROM_RESULT :
265
- if member in rv :
289
+ if member in body :
266
290
span .set_attribute (
267
291
f"elasticsearch.{ member } " ,
268
- str (rv [member ]),
292
+ str (body [member ]),
293
+ )
294
+
295
+ # since the transport split the raising of exceptions that set the error status
296
+ # are called after this code so need to set error status manually
297
+ if es_transport_split and span .is_recording ():
298
+ if not (method == "HEAD" and rv .meta .status == 404 ) and (
299
+ not 200 <= rv .meta .status < 299
300
+ ):
301
+ exception = elasticsearch .exceptions .HTTP_EXCEPTIONS .get (
302
+ rv .meta .status , elasticsearch .exceptions .ApiError
303
+ )
304
+ message = str (body )
305
+ if isinstance (body , dict ):
306
+ error = body .get ("error" , message )
307
+ if isinstance (error , dict ) and "type" in error :
308
+ error = error ["type" ]
309
+ message = error
310
+
311
+ span .set_status (
312
+ Status (
313
+ status_code = StatusCode .ERROR ,
314
+ description = f"{ exception .__name__ } : { message } " ,
269
315
)
316
+ )
270
317
271
318
if callable (response_hook ):
272
- response_hook (span , rv )
319
+ response_hook (span , body )
273
320
return rv
274
321
275
322
return wrapper
0 commit comments