Skip to content

Commit 731d201

Browse files
authored
feat: Implement RFC for layering of runtime (#845)
* feat: Implement RFC for layering of runtime * Add example * Fix compilation errors * Remove Send and 'static * Fix ci * Reduce diff * Implement review comments
1 parent bae37bd commit 731d201

File tree

20 files changed

+1160
-435
lines changed

20 files changed

+1160
-435
lines changed

.github/workflows/build-events.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,18 @@ name: Check Lambda Events
33
on:
44
push:
55
paths:
6-
- 'lambda-events/**'
6+
- "lambda-events/**"
77
pull_request:
88
paths:
9-
- 'lambda-events/**'
9+
- "lambda-events/**"
1010

1111
jobs:
1212
build:
1313
runs-on: ubuntu-latest
1414
strategy:
1515
matrix:
1616
toolchain:
17-
- "1.66.0" # Current MSRV
17+
- "1.70.0" # Current MSRV
1818
- stable
1919
env:
2020
RUST_BACKTRACE: 1

.github/workflows/build-extension.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ jobs:
1818
strategy:
1919
matrix:
2020
toolchain:
21-
- "1.66.0" # Current MSRV
21+
- "1.70.0" # Current MSRV
2222
- stable
2323
env:
2424
RUST_BACKTRACE: 1

.github/workflows/build-runtime.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ jobs:
1919
strategy:
2020
matrix:
2121
toolchain:
22-
- "1.66.0" # Current MSRV
22+
- "1.70.0" # Current MSRV
2323
- stable
2424
env:
2525
RUST_BACKTRACE: 1

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ members = [
66
"lambda-runtime-api-client",
77
"lambda-runtime",
88
"lambda-extension",
9-
"lambda-events"
9+
"lambda-events",
1010
]
1111

1212
exclude = ["examples"]
@@ -26,4 +26,5 @@ hyper = "1.0"
2626
hyper-util = "0.1.1"
2727
pin-project-lite = "0.2"
2828
tower = "0.4"
29+
tower-layer = "0.3"
2930
tower-service = "0.3"

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ This will make your function compile much faster.
458458

459459
## Supported Rust Versions (MSRV)
460460

461-
The AWS Lambda Rust Runtime requires a minimum of Rust 1.66, and is not guaranteed to build on compiler versions earlier than that.
461+
The AWS Lambda Rust Runtime requires a minimum of Rust 1.70, and is not guaranteed to build on compiler versions earlier than that.
462462

463463
## Security
464464

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
[package]
2+
name = "opentelemetry-tracing"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
# Library dependencies
8+
lambda_runtime = { path = "../../lambda-runtime" }
9+
pin-project = "1"
10+
opentelemetry-semantic-conventions = "0.14"
11+
tower = "0.4"
12+
tracing = "0.1"
13+
14+
# Binary dependencies
15+
opentelemetry = { version = "0.22", optional = true }
16+
opentelemetry_sdk = { version = "0.22", features = ["rt-tokio"], optional = true }
17+
opentelemetry-stdout = { version = "0.3", features = ["trace"], optional = true }
18+
serde_json = { version = "1.0", optional = true }
19+
tokio = { version = "1", optional = true }
20+
tracing-opentelemetry = { version = "0.23", optional = true }
21+
tracing-subscriber = { version = "0.3", optional = true }
22+
23+
[features]
24+
build-binary = [
25+
"opentelemetry",
26+
"opentelemetry_sdk",
27+
"opentelemetry-stdout",
28+
"serde_json",
29+
"tokio",
30+
"tracing-opentelemetry",
31+
"tracing-subscriber",
32+
]
33+
34+
[[bin]]
35+
name = "opentelemetry-tracing"
36+
required-features = ["build-binary"]
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use std::future::Future;
2+
use std::pin::Pin;
3+
use std::task;
4+
5+
use lambda_runtime::LambdaInvocation;
6+
use opentelemetry_semantic_conventions::trace as traceconv;
7+
use pin_project::pin_project;
8+
use tower::{Layer, Service};
9+
use tracing::instrument::Instrumented;
10+
use tracing::Instrument;
11+
12+
/// Tower layer to add OpenTelemetry tracing to a Lambda function invocation. The layer accepts
13+
/// a function to flush OpenTelemetry after the end of the invocation.
14+
pub struct OpenTelemetryLayer<F> {
15+
flush_fn: F,
16+
}
17+
18+
impl<F> OpenTelemetryLayer<F>
19+
where
20+
F: Fn() + Clone,
21+
{
22+
pub fn new(flush_fn: F) -> Self {
23+
Self { flush_fn }
24+
}
25+
}
26+
27+
impl<S, F> Layer<S> for OpenTelemetryLayer<F>
28+
where
29+
F: Fn() + Clone,
30+
{
31+
type Service = OpenTelemetryService<S, F>;
32+
33+
fn layer(&self, inner: S) -> Self::Service {
34+
OpenTelemetryService {
35+
inner,
36+
flush_fn: self.flush_fn.clone(),
37+
coldstart: true,
38+
}
39+
}
40+
}
41+
42+
/// Tower service created by [OpenTelemetryLayer].
43+
pub struct OpenTelemetryService<S, F> {
44+
inner: S,
45+
flush_fn: F,
46+
coldstart: bool,
47+
}
48+
49+
impl<S, F> Service<LambdaInvocation> for OpenTelemetryService<S, F>
50+
where
51+
S: Service<LambdaInvocation, Response = ()>,
52+
F: Fn() + Clone,
53+
{
54+
type Error = S::Error;
55+
type Response = ();
56+
type Future = OpenTelemetryFuture<Instrumented<S::Future>, F>;
57+
58+
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> task::Poll<Result<(), Self::Error>> {
59+
self.inner.poll_ready(cx)
60+
}
61+
62+
fn call(&mut self, req: LambdaInvocation) -> Self::Future {
63+
let span = tracing::info_span!(
64+
"Lambda function invocation",
65+
"otel.name" = req.context.env_config.function_name,
66+
{ traceconv::FAAS_TRIGGER } = "http",
67+
{ traceconv::FAAS_INVOCATION_ID } = req.context.request_id,
68+
{ traceconv::FAAS_COLDSTART } = self.coldstart
69+
);
70+
71+
// After the first execution, we can set 'coldstart' to false
72+
self.coldstart = false;
73+
74+
let fut = self.inner.call(req).instrument(span);
75+
OpenTelemetryFuture {
76+
future: Some(fut),
77+
flush_fn: self.flush_fn.clone(),
78+
}
79+
}
80+
}
81+
82+
/// Future created by [OpenTelemetryService].
83+
#[pin_project]
84+
pub struct OpenTelemetryFuture<Fut, F> {
85+
#[pin]
86+
future: Option<Fut>,
87+
flush_fn: F,
88+
}
89+
90+
impl<Fut, F> Future for OpenTelemetryFuture<Fut, F>
91+
where
92+
Fut: Future,
93+
F: Fn(),
94+
{
95+
type Output = Fut::Output;
96+
97+
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Self::Output> {
98+
// First, try to get the ready value of the future
99+
let ready = task::ready!(self
100+
.as_mut()
101+
.project()
102+
.future
103+
.as_pin_mut()
104+
.expect("future polled after completion")
105+
.poll(cx));
106+
107+
// If we got the ready value, we first drop the future: this ensures that the
108+
// OpenTelemetry span attached to it is closed and included in the subsequent flush.
109+
Pin::set(&mut self.as_mut().project().future, None);
110+
(self.project().flush_fn)();
111+
task::Poll::Ready(ready)
112+
}
113+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
use lambda_runtime::{LambdaEvent, Runtime};
2+
use opentelemetry::trace::TracerProvider;
3+
use opentelemetry_sdk::{runtime, trace};
4+
use opentelemetry_tracing::OpenTelemetryLayer;
5+
use tower::{service_fn, BoxError};
6+
use tracing_subscriber::prelude::*;
7+
8+
async fn echo(event: LambdaEvent<serde_json::Value>) -> Result<serde_json::Value, &'static str> {
9+
Ok(event.payload)
10+
}
11+
12+
#[tokio::main]
13+
async fn main() -> Result<(), BoxError> {
14+
// Set up OpenTelemetry tracer provider that writes spans to stdout for debugging purposes
15+
let exporter = opentelemetry_stdout::SpanExporter::default();
16+
let tracer_provider = trace::TracerProvider::builder()
17+
.with_batch_exporter(exporter, runtime::Tokio)
18+
.build();
19+
20+
// Set up link between OpenTelemetry and tracing crate
21+
tracing_subscriber::registry()
22+
.with(tracing_opentelemetry::OpenTelemetryLayer::new(
23+
tracer_provider.tracer("my-app"),
24+
))
25+
.init();
26+
27+
// Initialize the Lambda runtime and add OpenTelemetry tracing
28+
let runtime = Runtime::new(service_fn(echo)).layer(OpenTelemetryLayer::new(|| {
29+
// Make sure that the trace is exported before the Lambda runtime is frozen
30+
tracer_provider.force_flush();
31+
}));
32+
runtime.run().await?;
33+
Ok(())
34+
}

lambda-events/src/custom_serde/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -177,18 +177,18 @@ mod test {
177177

178178
let test = r#"{"v": null}"#;
179179
let decoded: Test = serde_json::from_str(test).unwrap();
180-
assert_eq!(false, decoded.v);
180+
assert!(!decoded.v);
181181

182182
let test = r#"{}"#;
183183
let decoded: Test = serde_json::from_str(test).unwrap();
184-
assert_eq!(false, decoded.v);
184+
assert!(!decoded.v);
185185

186186
let test = r#"{"v": true}"#;
187187
let decoded: Test = serde_json::from_str(test).unwrap();
188-
assert_eq!(true, decoded.v);
188+
assert!(decoded.v);
189189

190190
let test = r#"{"v": false}"#;
191191
let decoded: Test = serde_json::from_str(test).unwrap();
192-
assert_eq!(false, decoded.v);
192+
assert!(!decoded.v);
193193
}
194194
}

lambda-events/src/event/dynamodb/attributes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ mod test {
8383

8484
let attr: AttributeValue = serde_json::from_value(value.clone()).unwrap();
8585
match attr {
86-
AttributeValue::Bool(b) => assert_eq!(true, b),
86+
AttributeValue::Bool(b) => assert!(b),
8787
other => panic!("unexpected value {:?}", other),
8888
}
8989

lambda-runtime-api-client/src/lib.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44

55
//! This crate includes a base HTTP client to interact with
66
//! the AWS Lambda Runtime API.
7+
use futures_util::{future::BoxFuture, FutureExt, TryFutureExt};
78
use http::{uri::PathAndQuery, uri::Scheme, Request, Response, Uri};
89
use hyper::body::Incoming;
910
use hyper_util::client::legacy::connect::HttpConnector;
10-
use std::{convert::TryInto, fmt::Debug};
11+
use std::{convert::TryInto, fmt::Debug, future};
1112

1213
const USER_AGENT_HEADER: &str = "User-Agent";
1314
const DEFAULT_USER_AGENT: &str = concat!("aws-lambda-rust/", env!("CARGO_PKG_VERSION"));
@@ -42,9 +43,15 @@ impl Client {
4243
impl Client {
4344
/// Send a given request to the Runtime API.
4445
/// Use the client's base URI to ensure the API endpoint is correct.
45-
pub async fn call(&self, req: Request<body::Body>) -> Result<Response<Incoming>, BoxError> {
46-
let req = self.set_origin(req)?;
47-
self.client.request(req).await.map_err(Into::into)
46+
pub fn call(&self, req: Request<body::Body>) -> BoxFuture<'static, Result<Response<Incoming>, BoxError>> {
47+
// NOTE: This method returns a boxed future such that the future has a static lifetime.
48+
// Due to limitations around the Rust async implementation as of Mar 2024, this is
49+
// required to minimize constraints on the handler passed to [lambda_runtime::run].
50+
let req = match self.set_origin(req) {
51+
Ok(req) => req,
52+
Err(err) => return future::ready(Err(err)).boxed(),
53+
};
54+
self.client.request(req).map_err(Into::into).boxed()
4855
}
4956

5057
/// Create a new client with a given base URI and HTTP connector.

lambda-runtime/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ hyper-util = { workspace = true, features = [
3737
"tokio",
3838
] }
3939
lambda_runtime_api_client = { version = "0.10", path = "../lambda-runtime-api-client" }
40+
pin-project = "1"
4041
serde = { version = "1", features = ["derive", "rc"] }
4142
serde_json = "^1"
4243
serde_path_to_error = "0.1.11"
@@ -48,6 +49,7 @@ tokio = { version = "1.0", features = [
4849
] }
4950
tokio-stream = "0.1.2"
5051
tower = { workspace = true, features = ["util"] }
52+
tower-layer = { workspace = true }
5153
tracing = { version = "0.1", features = ["log"] }
5254

5355
[dev-dependencies]

0 commit comments

Comments
 (0)