@@ -94,14 +94,17 @@ def response_hook(span, instance, response):
94
94
from typing import Any , Collection
95
95
96
96
import redis
97
+ import redis .commands
97
98
from wrapt import wrap_function_wrapper
98
99
99
100
from opentelemetry import trace
100
101
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
101
102
from opentelemetry .instrumentation .redis .package import _instruments
102
103
from opentelemetry .instrumentation .redis .util import (
104
+ _args_or_none ,
103
105
_extract_conn_attributes ,
104
106
_format_command_args ,
107
+ _set_span_attribute ,
105
108
)
106
109
from opentelemetry .instrumentation .redis .version import __version__
107
110
from opentelemetry .instrumentation .utils import unwrap
@@ -217,6 +220,67 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
217
220
response_hook (span , instance , response )
218
221
return response
219
222
223
+ def _traced_create_index (func , instance , args , kwargs ):
224
+ span_name = "redis.create_index"
225
+ with tracer .start_as_current_span (span_name ) as span :
226
+ _set_span_attribute (
227
+ span ,
228
+ "redis.create_index.fields" ,
229
+ kwargs .get ("fields" ).__str__ (),
230
+ )
231
+ _set_span_attribute (
232
+ span ,
233
+ "redis.create_index.definition" ,
234
+ kwargs .get ("definition" ).__str__ (),
235
+ )
236
+ response = func (* args , ** kwargs )
237
+ return response
238
+
239
+ def _traced_search (func , instance , args , kwargs ):
240
+ span_name = "redis.search"
241
+ with tracer .start_as_current_span (span_name ) as span :
242
+ query = kwargs .get ("query" ) or _args_or_none (args , 0 )
243
+ _set_span_attribute (
244
+ span ,
245
+ "redis.commands.search.query" ,
246
+ query .query_string (),
247
+ )
248
+ response = func (* args , ** kwargs )
249
+ _set_span_attribute (
250
+ span ,
251
+ "redis.commands.search.total" ,
252
+ response .total
253
+ )
254
+ _set_span_attribute (
255
+ span ,
256
+ "redis.commands.search.duration" ,
257
+ response .duration
258
+ )
259
+ for index , doc in enumerate (response .docs ):
260
+ _set_span_attribute (
261
+ span ,
262
+ f"redis.commands.search.xdoc_{ index } " ,
263
+ doc .__str__ ()
264
+ )
265
+ return response
266
+
267
+ def _traced_aggregate (func , instance , args , kwargs ):
268
+ span_name = "redis.aggregate"
269
+ with tracer .start_as_current_span (span_name ) as span :
270
+ query = kwargs .get ("query" ) or _args_or_none (args , 0 )
271
+ _set_span_attribute (
272
+ span ,
273
+ "redis.commands.aggregate.query" ,
274
+ query .query_string (),
275
+ )
276
+ response = func (* args , ** kwargs )
277
+ _set_span_attribute (
278
+ span ,
279
+ "redis.commands.aggregate.results" ,
280
+ str (response .rows )
281
+ )
282
+ return response
283
+
220
284
pipeline_class = (
221
285
"BasePipeline" if redis .VERSION < (3 , 0 , 0 ) else "Pipeline"
222
286
)
@@ -235,6 +299,21 @@ def _traced_execute_pipeline(func, instance, args, kwargs):
235
299
f"{ pipeline_class } .immediate_execute_command" ,
236
300
_traced_execute_command ,
237
301
)
302
+ wrap_function_wrapper (
303
+ "redis.commands.search" ,
304
+ "Search.create_index" ,
305
+ _traced_create_index ,
306
+ )
307
+ wrap_function_wrapper (
308
+ "redis.commands.search" ,
309
+ "Search.search" ,
310
+ _traced_search ,
311
+ )
312
+ wrap_function_wrapper (
313
+ "redis.commands.search" ,
314
+ "Search.aggregate" ,
315
+ _traced_aggregate ,
316
+ )
238
317
if redis .VERSION >= _REDIS_CLUSTER_VERSION :
239
318
wrap_function_wrapper (
240
319
"redis.cluster" ,
@@ -345,6 +424,9 @@ def _instrument(self, **kwargs):
345
424
)
346
425
347
426
def _uninstrument (self , ** kwargs ):
427
+ unwrap (redis .commands .search .Search , "create_index" )
428
+ unwrap (redis .commands .search .Search , "search" )
429
+ unwrap (redis .commands .search .Search , "aggregate" )
348
430
if redis .VERSION < (3 , 0 , 0 ):
349
431
unwrap (redis .StrictRedis , "execute_command" )
350
432
unwrap (redis .StrictRedis , "pipeline" )
0 commit comments