Skip to content

Commit e253755

Browse files
author
Alexander Schueren
authored
feat(batch): rename AsyncBatchProcessor to default BatchProcessor (#1683)
* docs: change async processing to default in the docs
1 parent fa24180 commit e253755

23 files changed

+411
-374
lines changed

Diff for: docs/snippets/batch/accessProcessedMessages.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export const handler = async (
2525
const batch = event.Records; // (1)!
2626

2727
processor.register(batch, recordHandler, { context }); // (2)!
28-
const processedMessages = processor.process();
28+
const processedMessages = await processor.process();
2929

3030
for (const message of processedMessages) {
3131
const status: 'success' | 'fail' = message[0];

Diff for: docs/snippets/batch/advancedTracingRecordHandler.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import type {
1515
const processor = new BatchProcessor(EventType.SQS);
1616
const tracer = new Tracer({ serviceName: 'serverlessAirline' });
1717

18-
const recordHandler = (record: SQSRecord): void => {
18+
const recordHandler = async (record: SQSRecord): Promise<void> => {
1919
const subsegment = tracer.getSegment()?.addNewSubsegment('### recordHandler'); // (1)!
2020
subsegment?.addAnnotation('messageId', record.messageId); // (2)!
2121

Diff for: docs/snippets/batch/customPartialProcessor.ts

+9-7
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,6 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
2727
this.#tableName = tableName;
2828
}
2929

30-
public async asyncProcessRecord(
31-
_record: BaseRecord
32-
): Promise<SuccessResponse | FailureResponse> {
33-
throw new Error('Not implemented');
34-
}
35-
3630
/**
3731
* It's called once, **after** processing the batch.
3832
*
@@ -64,13 +58,21 @@ class MyPartialProcessor extends BasePartialBatchProcessor {
6458
this.successMessages = [];
6559
}
6660

61+
public async processRecord(
62+
_record: BaseRecord
63+
): Promise<SuccessResponse | FailureResponse> {
64+
throw new Error('Not implemented');
65+
}
66+
6767
/**
6868
* It handles how your record is processed.
6969
*
7070
* Here we are keeping the status of each run, `this.handler` is
7171
* the function that is passed when calling `processor.register()`.
7272
*/
73-
public processRecord(record: BaseRecord): SuccessResponse | FailureResponse {
73+
public processRecordSync(
74+
record: BaseRecord
75+
): SuccessResponse | FailureResponse {
7476
try {
7577
const result = this.handler(record);
7678

Diff for: docs/snippets/batch/gettingStartedAsync.ts

+4-4
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
2-
AsyncBatchProcessor,
2+
BatchProcessor,
33
EventType,
4-
asyncProcessPartialResponse,
4+
processPartialResponse,
55
} from '@aws-lambda-powertools/batch';
66
import axios from 'axios'; // axios is an external dependency
77
import type {
@@ -11,7 +11,7 @@ import type {
1111
SQSBatchResponse,
1212
} from 'aws-lambda';
1313

14-
const processor = new AsyncBatchProcessor(EventType.SQS);
14+
const processor = new BatchProcessor(EventType.SQS);
1515

1616
const recordHandler = async (record: SQSRecord): Promise<number> => {
1717
const res = await axios.post('https://httpbin.org/anything', {
@@ -25,7 +25,7 @@ export const handler = async (
2525
event: SQSEvent,
2626
context: Context
2727
): Promise<SQSBatchResponse> => {
28-
return await asyncProcessPartialResponse(event, recordHandler, processor, {
28+
return await processPartialResponse(event, recordHandler, processor, {
2929
context,
3030
});
3131
};

Diff for: docs/snippets/batch/gettingStartedDynamoDBStreams.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import type {
1414
const processor = new BatchProcessor(EventType.DynamoDBStreams); // (1)!
1515
const logger = new Logger();
1616

17-
const recordHandler = (record: DynamoDBRecord): void => {
17+
const recordHandler = async (record: DynamoDBRecord): Promise<void> => {
1818
if (record.dynamodb && record.dynamodb.NewImage) {
1919
logger.info('Processing record', { record: record.dynamodb.NewImage });
2020
const message = record.dynamodb.NewImage.Message.S;

Diff for: docs/snippets/batch/gettingStartedErrorHandling.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ class InvalidPayload extends Error {
2121
}
2222
}
2323

24-
const recordHandler = (record: SQSRecord): void => {
24+
const recordHandler = async (record: SQSRecord): Promise<void> => {
2525
const payload = record.body;
2626
if (payload) {
2727
const item = JSON.parse(payload);

Diff for: docs/snippets/batch/gettingStartedKinesis.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import type {
1414
const processor = new BatchProcessor(EventType.KinesisDataStreams); // (1)!
1515
const logger = new Logger();
1616

17-
const recordHandler = (record: KinesisStreamRecord): void => {
17+
const recordHandler = async (record: KinesisStreamRecord): Promise<void> => {
1818
logger.info('Processing record', { record: record.kinesis.data });
1919
const payload = JSON.parse(record.kinesis.data);
2020
logger.info('Processed item', { item: payload });

Diff for: docs/snippets/batch/gettingStartedSQS.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ const processor = new BatchProcessor(EventType.SQS); // (1)!
1515
const logger = new Logger();
1616

1717
// prettier-ignore
18-
const recordHandler = (record: SQSRecord): void => { // (2)!
18+
const recordHandler = async (record: SQSRecord): Promise<void> => { // (2)!
1919
const payload = record.body;
2020
if (payload) {
2121
const item = JSON.parse(payload);

Diff for: docs/snippets/batch/gettingStartedSQSFifo.ts

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
22
SqsFifoPartialProcessor,
3-
processPartialResponse,
3+
processPartialResponseSync,
44
} from '@aws-lambda-powertools/batch';
55
import { Logger } from '@aws-lambda-powertools/logger';
66
import type {
@@ -25,7 +25,7 @@ export const handler = async (
2525
event: SQSEvent,
2626
context: Context
2727
): Promise<SQSBatchResponse> => {
28-
return processPartialResponse(event, recordHandler, processor, {
28+
return processPartialResponseSync(event, recordHandler, processor, {
2929
context,
3030
});
3131
};

Diff for: docs/utilities/batch.md

+40-29
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ journey
5252
Records expired: 1: Failure
5353
```
5454

55-
This behavior changes when you enable Report Batch Item Failures feature in your Lambda function event source configuration:
55+
This behavior changes when you enable [ReportBatchItemFailures feature](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting) in your Lambda function event source configuration:
5656

5757
<!-- markdownlint-disable MD013 -->
5858
* [**SQS queues**](#sqs-standard). Only messages reported as failure will return to the queue for a retry, while successful ones will be deleted.
@@ -69,11 +69,11 @@ This behavior changes when you enable Report Batch Item Failures feature in your
6969

7070
For this feature to work, you need to **(1)** configure your Lambda function event source to use `ReportBatchItemFailures`, and **(2)** return [a specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting){target="_blank" rel="nofollow"} to report which records failed to be processed.
7171

72-
You use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
72+
Use your preferred deployment framework to set the correct configuration while this utility handles the correct response to be returned.
7373

7474
### Required resources
7575

76-
The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries were attempted.
76+
The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries.
7777

7878
!!! note "You do not need any additional IAM permissions to use this utility, except for what each event source requires."
7979

@@ -137,14 +137,18 @@ Processing batches from SQS works in three stages:
137137
#### FIFO queues
138138

139139
When using [SQS FIFO queues](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html){target="_blank"}, we will stop processing messages after the first failure, and return all failed and unprocessed messages in `batchItemFailures`.
140-
This helps preserve the ordering of messages in your queue.
140+
This helps preserve the ordering of messages in your queue.
141141

142142
```typescript hl_lines="1-4 13 28-30"
143143
--8<-- "docs/snippets/batch/gettingStartedSQSFifo.ts"
144144
```
145145

146146
1. **Step 1**. Creates a partial failure batch processor for SQS FIFO queues. See [partial failure mechanics for details](#partial-failure-mechanics)
147147

148+
!!! Note
149+
Note that `SqsFifoPartialProcessor` is synchronous and uses `processPartialResponseSync`.
150+
This is because we need to preserve the order of messages in the queue. See [Async or sync processing section](#async-or-sync-processing) for more details.
151+
148152
### Processing messages from Kinesis
149153

150154
Processing batches from Kinesis works in three stages:
@@ -225,7 +229,7 @@ By default, we catch any exception raised by your record handler function. This
225229
--8<--
226230
```
227231

228-
1. Any exception works here. See [extending BatchProcessor section, if you want to override this behavior.](#extending-batchprocessor)
232+
1. Any exception works here. See [extending BatchProcessorSync section, if you want to override this behavior.](#extending-batchprocessor)
229233

230234
2. Exceptions raised in `recordHandler` will propagate to `processPartialResponse`. <br/><br/> We catch them and include each failed batch item identifier in the response dictionary (see `Sample response` tab).
231235

@@ -356,21 +360,29 @@ sequenceDiagram
356360
<i>Kinesis and DynamoDB streams mechanism with multiple batch item failures</i>
357361
</center>
358362

359-
### Processing messages asynchronously
363+
### Async or sync processing
360364

361-
You can use `AsyncBatchProcessor` class and `asyncProcessPartialResponse` function to process messages concurrently.
365+
There are two processors you can use with this utility:
362366

363-
???+ question "When is this useful?"
364-
Your use case might be able to process multiple records at the same time without conflicting with one another.
367+
* **`BatchProcessor`** and **`processPartialResponse`** – Processes messages asynchronously
368+
* **`BatchProcessorSync`** and **`processPartialResponseSync`** – Processes messages synchronously
369+
370+
In most cases your function will be `async` returning a `Promise`. Therefore, the `BatchProcessor` is the default processor handling your batch records asynchronously.
371+
There are use cases where you need to process the batch records synchronously. For example, when you need to process multiple records at the same time without conflicting with one another.
372+
For such cases we recommend using the `BatchProcessorSync` class and the `processPartialResponseSync` function.
373+
374+
!!! info "Note that you need to match your processing function with the right batch processor"
375+
* If your function is `async` returning a `Promise`, use `BatchProcessor` and `processPartialResponse`
376+
* If your function is not `async`, use `BatchProcessorSync` and `processPartialResponseSync`
365377

378+
The implementation difference between the two processors is that `BatchProcessor` uses `Promise.all()`, while `BatchProcessorSync` loops through each record sequentially to preserve the order.
379+
380+
???+ question "When is this useful?"
381+
366382
For example, imagine you need to process multiple loyalty points and incrementally save in a database. While you await the database to confirm your records are saved, you could start processing another request concurrently.
367383

368384
The reason this is not the default behaviour is that not all use cases can handle concurrency safely (e.g., loyalty points must be updated in order).
369385

370-
```typescript hl_lines="1-5 14 28-30" title="High-concurrency with AsyncBatchProcessor"
371-
--8<-- "docs/snippets/batch/gettingStartedAsync.ts"
372-
```
373-
374386
## Advanced
375387

376388
### Accessing processed messages
@@ -380,6 +392,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur
380392
* **When successful**. We will include a tuple with `success`, the result of `recordHandler`, and the batch record
381393
* **When failed**. We will include a tuple with `fail`, exception as a string, and the batch record
382394

395+
383396
```typescript hl_lines="25 27-28 30-33 38" title="Accessing processed messages"
384397
--8<-- "docs/snippets/batch/accessProcessedMessages.ts"
385398
```
@@ -391,7 +404,7 @@ Use the `BatchProcessor` directly in your function to access a list of all retur
391404

392405
Within your `recordHandler` function, you might need access to the Lambda context to determine how much time you have left before your function times out.
393406

394-
We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessor` or the `processPartialResponse` function.
407+
We can automatically inject the [Lambda context](https://docs.aws.amazon.com/lambda/latest/dg/typescript-context.html){target="_blank"} into your `recordHandler` as optional second argument if you register it when using `BatchProcessorSync` or the `processPartialResponseSync` function.
395408

396409
```typescript hl_lines="17 35"
397410
--8<-- "docs/snippets/batch/accessLambdaContext.ts"
@@ -408,14 +421,14 @@ For these scenarios, you can subclass `BatchProcessor` and quickly override `suc
408421

409422
???+ example
410423
Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing
411-
412-
```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor"
413-
--8<-- "docs/snippets/batch/extendingFailure.ts"
414-
```
424+
425+
```typescript hl_lines="17 21 25 31 35" title="Extending failure handling mechanism in BatchProcessor"
426+
--8<-- "docs/snippets/batch/extendingFailure.ts"
427+
```
415428

416429
### Create your own partial processor
417430

418-
You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `asyncProcessRecord()` abstract methods.
431+
You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing the `prepare()`, `clean()`, `processRecord()` and `processRecordSync()` abstract methods.
419432

420433
<center>
421434
```mermaid
@@ -426,28 +439,26 @@ classDiagram
426439
+prepare()
427440
+clean()
428441
+processRecord(record: BaseRecord)
429-
+asyncProcessRecord(record: BaseRecord)
442+
+processRecordSync(record: BaseRecord)
430443
}
431-
432444
class YourCustomProcessor {
433445
+prepare()
434446
+clean()
435447
+processRecord(record: BaseRecord)
436-
+asyncProcessRecord(record: BaseRecord)
448+
+processRecordSync(record: BaseRecord)
437449
}
438-
439450
BasePartialProcessor <|-- YourCustomProcessor : extends
440451
```
441452
<i>Visual representation to bring your own processor</i>
442453
</center>
443454

444-
* **`processRecord()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)
445455
* **`prepare()`** – called once as part of the processor initialization
446456
* **`clean()`** – teardown logic called once after `processRecord` completes
447-
* **`asyncProcessRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
448-
449-
You can then use this class as a context manager, or pass it to `processPartialResponse` to process the records in your Lambda handler function.
457+
* **`processRecord()`** – If you need to implement asynchronous logic, use this method, otherwise define it in your class with empty logic
458+
* **`processRecordSync()`** – handles all processing logic for each individual message of a batch, including calling the `recordHandler` (`this.handler`)
450459

460+
You can then use this class as a context manager, or pass it to `processPartialResponseSync` to process the records in your Lambda handler function.
461+
451462
```typescript hl_lines="21 30 41 62 73 84" title="Creating a custom batch processor"
452463
--8<-- "docs/snippets/batch/customPartialProcessor.ts"
453464
```
@@ -456,7 +467,7 @@ You can then use this class as a context manager, or pass it to `processPartialR
456467

457468
You can use Tracer to create subsegments for each batch record processed. To do so, you can open a new subsegment for each record, and close it when you're done processing it. When adding annotations and metadata to the subsegment, you can do so directly without calling `tracer.setSegment(subsegment)`. This allows you to work with the subsegment directly and avoid having to either pass the parent subsegment around or have to restore the parent subsegment at the end of the record processing.
458469

459-
```ts
470+
```typescript
460471
--8<-- "docs/snippets/batch/advancedTracingRecordHandler.ts"
461472
```
462473

@@ -466,7 +477,7 @@ You can use Tracer to create subsegments for each batch record processed. To do
466477

467478
## Testing your code
468479

469-
As there are no external calls, you can unit test your code with `BatchProcessorSync` quite easily.
480+
As there is no external calls, you can unit test your code with `BatchProcessorSync` quite easily.
470481

471482
**Example**:
472483

0 commit comments

Comments
 (0)