Skip to content

Commit 381ac09

Browse files
authored
Add SQS example implementing partial batch failure (#584)
1 parent 128aef4 commit 381ac09

File tree

3 files changed

+193
-0
lines changed

3 files changed

+193
-0
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
[package]
2+
name = "advanced-sqs-partial-batch-failures"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
serde = "^1"
8+
serde_derive = "^1"
9+
serde_with = { version = "^2", features = ["json"], optional = true }
10+
serde_json = "^1"
11+
aws_lambda_events = "0.7.3"
12+
lambda_runtime = "0.7"
13+
tokio = { version = "1", features = ["macros"] }
14+
futures = "0.3"
15+
tracing = { version = "0.1", features = ["log"] }
16+
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# AWS Lambda Function that receives events from SQS
2+
3+
This example shows how to process events from an SQS queue using the partial batch failure feature.
4+
5+
_Important note:_ your lambda sqs trigger *needs* to be configured with partial batch response support
6+
(the ` ReportBatchItemFailures` flag set to true), otherwise failed message will be not be reprocessed.
7+
For more details see:
8+
https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
9+
10+
## Build & Deploy
11+
12+
1. Install [cargo-lambda](https://github.com/cargo-lambda/cargo-lambda#installation)
13+
2. Build the function with `cargo lambda build --release`
14+
3. Deploy the function to AWS Lambda with `cargo lambda deploy --iam-role YOUR_ROLE`
15+
16+
## Build for ARM 64
17+
18+
Build the function with `cargo lambda build --release --arm64`
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
use aws_lambda_events::{
2+
event::sqs::SqsEventObj,
3+
sqs::{BatchItemFailure, SqsBatchResponse, SqsMessageObj},
4+
};
5+
use futures::Future;
6+
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
7+
use serde::{de::DeserializeOwned, Deserialize, Serialize};
8+
use tracing::Instrument;
9+
10+
/// [To customize] Your object definition, sent to the SQS queue triggering this lambda.
11+
#[derive(Deserialize, Serialize)]
12+
struct Data {
13+
text: String,
14+
}
15+
16+
/// [To customize] Your buisness logic to handle the payload of one SQS message.
17+
async fn data_handler(data: Data) -> Result<(), Error> {
18+
// Some processing
19+
tracing::info!(text = ?data.text, "processing data");
20+
// simulate error
21+
if data.text == "bad request" {
22+
Err("Processing error".into())
23+
} else {
24+
Ok(())
25+
}
26+
}
27+
28+
/// Main function for the lambda executable.
29+
#[tokio::main]
30+
async fn main() -> Result<(), Error> {
31+
tracing_subscriber::fmt()
32+
.with_max_level(tracing::Level::INFO)
33+
// disable printing the name of the module in every log line.
34+
.with_target(false)
35+
// disabling time is handy because CloudWatch will add the ingestion time.
36+
.without_time()
37+
.init();
38+
39+
run_sqs_partial_batch_failure(data_handler).await
40+
}
41+
42+
/// This function will handle the message batches from SQS.
43+
/// It calls the provided user function `f` on every message concurrently and reports to SQS
44+
/// which message failed to be processed so that only those are retried.
45+
///
46+
/// Important note: your lambda sqs trigger *needs* to be configured with partial batch response support
47+
/// with the ` ReportBatchItemFailures` flag set to true, otherwise failed message will be dropped,
48+
/// for more details see:
49+
/// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
50+
///
51+
///
52+
/// Note that if you are looking for parallel processing (multithread) instead of concurrent processing,
53+
/// you can do so by spawning a task inside your function `f`.
54+
async fn run_sqs_partial_batch_failure<T, D, R>(f: T) -> Result<(), Error>
55+
where
56+
T: Fn(D) -> R,
57+
D: DeserializeOwned,
58+
R: Future<Output = Result<(), Error>>,
59+
{
60+
run(service_fn(|e| batch_handler(|d| f(d), e))).await
61+
}
62+
63+
/// Helper function to lift the user provided `f` function from message to batch of messages.
64+
/// See `run_sqs` for the easier function to use.
65+
async fn batch_handler<T, D, F>(
66+
f: T,
67+
event: LambdaEvent<SqsEventObj<serde_json::Value>>,
68+
) -> Result<SqsBatchResponse, Error>
69+
where
70+
T: Fn(D) -> F,
71+
F: Future<Output = Result<(), Error>>,
72+
D: DeserializeOwned,
73+
{
74+
tracing::trace!("Handling batch size {}", event.payload.records.len());
75+
let create_task = |msg| {
76+
// We need to keep the message_id to report failures to SQS
77+
let SqsMessageObj {
78+
message_id, body, ..
79+
} = msg;
80+
let span = tracing::span!(tracing::Level::INFO, "Handling SQS msg", message_id);
81+
let task = async {
82+
//TODO catch panics like the `run` function from lambda_runtime
83+
f(serde_json::from_value(body)?).await
84+
}
85+
.instrument(span);
86+
(message_id.unwrap_or_default(), task)
87+
};
88+
let (ids, tasks): (Vec<_>, Vec<_>) = event.payload.records.into_iter().map(create_task).unzip();
89+
let results = futures::future::join_all(tasks).await; // Run tasks concurrently
90+
let failure_items = ids
91+
.into_iter()
92+
.zip(results)
93+
.filter_map(
94+
// Only keep the message_id of failed tasks
95+
|(id, res)| match res {
96+
Ok(()) => None,
97+
Err(err) => {
98+
tracing::error!("Failed to process msg {id}, {err}");
99+
Some(id)
100+
}
101+
},
102+
)
103+
.map(|id| BatchItemFailure {
104+
item_identifier: id,
105+
})
106+
.collect();
107+
108+
Ok(SqsBatchResponse {
109+
batch_item_failures: failure_items,
110+
})
111+
}
112+
113+
#[cfg(test)]
114+
mod test {
115+
use lambda_runtime::Context;
116+
117+
use super::*;
118+
119+
#[derive(Serialize, Deserialize, Debug)]
120+
struct UserData {
121+
should_error: bool,
122+
}
123+
async fn user_fn(data: UserData) -> Result<(), Error> {
124+
if data.should_error {
125+
Err("Processing Error".into())
126+
} else {
127+
Ok(())
128+
}
129+
}
130+
131+
#[tokio::test]
132+
async fn test() -> () {
133+
let msg_to_fail: SqsMessageObj<serde_json::Value> = serde_json::from_str(
134+
r#"{
135+
"messageId": "1",
136+
"body": "{\"should_error\": true}"
137+
}"#,
138+
)
139+
.unwrap();
140+
let msg_to_succeed: SqsMessageObj<serde_json::Value> = serde_json::from_str(
141+
r#"{
142+
"messageId": "0",
143+
"body": "{\"should_error\" : false}"
144+
}"#,
145+
)
146+
.unwrap();
147+
148+
let lambda_event = LambdaEvent {
149+
payload: SqsEventObj {
150+
records: vec![msg_to_fail, msg_to_succeed],
151+
},
152+
context: Context::default(),
153+
};
154+
155+
let r = batch_handler(user_fn, lambda_event).await.unwrap();
156+
assert_eq!(r.batch_item_failures.len(), 1);
157+
assert_eq!(r.batch_item_failures[0].item_identifier, "1");
158+
}
159+
}

0 commit comments

Comments
 (0)