Skip to content

Support closures as handlers #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 1, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 38 additions & 20 deletions lambda-runtime/src/runtime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{error::Error, result};
use std::{error::Error, marker::PhantomData, result};

use serde;
use serde_json;
Expand All @@ -12,7 +12,19 @@ use tokio::runtime::Runtime as TokioRuntime;
const MAX_RETRIES: i8 = 3;

/// Functions acting as a handler must conform to this type.
pub type Handler<E, O> = fn(E, Context) -> Result<O, HandlerError>;
pub trait Handler<E, O> {
/// Run the handler.
fn run(&mut self, event: E, ctx: Context) -> Result<O, HandlerError>;
}

impl<F, E, O> Handler<E, O> for F
where
F: FnMut(E, Context) -> Result<O, HandlerError>,
{
fn run(&mut self, event: E, ctx: Context) -> Result<O, HandlerError> {
(*self)(event, ctx)
}
}

/// Creates a new runtime and begins polling for events using Lambda's Runtime APIs.
///
Expand All @@ -22,9 +34,9 @@ pub type Handler<E, O> = fn(E, Context) -> Result<O, HandlerError>;
///
/// # Panics
/// The function panics if the Lambda environment variables are not set.
pub fn start<E, O>(f: Handler<E, O>, runtime: Option<TokioRuntime>)
pub fn start<E, O>(f: impl Handler<E, O>, runtime: Option<TokioRuntime>)
where
for<'invocation> E: serde::Deserialize<'invocation>,
E: serde::de::DeserializeOwned,
O: serde::Serialize,
{
start_with_config(f, &EnvConfigProvider::new(), runtime)
Expand Down Expand Up @@ -53,9 +65,9 @@ macro_rules! lambda {
/// The function panics if the `ConfigProvider` returns an error from the `get_runtime_api_endpoint()`
/// or `get_function_settings()` methods. The panic forces AWS Lambda to terminate the environment
/// and spin up a new one for the next invocation.
pub(crate) fn start_with_config<E, O, C>(f: Handler<E, O>, config: &C, runtime: Option<TokioRuntime>)
pub(crate) fn start_with_config<E, O, C>(f: impl Handler<E, O>, config: &C, runtime: Option<TokioRuntime>)
where
for<'invocation> E: serde::Deserialize<'invocation>,
E: serde::de::DeserializeOwned,
O: serde::Serialize,
C: ConfigProvider,
{
Expand Down Expand Up @@ -100,12 +112,15 @@ where
///
/// # Panics
/// The function panics if we cannot instantiate a new `RustRuntime` object.
pub(crate) fn start_with_runtime_client<E, O>(f: Handler<E, O>, func_settings: FunctionSettings, client: RuntimeClient)
where
for<'invocation> E: serde::Deserialize<'invocation>,
pub(crate) fn start_with_runtime_client<E, O>(
f: impl Handler<E, O>,
func_settings: FunctionSettings,
client: RuntimeClient,
) where
E: serde::de::DeserializeOwned,
O: serde::Serialize,
{
let lambda_runtime: Runtime<E, O>;
let mut lambda_runtime: Runtime<_, E, O>;
match Runtime::new(f, func_settings, MAX_RETRIES, client) {
Ok(r) => lambda_runtime = r,
Err(e) => {
Expand All @@ -119,15 +134,16 @@ where

/// Internal representation of the runtime object that polls for events and communicates
/// with the Runtime APIs
pub(super) struct Runtime<E, O> {
pub(super) struct Runtime<F, E, O> {
runtime_client: RuntimeClient,
handler: Handler<E, O>,
handler: F,
max_retries: i8,
settings: FunctionSettings,
_phan: PhantomData<(E, O)>,
}

// generic methods implementation
impl<E, O> Runtime<E, O> {
impl<F, E, O> Runtime<F, E, O> {
/// Creates a new instance of the `Runtime` object populated with the environment
/// settings.
///
Expand All @@ -142,11 +158,11 @@ impl<E, O> Runtime<E, O> {
/// fails the init if this function returns an error. If we cannot find the
/// `AWS_LAMBDA_RUNTIME_API` variable in the environment the function panics.
pub(super) fn new(
f: Handler<E, O>,
f: F,
config: FunctionSettings,
retries: i8,
client: RuntimeClient,
) -> result::Result<Runtime<E, O>, RuntimeError> {
) -> result::Result<Self, RuntimeError> {
debug!(
"Creating new runtime with {} max retries for endpoint {}",
retries,
Expand All @@ -157,21 +173,23 @@ impl<E, O> Runtime<E, O> {
settings: config,
handler: f,
max_retries: retries,
_phan: PhantomData,
})
}
}

// implementation of methods that require the Event and Output types
// to be compatible with `serde`'s Deserialize/Serialize.
impl<'env, E, O> Runtime<E, O>
impl<F, E, O> Runtime<F, E, O>
where
for<'de> E: serde::Deserialize<'de>,
F: Handler<E, O>,
E: serde::de::DeserializeOwned,
O: serde::Serialize,
{
/// Starts the main event loop and begin polling or new events. If one of the
/// Runtime APIs returns an unrecoverable error this method calls the init failed
/// API and then panics.
fn start(&self) {
fn start(&mut self) {
debug!("Beginning main event loop");
loop {
let (event, ctx) = self.get_next_event(0, None);
Expand Down Expand Up @@ -238,8 +256,8 @@ where

/// Invoke the handler function. This method is split out of the main loop to
/// make it testable.
pub(super) fn invoke(&self, e: E, ctx: Context) -> Result<O, HandlerError> {
(self.handler)(e, ctx)
pub(super) fn invoke(&mut self, e: E, ctx: Context) -> Result<O, HandlerError> {
(&mut self.handler).run(e, ctx)
}

/// Attempts to get the next event from the Runtime APIs and keeps retrying
Expand Down