Skip to content

Commit ad28790

Browse files
srijsdavidbarsky
authored andcommitted
Support closures as handlers (#19)
1 parent 26edea3 commit ad28790

File tree

1 file changed

+38
-20
lines changed

1 file changed

+38
-20
lines changed

lambda-runtime/src/runtime.rs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{error::Error, result};
1+
use std::{error::Error, marker::PhantomData, result};
22

33
use serde;
44
use serde_json;
@@ -12,7 +12,19 @@ use tokio::runtime::Runtime as TokioRuntime;
1212
const MAX_RETRIES: i8 = 3;
1313

1414
/// Functions acting as a handler must conform to this type.
15-
pub type Handler<E, O> = fn(E, Context) -> Result<O, HandlerError>;
15+
pub trait Handler<E, O> {
16+
/// Run the handler.
17+
fn run(&mut self, event: E, ctx: Context) -> Result<O, HandlerError>;
18+
}
19+
20+
impl<F, E, O> Handler<E, O> for F
21+
where
22+
F: FnMut(E, Context) -> Result<O, HandlerError>,
23+
{
24+
fn run(&mut self, event: E, ctx: Context) -> Result<O, HandlerError> {
25+
(*self)(event, ctx)
26+
}
27+
}
1628

1729
/// Creates a new runtime and begins polling for events using Lambda's Runtime APIs.
1830
///
@@ -22,9 +34,9 @@ pub type Handler<E, O> = fn(E, Context) -> Result<O, HandlerError>;
2234
///
2335
/// # Panics
2436
/// The function panics if the Lambda environment variables are not set.
25-
pub fn start<E, O>(f: Handler<E, O>, runtime: Option<TokioRuntime>)
37+
pub fn start<E, O>(f: impl Handler<E, O>, runtime: Option<TokioRuntime>)
2638
where
27-
for<'invocation> E: serde::Deserialize<'invocation>,
39+
E: serde::de::DeserializeOwned,
2840
O: serde::Serialize,
2941
{
3042
start_with_config(f, &EnvConfigProvider::new(), runtime)
@@ -53,9 +65,9 @@ macro_rules! lambda {
5365
/// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()`
5466
/// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment
5567
/// and spin up a new one for the next invocation.
56-
pub(crate) fn start_with_config<E, O, C>(f: Handler<E, O>, config: &C, runtime: Option<TokioRuntime>)
68+
pub(crate) fn start_with_config<E, O, C>(f: impl Handler<E, O>, config: &C, runtime: Option<TokioRuntime>)
5769
where
58-
for<'invocation> E: serde::Deserialize<'invocation>,
70+
E: serde::de::DeserializeOwned,
5971
O: serde::Serialize,
6072
C: ConfigProvider,
6173
{
@@ -100,12 +112,15 @@ where
100112
///
101113
/// # Panics
102114
/// The function panics if we cannot instantiate a new `RustRuntime` object.
103-
pub(crate) fn start_with_runtime_client<E, O>(f: Handler<E, O>, func_settings: FunctionSettings, client: RuntimeClient)
104-
where
105-
for<'invocation> E: serde::Deserialize<'invocation>,
115+
pub(crate) fn start_with_runtime_client<E, O>(
116+
f: impl Handler<E, O>,
117+
func_settings: FunctionSettings,
118+
client: RuntimeClient,
119+
) where
120+
E: serde::de::DeserializeOwned,
106121
O: serde::Serialize,
107122
{
108-
let lambda_runtime: Runtime<E, O>;
123+
let mut lambda_runtime: Runtime<_, E, O>;
109124
match Runtime::new(f, func_settings, MAX_RETRIES, client) {
110125
Ok(r) => lambda_runtime = r,
111126
Err(e) => {
@@ -119,15 +134,16 @@ where
119134

120135
/// Internal representation of the runtime object that polls for events and communicates
121136
/// with the Runtime APIs
122-
pub(super) struct Runtime<E, O> {
137+
pub(super) struct Runtime<F, E, O> {
123138
runtime_client: RuntimeClient,
124-
handler: Handler<E, O>,
139+
handler: F,
125140
max_retries: i8,
126141
settings: FunctionSettings,
142+
_phan: PhantomData<(E, O)>,
127143
}
128144

129145
// generic methods implementation
130-
impl<E, O> Runtime<E, O> {
146+
impl<F, E, O> Runtime<F, E, O> {
131147
/// Creates a new instance of the `Runtime` object populated with the environment
132148
/// settings.
133149
///
@@ -142,11 +158,11 @@ impl<E, O> Runtime<E, O> {
142158
/// fails the init if this function returns an error. If we cannot find the
143159
/// `AWS_LAMBDA_RUNTIME_API` variable in the environment the function panics.
144160
pub(super) fn new(
145-
f: Handler<E, O>,
161+
f: F,
146162
config: FunctionSettings,
147163
retries: i8,
148164
client: RuntimeClient,
149-
) -> result::Result<Runtime<E, O>, RuntimeError> {
165+
) -> result::Result<Self, RuntimeError> {
150166
debug!(
151167
"Creating new runtime with {} max retries for endpoint {}",
152168
retries,
@@ -157,21 +173,23 @@ impl<E, O> Runtime<E, O> {
157173
settings: config,
158174
handler: f,
159175
max_retries: retries,
176+
_phan: PhantomData,
160177
})
161178
}
162179
}
163180

164181
// implementation of methods that require the Event and Output types
165182
// to be compatible with `serde`'s Deserialize/Serialize.
166-
impl<'env, E, O> Runtime<E, O>
183+
impl<F, E, O> Runtime<F, E, O>
167184
where
168-
for<'de> E: serde::Deserialize<'de>,
185+
F: Handler<E, O>,
186+
E: serde::de::DeserializeOwned,
169187
O: serde::Serialize,
170188
{
171189
/// Starts the main event loop and begin polling or new events. If one of the
172190
/// Runtime APIs returns an unrecoverable error this method calls the init failed
173191
/// API and then panics.
174-
fn start(&self) {
192+
fn start(&mut self) {
175193
debug!("Beginning main event loop");
176194
loop {
177195
let (event, ctx) = self.get_next_event(0, None);
@@ -238,8 +256,8 @@ where
238256

239257
/// Invoke the handler function. This method is split out of the main loop to
240258
/// make it testable.
241-
pub(super) fn invoke(&self, e: E, ctx: Context) -> Result<O, HandlerError> {
242-
(self.handler)(e, ctx)
259+
pub(super) fn invoke(&mut self, e: E, ctx: Context) -> Result<O, HandlerError> {
260+
(&mut self.handler).run(e, ctx)
243261
}
244262

245263
/// Attempts to get the next event from the Runtime APIs and keeps retrying

0 commit comments

Comments
 (0)