54
54
import logging
55
55
56
56
from botocore .client import BaseClient
57
+ from botocore .exceptions import ClientError , ParamValidationError
57
58
from wrapt import ObjectProxy , wrap_function_wrapper
58
59
59
60
from opentelemetry import context as context_api
60
61
from opentelemetry import propagators
61
62
from opentelemetry .instrumentation .botocore .version import __version__
62
63
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
64
+ from opentelemetry .instrumentation .utils import unwrap
63
65
from opentelemetry .sdk .trace import Resource
64
66
from opentelemetry .trace import SpanKind , get_tracer
65
67
@@ -75,15 +77,13 @@ def _patched_endpoint_prepare_request(wrapped, instance, args, kwargs):
75
77
76
78
77
79
class BotocoreInstrumentor (BaseInstrumentor ):
78
- """A instrumentor for Botocore
80
+ """An instrumentor for Botocore.
79
81
80
82
See `BaseInstrumentor`
81
83
"""
82
84
83
85
def _instrument (self , ** kwargs ):
84
86
85
- # FIXME should the tracer provider be accessed via Configuration
86
- # instead?
87
87
# pylint: disable=attribute-defined-outside-init
88
88
self ._tracer = get_tracer (
89
89
__name__ , __version__ , kwargs .get ("tracer_provider" )
@@ -104,137 +104,66 @@ def _instrument(self, **kwargs):
104
104
def _uninstrument (self , ** kwargs ):
105
105
unwrap (BaseClient , "_make_api_call" )
106
106
107
+ # pylint: disable=too-many-branches
107
108
def _patched_api_call (self , original_func , instance , args , kwargs ):
108
109
if context_api .get_value ("suppress_instrumentation" ):
109
110
return original_func (* args , ** kwargs )
110
111
111
- endpoint_name = deep_getattr (instance , "_endpoint._endpoint_prefix" )
112
+ # pylint: disable=protected-access
113
+ service_name = instance ._service_model .service_name
114
+ operation_name , api_params = args
112
115
113
116
with self ._tracer .start_as_current_span (
114
- "{}.command " .format (endpoint_name ), kind = SpanKind .CONSUMER ,
117
+ "{}" .format (service_name ), kind = SpanKind .CLIENT ,
115
118
) as span :
116
-
117
- operation = None
118
- if args and span .is_recording ():
119
- operation = args [0 ]
120
- span .resource = Resource (
121
- attributes = {
122
- "endpoint" : endpoint_name ,
123
- "operation" : operation .lower (),
124
- }
125
- )
126
-
127
- else :
128
- span .resource = Resource (
129
- attributes = {"endpoint" : endpoint_name }
130
- )
131
-
132
- add_span_arg_tags (
133
- span ,
134
- endpoint_name ,
135
- args ,
136
- ("action" , "params" , "path" , "verb" ),
137
- {"params" , "path" , "verb" },
138
- )
119
+ error = None
120
+ result = None
139
121
140
122
if span .is_recording ():
141
- region_name = deep_getattr (instance , "meta.region_name" )
142
-
143
- meta = {
144
- "aws.agent" : "botocore" ,
145
- "aws.operation" : operation ,
146
- "aws.region" : region_name ,
147
- }
148
- for key , value in meta .items ():
149
- span .set_attribute (key , value )
150
-
151
- result = original_func (* args , ** kwargs )
123
+ span .set_attribute ("aws.operation" , operation_name )
124
+ span .set_attribute ("aws.region" , instance .meta .region_name )
125
+ span .set_attribute ("aws.service" , service_name )
126
+ if "QueueUrl" in api_params :
127
+ span .set_attribute ("aws.queue_url" , api_params ["QueueUrl" ])
128
+ if "TableName" in api_params :
129
+ span .set_attribute (
130
+ "aws.table_name" , api_params ["TableName" ]
131
+ )
132
+
133
+ try :
134
+ result = original_func (* args , ** kwargs )
135
+ except ClientError as ex :
136
+ error = ex
137
+
138
+ if error :
139
+ result = error .response
152
140
153
141
if span .is_recording ():
154
- span .set_attribute (
155
- "http.status_code" ,
156
- result ["ResponseMetadata" ]["HTTPStatusCode" ],
157
- )
158
- span .set_attribute (
159
- "retry_attempts" ,
160
- result ["ResponseMetadata" ]["RetryAttempts" ],
161
- )
142
+ if "ResponseMetadata" in result :
143
+ metadata = result ["ResponseMetadata" ]
144
+ req_id = None
145
+ if "RequestId" in metadata :
146
+ req_id = metadata ["RequestId" ]
147
+ elif "HTTPHeaders" in metadata :
148
+ headers = metadata ["HTTPHeaders" ]
149
+ if "x-amzn-RequestId" in headers :
150
+ req_id = headers ["x-amzn-RequestId" ]
151
+ elif "x-amz-request-id" in headers :
152
+ req_id = headers ["x-amz-request-id" ]
153
+ elif "x-amz-id-2" in headers :
154
+ req_id = headers ["x-amz-id-2" ]
155
+
156
+ if req_id :
157
+ span .set_attribute (
158
+ "aws.request_id" , req_id ,
159
+ )
160
+
161
+ if "HTTPStatusCode" in metadata :
162
+ span .set_attribute (
163
+ "http.status_code" , metadata ["HTTPStatusCode" ],
164
+ )
165
+
166
+ if error :
167
+ raise error
162
168
163
169
return result
164
-
165
-
166
- def unwrap (obj , attr ):
167
- function = getattr (obj , attr , None )
168
- if (
169
- function
170
- and isinstance (function , ObjectProxy )
171
- and hasattr (function , "__wrapped__" )
172
- ):
173
- setattr (obj , attr , function .__wrapped__ )
174
-
175
-
176
- def add_span_arg_tags (span , endpoint_name , args , args_names , args_traced ):
177
- def truncate_arg_value (value , max_len = 1024 ):
178
- """Truncate values which are bytes and greater than `max_len`.
179
- Useful for parameters like "Body" in `put_object` operations.
180
- """
181
- if isinstance (value , bytes ) and len (value ) > max_len :
182
- return b"..."
183
-
184
- return value
185
-
186
- def flatten_dict (dict_ , sep = "." , prefix = "" ):
187
- """
188
- Returns a normalized dict of depth 1 with keys in order of embedding
189
- """
190
- # adapted from https://stackoverflow.com/a/19647596
191
- return (
192
- {
193
- prefix + sep + k if prefix else k : v
194
- for kk , vv in dict_ .items ()
195
- for k , v in flatten_dict (vv , sep , kk ).items ()
196
- }
197
- if isinstance (dict_ , dict )
198
- else {prefix : dict_ }
199
- )
200
-
201
- if not span .is_recording ():
202
- return
203
-
204
- if endpoint_name not in {"kms" , "sts" }:
205
- tags = dict (
206
- (name , value )
207
- for (name , value ) in zip (args_names , args )
208
- if name in args_traced
209
- )
210
- tags = flatten_dict (tags )
211
- for key , value in {
212
- k : truncate_arg_value (v )
213
- for k , v in tags .items ()
214
- if k not in {"s3" : ["params.Body" ]}.get (endpoint_name , [])
215
- }.items ():
216
- span .set_attribute (key , value )
217
-
218
-
219
- def deep_getattr (obj , attr_string , default = None ):
220
- """
221
- Returns the attribute of ``obj`` at the dotted path given by
222
- ``attr_string``, if no such attribute is reachable, returns ``default``.
223
-
224
- >>> deep_getattr(cass, "cluster")
225
- <cassandra.cluster.Cluster object at 0xa20c350
226
-
227
- >>> deep_getattr(cass, "cluster.metadata.partitioner")
228
- u"org.apache.cassandra.dht.Murmur3Partitioner"
229
-
230
- >>> deep_getattr(cass, "i.dont.exist", default="default")
231
- "default"
232
- """
233
- attrs = attr_string .split ("." )
234
- for attr in attrs :
235
- try :
236
- obj = getattr (obj , attr )
237
- except AttributeError :
238
- return default
239
-
240
- return obj
0 commit comments