Skip to content

Add tracing span with request id to the handler #577

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 68 additions & 52 deletions lambda-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::{Stream, StreamExt};
pub use tower::{self, service_fn, Service};
use tower::{util::ServiceFn, ServiceExt};
use tracing::{error, trace};
use tracing::{error, trace, Instrument};

mod requests;
#[cfg(test)]
Expand Down Expand Up @@ -120,15 +120,6 @@ where
continue;
}

let body = hyper::body::to_bytes(body).await?;
trace!("response body - {}", std::str::from_utf8(&body)?);

#[cfg(debug_assertions)]
if parts.status.is_server_error() {
error!("Lambda Runtime server returned an unexpected error");
return Err(parts.status.to_string().into());
}

let ctx: Context = Context::try_from(parts.headers)?;
let ctx: Context = ctx.with_config(config);
let request_id = &ctx.request_id.clone();
Expand All @@ -138,55 +129,80 @@ where
Some(trace_id) => env::set_var("_X_AMZN_TRACE_ID", trace_id),
None => env::remove_var("_X_AMZN_TRACE_ID"),
}
let body = match serde_json::from_slice(&body) {
Ok(body) => body,
Err(err) => {
let req = build_event_error_request(request_id, err)?;
client.call(req).await.expect("Unable to send response to Runtime APIs");
return Ok(());
}
let request_span = match xray_trace_id {
Some(trace_id) => tracing::span!(
tracing::Level::INFO,
"Lambda runtime invoke",
requestId = request_id,
xrayTraceId = trace_id
),
None => tracing::span!(tracing::Level::INFO, "Lambda runtime invoke", requestId = request_id),
};

let req = match handler.ready().await {
Ok(handler) => {
// Catches panics outside of a `Future`
let task =
panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx))));

let task = match task {
// Catches panics inside of the `Future`
Ok(task) => panic::AssertUnwindSafe(task).catch_unwind().await,
Err(err) => Err(err),
};

match task {
Ok(response) => match response {
Ok(response) => {
trace!("Ok response from handler (run loop)");
EventCompletionRequest {
request_id,
body: response,
// Group the handling in one future and instrument it with the span
async {
let body = hyper::body::to_bytes(body).await?;
trace!("response body - {}", std::str::from_utf8(&body)?);

#[cfg(debug_assertions)]
if parts.status.is_server_error() {
error!("Lambda Runtime server returned an unexpected error");
return Err(parts.status.to_string().into());
}

let body = match serde_json::from_slice(&body) {
Ok(body) => body,
Err(err) => {
let req = build_event_error_request(request_id, err)?;
client.call(req).await.expect("Unable to send response to Runtime APIs");
return Ok(());
}
};

let req = match handler.ready().await {
Ok(handler) => {
// Catches panics outside of a `Future`
let task =
panic::catch_unwind(panic::AssertUnwindSafe(|| handler.call(LambdaEvent::new(body, ctx))));

let task = match task {
// Catches panics inside of the `Future`
Ok(task) => panic::AssertUnwindSafe(task).catch_unwind().await,
Err(err) => Err(err),
};

match task {
Ok(response) => match response {
Ok(response) => {
trace!("Ok response from handler (run loop)");
EventCompletionRequest {
request_id,
body: response,
}
.into_req()
}
.into_req()
Err(err) => build_event_error_request(request_id, err),
},
Err(err) => {
error!("{:?}", err);
let error_type = type_name_of_val(&err);
let msg = if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
} else {
"Lambda panicked".to_string()
};
EventErrorRequest::new(request_id, error_type, &msg).into_req()
}
Err(err) => build_event_error_request(request_id, err),
},
Err(err) => {
error!("{:?}", err);
let error_type = type_name_of_val(&err);
let msg = if let Some(msg) = err.downcast_ref::<&str>() {
format!("Lambda panicked: {}", msg)
} else {
"Lambda panicked".to_string()
};
EventErrorRequest::new(request_id, error_type, &msg).into_req()
}
}
}
Err(err) => build_event_error_request(request_id, err),
}?;
Err(err) => build_event_error_request(request_id, err),
}?;

client.call(req).await.expect("Unable to send response to Runtime APIs");
client.call(req).await.expect("Unable to send response to Runtime APIs");
Ok::<(), Error>(())
}
.instrument(request_span)
.await?;
}
Ok(())
}
Expand Down