Skip to content

Add SQS example implementing partial batch failure #584

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/advanced-sqs-partial-batch-failures/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "advanced-sqs-partial-batch-failures"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = "^1"
serde_derive = "^1"
serde_with = { version = "^2", features = ["json"], optional = true }
serde_json = "^1"
aws_lambda_events = "0.7.3"
lambda_runtime = "0.7"
tokio = { version = "1", features = ["macros"] }
futures = "0.3"
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
18 changes: 18 additions & 0 deletions examples/advanced-sqs-partial-batch-failures/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# AWS Lambda Function that receives events from SQS

This example shows how to process events from an SQS queue using the partial batch failure feature.

_Important note:_ your lambda sqs trigger *needs* to be configured with partial batch response support
(the ` ReportBatchItemFailures` flag set to true), otherwise failed message will be not be reprocessed.
For more details see:
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting

## Build & Deploy

1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
2. Build the function with `cargo lambda build --release`
3. Deploy the function to AWS Lambda with `cargo lambda deploy --iam-role YOUR_ROLE`

## Build for ARM 64

Build the function with `cargo lambda build --release --arm64`
159 changes: 159 additions & 0 deletions examples/advanced-sqs-partial-batch-failures/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
use aws_lambda_events::{
event::sqs::SqsEventObj,
sqs::{BatchItemFailure, SqsBatchResponse, SqsMessageObj},
};
use futures::Future;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tracing::Instrument;

/// [To customize] Your object definition, sent to the SQS queue triggering this lambda.
#[derive(Deserialize, Serialize)]
struct Data {
text: String,
}

/// [To customize] Your buisness logic to handle the payload of one SQS message.
async fn data_handler(data: Data) -> Result<(), Error> {
// Some processing
tracing::info!(text = ?data.text, "processing data");
// simulate error
if data.text == "bad request" {
Err("Processing error".into())
} else {
Ok(())
}
}

/// Main function for the lambda executable.
#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();

run_sqs_partial_batch_failure(data_handler).await
}

/// This function will handle the message batches from SQS.
/// It calls the provided user function `f` on every message concurrently and reports to SQS
/// which message failed to be processed so that only those are retried.
///
/// Important note: your lambda sqs trigger *needs* to be configured with partial batch response support
/// with the ` ReportBatchItemFailures` flag set to true, otherwise failed message will be dropped,
/// for more details see:
/// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
///
///
/// Note that if you are looking for parallel processing (multithread) instead of concurrent processing,
/// you can do so by spawning a task inside your function `f`.
async fn run_sqs_partial_batch_failure<T, D, R>(f: T) -> Result<(), Error>
where
T: Fn(D) -> R,
D: DeserializeOwned,
R: Future<Output = Result<(), Error>>,
{
run(service_fn(|e| batch_handler(|d| f(d), e))).await
}

/// Helper function to lift the user provided `f` function from message to batch of messages.
/// See `run_sqs` for the easier function to use.
async fn batch_handler<T, D, F>(
f: T,
event: LambdaEvent<SqsEventObj<serde_json::Value>>,
) -> Result<SqsBatchResponse, Error>
where
T: Fn(D) -> F,
F: Future<Output = Result<(), Error>>,
D: DeserializeOwned,
{
tracing::trace!("Handling batch size {}", event.payload.records.len());
let create_task = |msg| {
// We need to keep the message_id to report failures to SQS
let SqsMessageObj {
message_id, body, ..
} = msg;
let span = tracing::span!(tracing::Level::INFO, "Handling SQS msg", message_id);
let task = async {
//TODO catch panics like the `run` function from lambda_runtime
f(serde_json::from_value(body)?).await
}
.instrument(span);
(message_id.unwrap_or_default(), task)
};
let (ids, tasks): (Vec<_>, Vec<_>) = event.payload.records.into_iter().map(create_task).unzip();
let results = futures::future::join_all(tasks).await; // Run tasks concurrently
let failure_items = ids
.into_iter()
.zip(results)
.filter_map(
// Only keep the message_id of failed tasks
|(id, res)| match res {
Ok(()) => None,
Err(err) => {
tracing::error!("Failed to process msg {id}, {err}");
Some(id)
}
},
)
.map(|id| BatchItemFailure {
item_identifier: id,
})
.collect();

Ok(SqsBatchResponse {
batch_item_failures: failure_items,
})
}

#[cfg(test)]
mod test {
use lambda_runtime::Context;

use super::*;

#[derive(Serialize, Deserialize, Debug)]
struct UserData {
should_error: bool,
}
async fn user_fn(data: UserData) -> Result<(), Error> {
if data.should_error {
Err("Processing Error".into())
} else {
Ok(())
}
}

#[tokio::test]
async fn test() -> () {
let msg_to_fail: SqsMessageObj<serde_json::Value> = serde_json::from_str(
r#"{
"messageId": "1",
"body": "{\"should_error\": true}"
}"#,
)
.unwrap();
let msg_to_succeed: SqsMessageObj<serde_json::Value> = serde_json::from_str(
r#"{
"messageId": "0",
"body": "{\"should_error\" : false}"
}"#,
)
.unwrap();

let lambda_event = LambdaEvent {
payload: SqsEventObj {
records: vec![msg_to_fail, msg_to_succeed],
},
context: Context::default(),
};

let r = batch_handler(user_fn, lambda_event).await.unwrap();
assert_eq!(r.batch_item_failures.len(), 1);
assert_eq!(r.batch_item_failures[0].item_identifier, "1");
}
}