Add the possibility to flush traces/metrics after a handler loop has finished #691

Closed
purkhusid opened this issue Sep 4, 2023 · 10 comments

purkhusid commented Sep 4, 2023

The problem I currently have is that there is no good way to flush traces/logs/metrics/errors once the runtime has finished handling a message.

Currently the handling code is structured something like this:

let request_span = <create span for the request>;

async {
    <do parsing, call the user-provided handler, catch panics etc.>
}.instrument(request_span)

There are a couple of issues with this approach:

  1. The user-provided handler is the only way for the end user to affect the loop, so they have to e.g. flush spans inside the handler.
  2. Since the handler can panic, the flushing may never happen, because it has to run after the work is done.
  3. The request_span can not be flushed, since it only finishes after the user-provided handler has returned.
  4. If the runtime fails before the handler runs, e.g. on parsing, there is no way for the end user to send the error to their error reporting tool of choice, because their handler was never invoked.

I think it would be very valuable for the runtime to provide a hook so that end users can do cleanup after each invocation.

@purkhusid (Author)

I patched the runtime with this to fix tracing in our lambdas:

diff --git lambda-runtime/src/lib.rs lambda-runtime/src/lib.rs
index e3ffd49..a01d2cb 100644
--- lambda-runtime/src/lib.rs
+++ lambda-runtime/src/lib.rs
@@ -20,7 +20,7 @@ use std::{
     env,
     fmt::{self, Debug, Display},
     future::Future,
-    panic,
+    panic, sync::Arc,
 };
 use tokio::io::{AsyncRead, AsyncWrite};
 use tokio_stream::{Stream, StreamExt};
@@ -101,6 +101,7 @@ where
         &self,
         incoming: impl Stream<Item = Result<http::Response<hyper::Body>, Error>> + Send,
         mut handler: F,
+        callback: Option<Arc<impl Fn()>>
     ) -> Result<(), Error>
     where
         F: Service<LambdaEvent<A>>,
@@ -202,7 +203,13 @@ where
             }
             .instrument(request_span)
             .await?;
+        
+            if let Some(callback) = callback.clone() {
+                callback();
+            }
         }
+
+
         Ok(())
     }
 }
@@ -258,7 +265,52 @@ where
 
     let client = &runtime.client;
     let incoming = incoming(client);
-    runtime.run(incoming, handler).await
+    let callback: Option<Arc<fn()>> = None;
+    runtime.run(incoming, handler, callback).await
+}
+
+/// Starts the Lambda Rust runtime and begins polling for events on the [Lambda
+/// Runtime APIs](https://docs.aws.amazon.com/lambda/latest/dg/runtimes-api.html).
+/// 
+/// The callback function is called at the end of a single invocation of the runtime.
+///
+/// # Example
+/// ```no_run
+/// use lambda_runtime::{Error, service_fn, LambdaEvent};
+/// use serde_json::Value;
+///
+/// #[tokio::main]
+/// async fn main() -> Result<(), Error> {
+///     let func = service_fn(func);
+///     lambda_runtime::run_with_callback(func, std::sync::Arc::new(callback_func)).await?;
+///     Ok(())
+/// }
+///
+/// async fn func(event: LambdaEvent<Value>) -> Result<Value, Error> {
+///     Ok(event.payload)
+/// }
+/// 
+/// fn callback_func() -> () {
+///     println!("Callback function called!");
+///     ()
+/// }
+/// ```
+pub async fn run_with_callback<A, B, F>(handler: F, callback: Arc<impl Fn()>) -> Result<(), Error>
+where
+    F: Service<LambdaEvent<A>>,
+    F::Future: Future<Output = Result<B, F::Error>>,
+    F::Error: fmt::Debug + fmt::Display,
+    A: for<'de> Deserialize<'de>,
+    B: Serialize,
+{
+    trace!("Loading config from env");
+    let config = Config::from_env()?;
+    let client = Client::builder().build().expect("Unable to create a runtime client");
+    let runtime = Runtime { client, config };
+
+    let client = &runtime.client;
+    let incoming = incoming(client);
+    runtime.run(incoming, handler, Some(callback)).await
 }
 
 fn type_name_of_val<T>(_: T) -> &'static str {
@@ -293,7 +345,7 @@ mod endpoint_tests {
     use lambda_runtime_api_client::Client;
     use serde_json::json;
     use simulated::DuplexStreamWrapper;
-    use std::{convert::TryFrom, env};
+    use std::{convert::TryFrom, env, sync::Arc};
     use tokio::{
         io::{self, AsyncRead, AsyncWrite},
         select,
@@ -525,7 +577,8 @@ mod endpoint_tests {
         let runtime = Runtime { client, config };
         let client = &runtime.client;
         let incoming = incoming(client).take(1);
-        runtime.run(incoming, f).await?;
+        let callback: Option<Arc<fn()>> = None;
+        runtime.run(incoming, f, callback).await?;
 
         // shutdown server
         tx.send(()).expect("Receiver has been dropped");
@@ -568,7 +621,9 @@ mod endpoint_tests {
         let runtime = Runtime { client, config };
         let client = &runtime.client;
         let incoming = incoming(client).take(1);
-        runtime.run(incoming, f).await?;
+        let callback: Option<Arc<fn()>> = None;
+        
+        runtime.run(incoming, f, callback).await?;
 
         match server.await {
             Ok(_) => Ok(()),

calavera (Contributor) commented Sep 5, 2023

Can you open a PR so we can review it with more context?

@purkhusid (Author)

> Can you open a PR so we can review it with more context?

Sure thing.

BMorinDrifter (Contributor) commented Sep 5, 2023

Cross-posting what I said in the PR here:

See MetricsService in https://github.com/BMorinDrifter/metrics-cloudwatch-embedded/blob/main/src/lambda.rs for how I did this for the embedded metrics crate.

@bnusunny (Contributor)

An internal extension could be the solution. You could create a library that starts a background thread to interact with the Lambda Extension API and registers for invoke events. The extension gets a notification event when an invoke happens and can perform all kinds of tasks; it can even continue running after the handler returns. That makes it a good place to flush logs/traces.
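
For illustration, here is a minimal sketch of such an internal extension built on the lambda-extension crate's run/service_fn API. flush_telemetry is a hypothetical stand-in for your exporter's flush call, and note that the INVOKE event fires when an invocation starts, so a real implementation would pair this with a signal (e.g. a Tokio channel) from the handler:

use lambda_extension::{service_fn, Error, LambdaEvent, NextEvent};

// Hypothetical stand-in for your exporter's flush call.
fn flush_telemetry() {}

async fn events_extension(event: LambdaEvent) -> Result<(), Error> {
    match event.next {
        // Fires when an invocation *starts*; combine with a channel from the
        // handler if you need to act when the invocation has finished.
        NextEvent::Invoke(_e) => flush_telemetry(),
        // Fires once before the execution environment shuts down.
        NextEvent::Shutdown(_e) => flush_telemetry(),
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    lambda_extension::run(service_fn(events_extension)).await
}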

@ramosbugs (Contributor)

I added an example in #744 that uses an internal extension to flush telemetry after the handler finishes (once there's a new release of lambda-extension that includes the Extension::register API). Just before the handler returns, it notifies the extension via a Tokio channel.

As OP mentioned, handlers sometimes panic, so for a production use case I would wrap the bulk of the handler in something like futures_util::future::FutureExt::catch_unwind so that you can still clean up and flush telemetry even if the handler panics.
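
For example, a sketch of that pattern, where do_work and flush_telemetry are hypothetical stand-ins for the actual handler body and telemetry pipeline:

use futures_util::future::FutureExt;
use lambda_runtime::{Error, LambdaEvent};
use serde_json::Value;
use std::panic::AssertUnwindSafe;

async fn handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
    // Catch panics from the handler body so cleanup still runs.
    let result = AssertUnwindSafe(do_work(event)).catch_unwind().await;
    flush_telemetry(); // runs even if do_work panicked
    match result {
        Ok(output) => output,
        Err(_panic) => Err("handler panicked".into()),
    }
}

async fn do_work(event: LambdaEvent<Value>) -> Result<Value, Error> {
    Ok(event.payload)
}

fn flush_telemetry() {
    // e.g. force your tracer/meter provider to export buffered spans/metrics
}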

calavera (Contributor) commented Dec 4, 2023

Internal extensions are an option, but I think we can make something simpler for this use case. I'm working on an RFC that I think will benefit the runtime in general by providing a simpler solution.

calavera (Contributor) commented Dec 4, 2023

This is what I had in mind: #747

@calavera (Contributor)

The layering system that I described in #747 has landed in the main branch. See this example for how to flush traces/metrics after the handler finishes: https://github.com/awslabs/aws-lambda-rust-runtime/blob/main/examples/opentelemetry-tracing/src/main.rs

I'll release a new version with those improvements next week, after adding some extra documentation. I'm closing this issue as resolved, since it's now possible to implement what was described here.
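
As an illustration of the kind of middleware the layering system enables, here is a minimal tower Layer sketch that runs a cleanup closure after every invocation. FlushLayer and FlushService are illustrative names, not APIs from the runtime itself:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use tower::{Layer, Service};

// Wraps a service so that `flush` runs after every call.
pub struct FlushLayer<F> {
    flush: F,
}

pub struct FlushService<S, F> {
    inner: S,
    flush: F,
}

impl<S, F: Clone> Layer<S> for FlushLayer<F> {
    type Service = FlushService<S, F>;

    fn layer(&self, inner: S) -> Self::Service {
        FlushService {
            inner,
            flush: self.flush.clone(),
        }
    }
}

impl<S, F, Req> Service<Req> for FlushService<S, F>
where
    S: Service<Req>,
    S::Future: Send + 'static,
    F: Fn() + Clone + Send + 'static,
{
    type Response = S::Response;
    type Error = S::Error;
    type Future = Pin<Box<dyn Future<Output = Result<S::Response, S::Error>> + Send>>;

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        self.inner.poll_ready(cx)
    }

    fn call(&mut self, req: Req) -> Self::Future {
        let fut = self.inner.call(req);
        let flush = self.flush.clone();
        Box::pin(async move {
            let result = fut.await;
            flush(); // runs whether the invocation succeeded or failed
            result
        })
    }
}

A layer like this can wrap the handler service before it is handed to the runtime, so the cleanup runs outside the handler on every invocation, including error paths.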

This issue is now closed. Comments on closed issues are hard for our team to see.
If you need more assistance, please either tag a team member or open a new issue that references this one.
