49
49
import logging
50
50
51
51
from botocore .client import BaseClient
52
+ from botocore .exceptions import ClientError , ParamValidationError
52
53
from wrapt import ObjectProxy , wrap_function_wrapper
53
54
54
55
from opentelemetry import context as context_api
55
56
from opentelemetry import propagators
56
57
from opentelemetry .instrumentation .botocore .version import __version__
57
58
from opentelemetry .instrumentation .instrumentor import BaseInstrumentor
59
+ from opentelemetry .instrumentation .utils import unwrap
58
60
from opentelemetry .sdk .trace import Resource
59
61
from opentelemetry .trace import SpanKind , get_tracer
60
62
@@ -70,15 +72,13 @@ def _patched_endpoint_prepare_request(wrapped, instance, args, kwargs):
70
72
71
73
72
74
class BotocoreInstrumentor (BaseInstrumentor ):
73
- """A instrumentor for Botocore
75
+ """An instrumentor for Botocore.
74
76
75
77
See `BaseInstrumentor`
76
78
"""
77
79
78
80
def _instrument (self , ** kwargs ):
79
81
80
- # FIXME should the tracer provider be accessed via Configuration
81
- # instead?
82
82
# pylint: disable=attribute-defined-outside-init
83
83
self ._tracer = get_tracer (
84
84
__name__ , __version__ , kwargs .get ("tracer_provider" )
@@ -99,137 +99,66 @@ def _instrument(self, **kwargs):
99
99
def _uninstrument (self , ** kwargs ):
100
100
unwrap (BaseClient , "_make_api_call" )
101
101
102
+ # pylint: disable=too-many-branches
102
103
def _patched_api_call (self , original_func , instance , args , kwargs ):
103
104
if context_api .get_value ("suppress_instrumentation" ):
104
105
return original_func (* args , ** kwargs )
105
106
106
- endpoint_name = deep_getattr (instance , "_endpoint._endpoint_prefix" )
107
+ # pylint: disable=protected-access
108
+ service_name = instance ._service_model .service_name
109
+ operation_name , api_params = args
110
+
111
+ error = None
112
+ result = None
107
113
108
114
with self ._tracer .start_as_current_span (
109
- "{}.command " .format (endpoint_name ), kind = SpanKind .CONSUMER ,
115
+ "{}" .format (service_name ), kind = SpanKind .CLIENT ,
110
116
) as span :
111
-
112
- operation = None
113
- if args and span .is_recording ():
114
- operation = args [0 ]
115
- span .resource = Resource (
116
- attributes = {
117
- "endpoint" : endpoint_name ,
118
- "operation" : operation .lower (),
119
- }
120
- )
121
-
122
- else :
123
- span .resource = Resource (
124
- attributes = {"endpoint" : endpoint_name }
125
- )
126
-
127
- add_span_arg_tags (
128
- span ,
129
- endpoint_name ,
130
- args ,
131
- ("action" , "params" , "path" , "verb" ),
132
- {"params" , "path" , "verb" },
133
- )
134
-
135
117
if span .is_recording ():
136
- region_name = deep_getattr (instance , "meta.region_name" )
137
-
138
- meta = {
139
- "aws.agent" : "botocore" ,
140
- "aws.operation" : operation ,
141
- "aws.region" : region_name ,
142
- }
143
- for key , value in meta .items ():
144
- span .set_attribute (key , value )
145
-
146
- result = original_func (* args , ** kwargs )
118
+ span .set_attribute ("aws.operation" , operation_name )
119
+ span .set_attribute ("aws.region" , instance .meta .region_name )
120
+ span .set_attribute ("aws.service" , service_name )
121
+ if "QueueUrl" in api_params :
122
+ span .set_attribute ("aws.queue_url" , api_params ["QueueUrl" ])
123
+ if "TableName" in api_params :
124
+ span .set_attribute (
125
+ "aws.table_name" , api_params ["TableName" ]
126
+ )
127
+
128
+ try :
129
+ result = original_func (* args , ** kwargs )
130
+ except ClientError as ex :
131
+ error = ex
132
+
133
+ if error :
134
+ result = error .response
147
135
148
136
if span .is_recording ():
149
- span .set_attribute (
150
- "http.status_code" ,
151
- result ["ResponseMetadata" ]["HTTPStatusCode" ],
152
- )
153
- span .set_attribute (
154
- "retry_attempts" ,
155
- result ["ResponseMetadata" ]["RetryAttempts" ],
156
- )
137
+ if "ResponseMetadata" in result :
138
+ metadata = result ["ResponseMetadata" ]
139
+ req_id = None
140
+ if "RequestId" in metadata :
141
+ req_id = metadata ["RequestId" ]
142
+ elif "HTTPHeaders" in metadata :
143
+ headers = metadata ["HTTPHeaders" ]
144
+ if "x-amzn-RequestId" in headers :
145
+ req_id = headers ["x-amzn-RequestId" ]
146
+ elif "x-amz-request-id" in headers :
147
+ req_id = headers ["x-amz-request-id" ]
148
+ elif "x-amz-id-2" in headers :
149
+ req_id = headers ["x-amz-id-2" ]
150
+
151
+ if req_id :
152
+ span .set_attribute (
153
+ "aws.request_id" , req_id ,
154
+ )
155
+
156
+ if "HTTPStatusCode" in metadata :
157
+ span .set_attribute (
158
+ "http.status_code" , metadata ["HTTPStatusCode" ],
159
+ )
160
+
161
+ if error :
162
+ raise error
157
163
158
164
return result
159
-
160
-
161
- def unwrap (obj , attr ):
162
- function = getattr (obj , attr , None )
163
- if (
164
- function
165
- and isinstance (function , ObjectProxy )
166
- and hasattr (function , "__wrapped__" )
167
- ):
168
- setattr (obj , attr , function .__wrapped__ )
169
-
170
-
171
- def add_span_arg_tags (span , endpoint_name , args , args_names , args_traced ):
172
- def truncate_arg_value (value , max_len = 1024 ):
173
- """Truncate values which are bytes and greater than `max_len`.
174
- Useful for parameters like "Body" in `put_object` operations.
175
- """
176
- if isinstance (value , bytes ) and len (value ) > max_len :
177
- return b"..."
178
-
179
- return value
180
-
181
- def flatten_dict (dict_ , sep = "." , prefix = "" ):
182
- """
183
- Returns a normalized dict of depth 1 with keys in order of embedding
184
- """
185
- # adapted from https://stackoverflow.com/a/19647596
186
- return (
187
- {
188
- prefix + sep + k if prefix else k : v
189
- for kk , vv in dict_ .items ()
190
- for k , v in flatten_dict (vv , sep , kk ).items ()
191
- }
192
- if isinstance (dict_ , dict )
193
- else {prefix : dict_ }
194
- )
195
-
196
- if not span .is_recording ():
197
- return
198
-
199
- if endpoint_name not in {"kms" , "sts" }:
200
- tags = dict (
201
- (name , value )
202
- for (name , value ) in zip (args_names , args )
203
- if name in args_traced
204
- )
205
- tags = flatten_dict (tags )
206
- for key , value in {
207
- k : truncate_arg_value (v )
208
- for k , v in tags .items ()
209
- if k not in {"s3" : ["params.Body" ]}.get (endpoint_name , [])
210
- }.items ():
211
- span .set_attribute (key , value )
212
-
213
-
214
- def deep_getattr (obj , attr_string , default = None ):
215
- """
216
- Returns the attribute of ``obj`` at the dotted path given by
217
- ``attr_string``, if no such attribute is reachable, returns ``default``.
218
-
219
- >>> deep_getattr(cass, "cluster")
220
- <cassandra.cluster.Cluster object at 0xa20c350
221
-
222
- >>> deep_getattr(cass, "cluster.metadata.partitioner")
223
- u"org.apache.cassandra.dht.Murmur3Partitioner"
224
-
225
- >>> deep_getattr(cass, "i.dont.exist", default="default")
226
- "default"
227
- """
228
- attrs = attr_string .split ("." )
229
- for attr in attrs :
230
- try :
231
- obj = getattr (obj , attr )
232
- except AttributeError :
233
- return default
234
-
235
- return obj
0 commit comments