@@ -26,7 +26,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
26
26
use tokio_stream:: { Stream , StreamExt } ;
27
27
pub use tower:: { self , service_fn, Service } ;
28
28
use tower:: { util:: ServiceFn , ServiceExt } ;
29
- use tracing:: { error, trace} ;
29
+ use tracing:: { error, trace, Instrument } ;
30
30
31
31
mod requests;
32
32
#[ cfg( test) ]
@@ -120,15 +120,6 @@ where
120
120
continue ;
121
121
}
122
122
123
- let body = hyper:: body:: to_bytes ( body) . await ?;
124
- trace ! ( "response body - {}" , std:: str :: from_utf8( & body) ?) ;
125
-
126
- #[ cfg( debug_assertions) ]
127
- if parts. status . is_server_error ( ) {
128
- error ! ( "Lambda Runtime server returned an unexpected error" ) ;
129
- return Err ( parts. status . to_string ( ) . into ( ) ) ;
130
- }
131
-
132
123
let ctx: Context = Context :: try_from ( parts. headers ) ?;
133
124
let ctx: Context = ctx. with_config ( config) ;
134
125
let request_id = & ctx. request_id . clone ( ) ;
@@ -138,55 +129,80 @@ where
138
129
Some ( trace_id) => env:: set_var ( "_X_AMZN_TRACE_ID" , trace_id) ,
139
130
None => env:: remove_var ( "_X_AMZN_TRACE_ID" ) ,
140
131
}
141
- let body = match serde_json:: from_slice ( & body) {
142
- Ok ( body) => body,
143
- Err ( err) => {
144
- let req = build_event_error_request ( request_id, err) ?;
145
- client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
146
- return Ok ( ( ) ) ;
147
- }
132
+ let request_span = match xray_trace_id {
133
+ Some ( trace_id) => tracing:: span!(
134
+ tracing:: Level :: INFO ,
135
+ "Lambda runtime invoke" ,
136
+ requestId = request_id,
137
+ xrayTraceId = trace_id
138
+ ) ,
139
+ None => tracing:: span!( tracing:: Level :: INFO , "Lambda runtime invoke" , requestId = request_id) ,
148
140
} ;
149
141
150
- let req = match handler. ready ( ) . await {
151
- Ok ( handler) => {
152
- // Catches panics outside of a `Future`
153
- let task =
154
- panic:: catch_unwind ( panic:: AssertUnwindSafe ( || handler. call ( LambdaEvent :: new ( body, ctx) ) ) ) ;
155
-
156
- let task = match task {
157
- // Catches panics inside of the `Future`
158
- Ok ( task) => panic:: AssertUnwindSafe ( task) . catch_unwind ( ) . await ,
159
- Err ( err) => Err ( err) ,
160
- } ;
161
-
162
- match task {
163
- Ok ( response) => match response {
164
- Ok ( response) => {
165
- trace ! ( "Ok response from handler (run loop)" ) ;
166
- EventCompletionRequest {
167
- request_id,
168
- body : response,
142
+ // Group the handling in one future and instrument it with the span
143
+ async {
144
+ let body = hyper:: body:: to_bytes ( body) . await ?;
145
+ trace ! ( "response body - {}" , std:: str :: from_utf8( & body) ?) ;
146
+
147
+ #[ cfg( debug_assertions) ]
148
+ if parts. status . is_server_error ( ) {
149
+ error ! ( "Lambda Runtime server returned an unexpected error" ) ;
150
+ return Err ( parts. status . to_string ( ) . into ( ) ) ;
151
+ }
152
+
153
+ let body = match serde_json:: from_slice ( & body) {
154
+ Ok ( body) => body,
155
+ Err ( err) => {
156
+ let req = build_event_error_request ( request_id, err) ?;
157
+ client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
158
+ return Ok ( ( ) ) ;
159
+ }
160
+ } ;
161
+
162
+ let req = match handler. ready ( ) . await {
163
+ Ok ( handler) => {
164
+ // Catches panics outside of a `Future`
165
+ let task =
166
+ panic:: catch_unwind ( panic:: AssertUnwindSafe ( || handler. call ( LambdaEvent :: new ( body, ctx) ) ) ) ;
167
+
168
+ let task = match task {
169
+ // Catches panics inside of the `Future`
170
+ Ok ( task) => panic:: AssertUnwindSafe ( task) . catch_unwind ( ) . await ,
171
+ Err ( err) => Err ( err) ,
172
+ } ;
173
+
174
+ match task {
175
+ Ok ( response) => match response {
176
+ Ok ( response) => {
177
+ trace ! ( "Ok response from handler (run loop)" ) ;
178
+ EventCompletionRequest {
179
+ request_id,
180
+ body : response,
181
+ }
182
+ . into_req ( )
169
183
}
170
- . into_req ( )
184
+ Err ( err) => build_event_error_request ( request_id, err) ,
185
+ } ,
186
+ Err ( err) => {
187
+ error ! ( "{:?}" , err) ;
188
+ let error_type = type_name_of_val ( & err) ;
189
+ let msg = if let Some ( msg) = err. downcast_ref :: < & str > ( ) {
190
+ format ! ( "Lambda panicked: {}" , msg)
191
+ } else {
192
+ "Lambda panicked" . to_string ( )
193
+ } ;
194
+ EventErrorRequest :: new ( request_id, error_type, & msg) . into_req ( )
171
195
}
172
- Err ( err) => build_event_error_request ( request_id, err) ,
173
- } ,
174
- Err ( err) => {
175
- error ! ( "{:?}" , err) ;
176
- let error_type = type_name_of_val ( & err) ;
177
- let msg = if let Some ( msg) = err. downcast_ref :: < & str > ( ) {
178
- format ! ( "Lambda panicked: {}" , msg)
179
- } else {
180
- "Lambda panicked" . to_string ( )
181
- } ;
182
- EventErrorRequest :: new ( request_id, error_type, & msg) . into_req ( )
183
196
}
184
197
}
185
- }
186
- Err ( err) => build_event_error_request ( request_id, err) ,
187
- } ?;
198
+ Err ( err) => build_event_error_request ( request_id, err) ,
199
+ } ?;
188
200
189
- client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
201
+ client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
202
+ Ok :: < ( ) , Error > ( ( ) )
203
+ }
204
+ . instrument ( request_span)
205
+ . await ?;
190
206
}
191
207
Ok ( ( ) )
192
208
}
0 commit comments