From b1ca7f155d5cb21a7aa28387711dcad403343f6b Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Fri, 10 Dec 2021 18:35:23 +0100 Subject: [PATCH 01/32] feat(batch): new BatchProcessor for SQS, DynamoDB, Kinesis --- aws_lambda_powertools/utilities/batch/base.py | 84 ++++++++++++++++++- tests/functional/test_utilities_batch.py | 76 +++++++++++++++++ 2 files changed, 158 insertions(+), 2 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index a0ad18a9ec1..144bf9b5ef0 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -3,16 +3,23 @@ """ Batch processing utilities """ - import logging +import sys from abc import ABC, abstractmethod -from typing import Any, Callable, Dict, List, Tuple +from enum import Enum +from typing import Any, Callable, Dict, List, Optional, Tuple from aws_lambda_powertools.middleware_factory import lambda_handler_decorator logger = logging.getLogger(__name__) +class EventType(Enum): + SQS = "SQS" + KinesisDataStream = "KinesisDataStream" + DynamoDB = "DynamoDB" + + class BasePartialProcessor(ABC): """ Abstract class for batch processors. @@ -146,3 +153,76 @@ def batch_processor( processor.process() return handler(event, context) + + +class BatchProcessor(BasePartialProcessor): + DEFAULT_REPORT = {"batchItemFailures": []} + + def __init__(self, event_type: EventType): + """Process batch and partially report failed items + + Parameters + ---------- + event_type: EventType + Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event + """ + # refactor: Bring boto3 etc. for deleting permanent exceptions + self.event_type = event_type + self.items_to_report: Dict[str, List[Optional[dict]]] = self.DEFAULT_REPORT + + super().__init__() + + # refactor: think of a better name + def report(self): + """Report batch items that failed processing, if any""" + return self.items_to_report + + def _prepare(self): + """ + Remove results from previous execution. + """ + self.success_messages.clear() + self.fail_messages.clear() + self.items_to_report = self.DEFAULT_REPORT + + def _process_record(self, record) -> Tuple: + """ + Process a record with instance's handler + + Parameters + ---------- + record: Any + An object to be processed. + """ + try: + result = self.handler(record=record) + return self.success_handler(record=record, result=result) + except Exception: + return self.failure_handler(record=record, exception=sys.exc_info()) + + def _clean(self): + """ + Report messages to be deleted in case of partial failure. 
+ """ + + if not self._has_messages_to_report(): + return + + messages = self._get_messages_to_report() + self.items_to_report["batchItemFailures"].append(messages) + + return self.items_to_report + + def _has_messages_to_report(self) -> bool: + if self.fail_messages: + return True + + logger.debug(f"All {len(self.success_messages)} records successfully processed") + return False + + def _get_messages_to_report(self) -> Dict[str, str]: + """ + Format messages to use in batch deletion + """ + # Refactor: get message per event type + return {msg["receiptHandle"]: msg["messageId"] for msg in self.fail_messages} diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index a453f0bfe07..e6110e2c3de 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -6,6 +6,7 @@ from botocore.stub import Stubber from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor +from aws_lambda_powertools.utilities.batch.base import BatchProcessor, EventType from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError @@ -290,3 +291,78 @@ def test_partial_sqs_processor_context_only_failure(sqs_event_factory, record_ha ctx.process() assert len(error.value.child_exceptions) == 2 + + +def test_batch_processor_middleware_success_only(sqs_event_factory, record_handler): + # GIVEN + first_record = sqs_event_factory("success") + second_record = sqs_event_factory("success") + event = {"Records": [first_record, second_record]} + + processor = BatchProcessor(event_type=EventType.SQS) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.report() + + # WHEN + result = lambda_handler(event, {}) + + # THEN + assert result["batchItemFailures"] == [] + + +def test_batch_processor_middleware_with_failure(sqs_event_factory, record_handler): + # GIVEN + first_record = sqs_event_factory("fail") + second_record = sqs_event_factory("success") + event = {"Records": [first_record, second_record]} + + processor = BatchProcessor(event_type=EventType.SQS) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.report() + + # WHEN + result = lambda_handler(event, {}) + + # THEN + assert len(result["batchItemFailures"]) == 1 + + +def test_batch_processor_context_success_only(sqs_event_factory, record_handler): + # GIVEN + first_record = sqs_event_factory("success") + second_record = sqs_event_factory("success") + records = [first_record, second_record] + processor = BatchProcessor(event_type=EventType.SQS) + + # WHEN + with processor(records, record_handler) as batch: + processed_messages = batch.process() + + # THEN + assert processed_messages == [ + ("success", first_record["body"], first_record), + ("success", second_record["body"], second_record), + ] + + assert batch.report() == {"batchItemFailures": []} + + +def test_batch_processor_context_with_failure(sqs_event_factory, record_handler): + # GIVEN + first_record = sqs_event_factory("failure") + second_record = sqs_event_factory("success") + records = [first_record, second_record] + processor = BatchProcessor(event_type=EventType.SQS) + + # WHEN + with processor(records, record_handler) as batch: + processed_messages = batch.process() + + # THEN + assert processed_messages[1] == ("success", second_record["body"], second_record) + assert len(batch.fail_messages) == 1 + assert 
batch.report() == {"batchItemFailures": [{first_record["receiptHandle"]: first_record["messageId"]}]} From 113a44fc085c48b3996f42f7a56365fb8b80e930 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sat, 11 Dec 2021 09:01:12 +0100 Subject: [PATCH 02/32] refactor: response over report --- aws_lambda_powertools/utilities/batch/base.py | 5 ++--- tests/functional/test_utilities_batch.py | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 144bf9b5ef0..1abea367be9 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -172,9 +172,8 @@ def __init__(self, event_type: EventType): super().__init__() - # refactor: think of a better name - def report(self): - """Report batch items that failed processing, if any""" + def response(self): + """Response containing batch items that failed processing, if any""" return self.items_to_report def _prepare(self): diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index e6110e2c3de..d7224821bf1 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -303,7 +303,7 @@ def test_batch_processor_middleware_success_only(sqs_event_factory, record_handl @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): - return processor.report() + return processor.response() # WHEN result = lambda_handler(event, {}) @@ -322,7 +322,7 @@ def test_batch_processor_middleware_with_failure(sqs_event_factory, record_handl @batch_processor(record_handler=record_handler, processor=processor) def lambda_handler(event, context): - return processor.report() + return processor.response() # WHEN result = lambda_handler(event, {}) @@ -348,7 +348,7 @@ def test_batch_processor_context_success_only(sqs_event_factory, record_handler) ("success", second_record["body"], second_record), ] - assert batch.report() == {"batchItemFailures": []} + assert batch.response() == {"batchItemFailures": []} def test_batch_processor_context_with_failure(sqs_event_factory, record_handler): @@ -365,4 +365,4 @@ def test_batch_processor_context_with_failure(sqs_event_factory, record_handler) # THEN assert processed_messages[1] == ("success", second_record["body"], second_record) assert len(batch.fail_messages) == 1 - assert batch.report() == {"batchItemFailures": [{first_record["receiptHandle"]: first_record["messageId"]}]} + assert batch.response() == {"batchItemFailures": [{first_record["receiptHandle"]: first_record["messageId"]}]} From 5ab2ec733a95ca147780b79516f9329c27219bb5 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sat, 11 Dec 2021 10:03:50 +0100 Subject: [PATCH 03/32] fix: mutability bug --- aws_lambda_powertools/utilities/batch/base.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 1abea367be9..2f1c311e20f 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -156,7 +156,7 @@ def batch_processor( class BatchProcessor(BasePartialProcessor): - DEFAULT_REPORT = {"batchItemFailures": []} + DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []} def __init__(self, event_type: EventType): """Process batch and partially report failed items @@ 
-168,13 +168,13 @@ def __init__(self, event_type: EventType): """ # refactor: Bring boto3 etc. for deleting permanent exceptions self.event_type = event_type - self.items_to_report: Dict[str, List[Optional[dict]]] = self.DEFAULT_REPORT + self.batch_response = self.DEFAULT_RESPONSE super().__init__() def response(self): - """Response containing batch items that failed processing, if any""" - return self.items_to_report + """Batch items that failed processing, if any""" + return self.batch_response def _prepare(self): """ @@ -182,7 +182,7 @@ def _prepare(self): """ self.success_messages.clear() self.fail_messages.clear() - self.items_to_report = self.DEFAULT_REPORT + self.batch_response = self.DEFAULT_RESPONSE def _process_record(self, record) -> Tuple: """ @@ -208,9 +208,7 @@ def _clean(self): return messages = self._get_messages_to_report() - self.items_to_report["batchItemFailures"].append(messages) - - return self.items_to_report + self.batch_response = {"batchItemFailures": [messages]} def _has_messages_to_report(self) -> bool: if self.fail_messages: From 46522373e96bf996a1b96ace12e8f5f92b4fd030 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 12 Dec 2021 10:07:27 +0100 Subject: [PATCH 04/32] feat(batch): add Kinesis Data streams support --- aws_lambda_powertools/utilities/batch/base.py | 17 +++- tests/functional/test_utilities_batch.py | 96 +++++++++++++++++++ tests/functional/utils.py | 9 ++ tests/utils.py | 0 4 files changed, 120 insertions(+), 2 deletions(-) create mode 100644 tests/utils.py diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 2f1c311e20f..c3a53d79a3b 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -16,7 +16,7 @@ class EventType(Enum): SQS = "SQS" - KinesisDataStream = "KinesisDataStream" + KinesisDataStreams = "KinesisDataStreams" DynamoDB = "DynamoDB" @@ -169,7 +169,11 @@ def __init__(self, event_type: EventType): # refactor: Bring boto3 etc. for deleting permanent exceptions self.event_type = event_type self.batch_response = self.DEFAULT_RESPONSE - + self._COLLECTOR_MAPPING = { + EventType.SQS: self._collect_sqs_failures, + EventType.KinesisDataStreams: self._collect_kinesis_failures, + EventType.DynamoDB: self._collect_dynamodb_failures, + } super().__init__() def response(self): @@ -222,4 +226,13 @@ def _get_messages_to_report(self) -> Dict[str, str]: Format messages to use in batch deletion """ # Refactor: get message per event type + return self._COLLECTOR_MAPPING[self.event_type]() + + def _collect_sqs_failures(self): return {msg["receiptHandle"]: msg["messageId"] for msg in self.fail_messages} + + def _collect_kinesis_failures(self): + return {msg["eventID"]: msg["kinesis"]["sequenceNumber"] for msg in self.fail_messages} + + def _collect_dynamodb_failures(self): + ... 
diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index d7224821bf1..ce6cd2d8a49 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -1,3 +1,4 @@ +from random import randint from typing import Callable from unittest.mock import patch @@ -8,6 +9,7 @@ from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor from aws_lambda_powertools.utilities.batch.base import BatchProcessor, EventType from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError +from tests.functional.utils import decode_kinesis_data, str_to_b64 @pytest.fixture(scope="module") @@ -28,6 +30,31 @@ def factory(body: str): return factory +@pytest.fixture(scope="module") +def kinesis_event_factory() -> Callable: + def factory(body: str): + seq = "".join(str(randint(0, 9)) for _ in range(52)) + partition_key = str(randint(1, 9)) + return { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": partition_key, + "sequenceNumber": seq, + "data": str_to_b64(body), + "approximateArrivalTimestamp": 1545084650.987, + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": f"shardId-000000000006:{seq}", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", + "awsRegion": "us-east-2", + "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", + } + + return factory + + @pytest.fixture(scope="module") def record_handler() -> Callable: def handler(record): @@ -39,6 +66,17 @@ def handler(record): return handler +@pytest.fixture(scope="module") +def kinesis_record_handler() -> Callable: + def handler(record): + body = decode_kinesis_data(record) + if "fail" in body: + raise Exception("Failed to process record.") + return body + + return handler + + @pytest.fixture(scope="module") def config() -> Config: return Config(region_name="us-east-1") @@ -366,3 +404,61 @@ def test_batch_processor_context_with_failure(sqs_event_factory, record_handler) assert processed_messages[1] == ("success", second_record["body"], second_record) assert len(batch.fail_messages) == 1 assert batch.response() == {"batchItemFailures": [{first_record["receiptHandle"]: first_record["messageId"]}]} + + +def test_batch_processor_kinesis_context_success_only(kinesis_event_factory, kinesis_record_handler): + # GIVEN + first_record = kinesis_event_factory("success") + second_record = kinesis_event_factory("success") + records = [first_record, second_record] + processor = BatchProcessor(event_type=EventType.KinesisDataStreams) + + # WHEN + with processor(records, kinesis_record_handler) as batch: + processed_messages = batch.process() + + # THEN + assert processed_messages == [ + ("success", decode_kinesis_data(first_record), first_record), + ("success", decode_kinesis_data(second_record), second_record), + ] + + assert batch.response() == {"batchItemFailures": []} + + +def test_batch_processor_kinesis_context_with_failure(kinesis_event_factory, kinesis_record_handler): + # GIVEN + first_record = kinesis_event_factory("failure") + second_record = kinesis_event_factory("success") + records = [first_record, second_record] + processor = BatchProcessor(event_type=EventType.KinesisDataStreams) + + # WHEN + with processor(records, kinesis_record_handler) as batch: + processed_messages = batch.process() + + # THEN + assert processed_messages[1] == ("success", decode_kinesis_data(second_record), second_record) + 
assert len(batch.fail_messages) == 1 + assert batch.response() == { + "batchItemFailures": [{first_record["eventID"]: first_record["kinesis"]["sequenceNumber"]}] + } + + +def test_batch_processor_kinesis_middleware_with_failure(kinesis_event_factory, kinesis_record_handler): + # GIVEN + first_record = kinesis_event_factory("failure") + second_record = kinesis_event_factory("success") + event = {"Records": [first_record, second_record]} + + processor = BatchProcessor(event_type=EventType.KinesisDataStreams) + + @batch_processor(record_handler=kinesis_record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN + result = lambda_handler(event, {}) + + # THEN + assert len(result["batchItemFailures"]) == 1 diff --git a/tests/functional/utils.py b/tests/functional/utils.py index a58d27f3526..180129b1de4 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -1,3 +1,4 @@ +import base64 import json from pathlib import Path from typing import Any @@ -6,3 +7,11 @@ def load_event(file_name: str) -> Any: path = Path(str(Path(__file__).parent.parent) + "/events/" + file_name) return json.loads(path.read_text()) + + +def str_to_b64(data: str) -> str: + return base64.b64encode(data.encode()).decode("utf-8") + + +def decode_kinesis_data(data: dict) -> str: + return base64.b64decode(data["kinesis"]["data"].encode()).decode("utf-8") diff --git a/tests/utils.py b/tests/utils.py new file mode 100644 index 00000000000..e69de29bb2d From 4cfcd34e19ef3714f862adeb13187a9072388a9d Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 12 Dec 2021 13:55:51 +0100 Subject: [PATCH 05/32] fix: item identifier key should be constant --- aws_lambda_powertools/utilities/batch/base.py | 4 ++-- tests/functional/test_utilities_batch.py | 9 +++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index c3a53d79a3b..4cf5f2b9cca 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -229,10 +229,10 @@ def _get_messages_to_report(self) -> Dict[str, str]: return self._COLLECTOR_MAPPING[self.event_type]() def _collect_sqs_failures(self): - return {msg["receiptHandle"]: msg["messageId"] for msg in self.fail_messages} + return {"itemIdentifier": msg["messageId"] for msg in self.fail_messages} def _collect_kinesis_failures(self): - return {msg["eventID"]: msg["kinesis"]["sequenceNumber"] for msg in self.fail_messages} + return {"itemIdentifier": msg["kinesis"]["sequenceNumber"] for msg in self.fail_messages} def _collect_dynamodb_failures(self): ... 
diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index ce6cd2d8a49..70d0751e342 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -34,11 +34,10 @@ def factory(body: str): def kinesis_event_factory() -> Callable: def factory(body: str): seq = "".join(str(randint(0, 9)) for _ in range(52)) - partition_key = str(randint(1, 9)) return { "kinesis": { "kinesisSchemaVersion": "1.0", - "partitionKey": partition_key, + "partitionKey": "1", "sequenceNumber": seq, "data": str_to_b64(body), "approximateArrivalTimestamp": 1545084650.987, @@ -403,7 +402,7 @@ def test_batch_processor_context_with_failure(sqs_event_factory, record_handler) # THEN assert processed_messages[1] == ("success", second_record["body"], second_record) assert len(batch.fail_messages) == 1 - assert batch.response() == {"batchItemFailures": [{first_record["receiptHandle"]: first_record["messageId"]}]} + assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["messageId"]}]} def test_batch_processor_kinesis_context_success_only(kinesis_event_factory, kinesis_record_handler): @@ -440,9 +439,7 @@ def test_batch_processor_kinesis_context_with_failure(kinesis_event_factory, kin # THEN assert processed_messages[1] == ("success", decode_kinesis_data(second_record), second_record) assert len(batch.fail_messages) == 1 - assert batch.response() == { - "batchItemFailures": [{first_record["eventID"]: first_record["kinesis"]["sequenceNumber"]}] - } + assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["kinesis"]["sequenceNumber"]}]} def test_batch_processor_kinesis_middleware_with_failure(kinesis_event_factory, kinesis_record_handler): From 139f52bf0f4d552381d9901d5fa8eda198fc85f1 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 12 Dec 2021 14:18:05 +0100 Subject: [PATCH 06/32] feat(batch): add DynamoDB Streams support --- aws_lambda_powertools/utilities/batch/base.py | 6 +- tests/functional/test_utilities_batch.py | 90 +++++++++++++++++++ 2 files changed, 93 insertions(+), 3 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 4cf5f2b9cca..55eb41254a6 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -17,7 +17,7 @@ class EventType(Enum): SQS = "SQS" KinesisDataStreams = "KinesisDataStreams" - DynamoDB = "DynamoDB" + DynamoDBStreams = "DynamoDBStreams" class BasePartialProcessor(ABC): @@ -172,7 +172,7 @@ def __init__(self, event_type: EventType): self._COLLECTOR_MAPPING = { EventType.SQS: self._collect_sqs_failures, EventType.KinesisDataStreams: self._collect_kinesis_failures, - EventType.DynamoDB: self._collect_dynamodb_failures, + EventType.DynamoDBStreams: self._collect_dynamodb_failures, } super().__init__() @@ -235,4 +235,4 @@ def _collect_kinesis_failures(self): return {"itemIdentifier": msg["kinesis"]["sequenceNumber"] for msg in self.fail_messages} def _collect_dynamodb_failures(self): - ... 
+ return {"itemIdentifier": msg["dynamodb"]["SequenceNumber"] for msg in self.fail_messages} diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index 70d0751e342..d37f34d8a65 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -54,6 +54,29 @@ def factory(body: str): return factory +@pytest.fixture(scope="module") +def dynamodb_event_factory() -> Callable: + def factory(body: str): + seq = "".join(str(randint(0, 9)) for _ in range(10)) + return { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": {"Id": {"N": "101"}}, + "NewImage": {"message": {"S": body}}, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": seq, + "SizeBytes": 26, + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb", + } + + return factory + + @pytest.fixture(scope="module") def record_handler() -> Callable: def handler(record): @@ -76,6 +99,17 @@ def handler(record): return handler +@pytest.fixture(scope="module") +def dynamodb_record_handler() -> Callable: + def handler(record): + body = record["dynamodb"]["NewImage"]["message"]["S"] + if "fail" in body: + raise Exception("Failed to process record.") + return body + + return handler + + @pytest.fixture(scope="module") def config() -> Config: return Config(region_name="us-east-1") @@ -459,3 +493,59 @@ def lambda_handler(event, context): # THEN assert len(result["batchItemFailures"]) == 1 + + +def test_batch_processor_dynamodb_context_success_only(dynamodb_event_factory, dynamodb_record_handler): + # GIVEN + first_record = dynamodb_event_factory("success") + second_record = dynamodb_event_factory("success") + records = [first_record, second_record] + processor = BatchProcessor(event_type=EventType.DynamoDBStreams) + + # WHEN + with processor(records, dynamodb_record_handler) as batch: + processed_messages = batch.process() + + # THEN + assert processed_messages == [ + ("success", first_record["dynamodb"]["NewImage"]["message"]["S"], first_record), + ("success", second_record["dynamodb"]["NewImage"]["message"]["S"], second_record), + ] + + assert batch.response() == {"batchItemFailures": []} + + +def test_batch_processor_dynamodb_context_with_failure(dynamodb_event_factory, dynamodb_record_handler): + # GIVEN + first_record = dynamodb_event_factory("failure") + second_record = dynamodb_event_factory("success") + records = [first_record, second_record] + processor = BatchProcessor(event_type=EventType.DynamoDBStreams) + + # WHEN + with processor(records, dynamodb_record_handler) as batch: + processed_messages = batch.process() + + # THEN + assert processed_messages[1] == ("success", second_record["dynamodb"]["NewImage"]["message"]["S"], second_record) + assert len(batch.fail_messages) == 1 + assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]}]} + + +def test_batch_processor_dynamodb_middleware_with_failure(dynamodb_event_factory, dynamodb_record_handler): + # GIVEN + first_record = dynamodb_event_factory("failure") + second_record = dynamodb_event_factory("success") + event = {"Records": [first_record, second_record]} + + processor = BatchProcessor(event_type=EventType.DynamoDBStreams) + + @batch_processor(record_handler=dynamodb_record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN + result = lambda_handler(event, {}) + + # THEN + assert 
len(result["batchItemFailures"]) == 1 From 18224567eec7ef82710b693b158cfb03be4332eb Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 12 Dec 2021 14:52:39 +0100 Subject: [PATCH 07/32] feat(batch): use event source data classes by default --- .../utilities/batch/__init__.py | 13 +++- aws_lambda_powertools/utilities/batch/base.py | 13 +++- tests/functional/test_utilities_batch.py | 73 ++++++++++--------- tests/functional/utils.py | 4 +- 4 files changed, 62 insertions(+), 41 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index d308a56abda..1ad1cae7141 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -4,7 +4,14 @@ Batch processing utility """ -from .base import BasePartialProcessor, batch_processor -from .sqs import PartialSQSProcessor, sqs_batch_processor +from aws_lambda_powertools.utilities.batch.base import BasePartialProcessor, BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.batch.sqs import PartialSQSProcessor, sqs_batch_processor -__all__ = ("BasePartialProcessor", "PartialSQSProcessor", "batch_processor", "sqs_batch_processor") +__all__ = ( + "BatchProcessor", + "BasePartialProcessor", + "EventType", + "PartialSQSProcessor", + "batch_processor", + "sqs_batch_processor", +) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 55eb41254a6..4674f45aa2d 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -10,6 +10,9 @@ from typing import Any, Callable, Dict, List, Optional, Tuple from aws_lambda_powertools.middleware_factory import lambda_handler_decorator +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord +from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord logger = logging.getLogger(__name__) @@ -174,6 +177,12 @@ def __init__(self, event_type: EventType): EventType.KinesisDataStreams: self._collect_kinesis_failures, EventType.DynamoDBStreams: self._collect_dynamodb_failures, } + self._DATA_CLASS_MAPPING = { + EventType.SQS: SQSRecord, + EventType.KinesisDataStreams: KinesisStreamRecord, + EventType.DynamoDBStreams: DynamoDBRecord, + } + super().__init__() def response(self): @@ -198,7 +207,8 @@ def _process_record(self, record) -> Tuple: An object to be processed. 
""" try: - result = self.handler(record=record) + data = self._DATA_CLASS_MAPPING[self.event_type](record) + result = self.handler(record=data) return self.success_handler(record=record, result=result) except Exception: return self.failure_handler(record=record, exception=sys.exc_info()) @@ -225,7 +235,6 @@ def _get_messages_to_report(self) -> Dict[str, str]: """ Format messages to use in batch deletion """ - # Refactor: get message per event type return self._COLLECTOR_MAPPING[self.event_type]() def _collect_sqs_failures(self): diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index d37f34d8a65..d7a8838883d 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -9,7 +9,10 @@ from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor from aws_lambda_powertools.utilities.batch.base import BatchProcessor, EventType from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError -from tests.functional.utils import decode_kinesis_data, str_to_b64 +from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord +from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord +from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from tests.functional.utils import b64_to_str, str_to_b64 @pytest.fixture(scope="module") @@ -90,8 +93,8 @@ def handler(record): @pytest.fixture(scope="module") def kinesis_record_handler() -> Callable: - def handler(record): - body = decode_kinesis_data(record) + def handler(record: KinesisStreamRecord): + body = b64_to_str(record.kinesis.data) if "fail" in body: raise Exception("Failed to process record.") return body @@ -101,8 +104,8 @@ def handler(record): @pytest.fixture(scope="module") def dynamodb_record_handler() -> Callable: - def handler(record): - body = record["dynamodb"]["NewImage"]["message"]["S"] + def handler(record: DynamoDBRecord): + body = record.dynamodb.new_image.get("message").get_value if "fail" in body: raise Exception("Failed to process record.") return body @@ -366,9 +369,9 @@ def test_partial_sqs_processor_context_only_failure(sqs_event_factory, record_ha def test_batch_processor_middleware_success_only(sqs_event_factory, record_handler): # GIVEN - first_record = sqs_event_factory("success") - second_record = sqs_event_factory("success") - event = {"Records": [first_record, second_record]} + first_record = SQSRecord(sqs_event_factory("success")) + second_record = SQSRecord(sqs_event_factory("success")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} processor = BatchProcessor(event_type=EventType.SQS) @@ -385,9 +388,9 @@ def lambda_handler(event, context): def test_batch_processor_middleware_with_failure(sqs_event_factory, record_handler): # GIVEN - first_record = sqs_event_factory("fail") - second_record = sqs_event_factory("success") - event = {"Records": [first_record, second_record]} + first_record = SQSRecord(sqs_event_factory("fail")) + second_record = SQSRecord(sqs_event_factory("success")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} processor = BatchProcessor(event_type=EventType.SQS) @@ -404,9 +407,9 @@ def lambda_handler(event, context): def test_batch_processor_context_success_only(sqs_event_factory, record_handler): # GIVEN - first_record = sqs_event_factory("success") - second_record = sqs_event_factory("success") - records = 
[first_record, second_record] + first_record = SQSRecord(sqs_event_factory("success")) + second_record = SQSRecord(sqs_event_factory("success")) + records = [first_record.raw_event, second_record.raw_event] processor = BatchProcessor(event_type=EventType.SQS) # WHEN @@ -415,8 +418,8 @@ def test_batch_processor_context_success_only(sqs_event_factory, record_handler) # THEN assert processed_messages == [ - ("success", first_record["body"], first_record), - ("success", second_record["body"], second_record), + ("success", first_record.body, first_record.raw_event), + ("success", second_record.body, second_record.raw_event), ] assert batch.response() == {"batchItemFailures": []} @@ -424,9 +427,9 @@ def test_batch_processor_context_success_only(sqs_event_factory, record_handler) def test_batch_processor_context_with_failure(sqs_event_factory, record_handler): # GIVEN - first_record = sqs_event_factory("failure") - second_record = sqs_event_factory("success") - records = [first_record, second_record] + first_record = SQSRecord(sqs_event_factory("failure")) + second_record = SQSRecord(sqs_event_factory("success")) + records = [first_record.raw_event, second_record.raw_event] processor = BatchProcessor(event_type=EventType.SQS) # WHEN @@ -434,16 +437,17 @@ def test_batch_processor_context_with_failure(sqs_event_factory, record_handler) processed_messages = batch.process() # THEN - assert processed_messages[1] == ("success", second_record["body"], second_record) + assert processed_messages[1] == ("success", second_record.body, second_record.raw_event) assert len(batch.fail_messages) == 1 - assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["messageId"]}]} + assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record.message_id}]} def test_batch_processor_kinesis_context_success_only(kinesis_event_factory, kinesis_record_handler): # GIVEN - first_record = kinesis_event_factory("success") - second_record = kinesis_event_factory("success") - records = [first_record, second_record] + first_record = KinesisStreamRecord(kinesis_event_factory("success")) + second_record = KinesisStreamRecord(kinesis_event_factory("success")) + + records = [first_record.raw_event, second_record.raw_event] processor = BatchProcessor(event_type=EventType.KinesisDataStreams) # WHEN @@ -452,8 +456,8 @@ def test_batch_processor_kinesis_context_success_only(kinesis_event_factory, kin # THEN assert processed_messages == [ - ("success", decode_kinesis_data(first_record), first_record), - ("success", decode_kinesis_data(second_record), second_record), + ("success", b64_to_str(first_record.kinesis.data), first_record.raw_event), + ("success", b64_to_str(second_record.kinesis.data), second_record.raw_event), ] assert batch.response() == {"batchItemFailures": []} @@ -461,9 +465,10 @@ def test_batch_processor_kinesis_context_success_only(kinesis_event_factory, kin def test_batch_processor_kinesis_context_with_failure(kinesis_event_factory, kinesis_record_handler): # GIVEN - first_record = kinesis_event_factory("failure") - second_record = kinesis_event_factory("success") - records = [first_record, second_record] + first_record = KinesisStreamRecord(kinesis_event_factory("failure")) + second_record = KinesisStreamRecord(kinesis_event_factory("success")) + + records = [first_record.raw_event, second_record.raw_event] processor = BatchProcessor(event_type=EventType.KinesisDataStreams) # WHEN @@ -471,16 +476,16 @@ def 
test_batch_processor_kinesis_context_with_failure(kinesis_event_factory, kin processed_messages = batch.process() # THEN - assert processed_messages[1] == ("success", decode_kinesis_data(second_record), second_record) + assert processed_messages[1] == ("success", b64_to_str(second_record.kinesis.data), second_record.raw_event) assert len(batch.fail_messages) == 1 - assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["kinesis"]["sequenceNumber"]}]} + assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record.kinesis.sequence_number}]} def test_batch_processor_kinesis_middleware_with_failure(kinesis_event_factory, kinesis_record_handler): # GIVEN - first_record = kinesis_event_factory("failure") - second_record = kinesis_event_factory("success") - event = {"Records": [first_record, second_record]} + first_record = KinesisStreamRecord(kinesis_event_factory("failure")) + second_record = KinesisStreamRecord(kinesis_event_factory("success")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} processor = BatchProcessor(event_type=EventType.KinesisDataStreams) diff --git a/tests/functional/utils.py b/tests/functional/utils.py index 180129b1de4..703f21744e2 100644 --- a/tests/functional/utils.py +++ b/tests/functional/utils.py @@ -13,5 +13,5 @@ def str_to_b64(data: str) -> str: return base64.b64encode(data.encode()).decode("utf-8") -def decode_kinesis_data(data: dict) -> str: - return base64.b64decode(data["kinesis"]["data"].encode()).decode("utf-8") +def b64_to_str(data: str) -> str: + return base64.b64decode(data.encode()).decode("utf-8") From c75731632b83bc2e5fb71759e8dca8d7334875d6 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 12 Dec 2021 14:53:43 +0100 Subject: [PATCH 08/32] chore: permanent exceptions TBD in separate PR --- aws_lambda_powertools/utilities/batch/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 4674f45aa2d..d0a3ce909f5 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -169,7 +169,6 @@ def __init__(self, event_type: EventType): event_type: EventType Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event """ - # refactor: Bring boto3 etc. 
for deleting permanent exceptions self.event_type = event_type self.batch_response = self.DEFAULT_RESPONSE self._COLLECTOR_MAPPING = { From 3217097db4c00d57513953a3c0080ffad66f5200 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 12 Dec 2021 20:02:11 +0100 Subject: [PATCH 09/32] feat: mypy support --- aws_lambda_powertools/utilities/batch/base.py | 31 +++++++++++++------ 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index d0a3ce909f5..db17b74b915 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -7,7 +7,8 @@ import sys from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Callable, Dict, List, Optional, Tuple +from types import TracebackType +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union from aws_lambda_powertools.middleware_factory import lambda_handler_decorator from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord @@ -15,6 +16,11 @@ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord logger = logging.getLogger(__name__) +SuccessCallback = Tuple[str, Any, dict] +FailureCallback = Tuple[str, str, dict] + +_ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType] +_OptExcInfo = Union[_ExcInfo, Tuple[None, None, None]] class EventType(Enum): @@ -48,7 +54,7 @@ def _clean(self): raise NotImplementedError() @abstractmethod - def _process_record(self, record: Any): + def _process_record(self, record: dict): """ Process record with handler. """ @@ -67,13 +73,13 @@ def __enter__(self): def __exit__(self, exception_type, exception_value, traceback): self._clean() - def __call__(self, records: List[Any], handler: Callable): + def __call__(self, records: List[dict], handler: Callable): """ Set instance attributes before execution Parameters ---------- - records: List[Any] + records: List[dict] List with objects to be processed. handler: Callable Callable to process "records" entries. @@ -82,7 +88,7 @@ def __call__(self, records: List[Any], handler: Callable): self.handler = handler return self - def success_handler(self, record: Any, result: Any): + def success_handler(self, record: dict, result: Any) -> SuccessCallback: """ Success callback @@ -95,7 +101,7 @@ def success_handler(self, record: Any, result: Any): self.success_messages.append(record) return entry - def failure_handler(self, record: Any, exception: Tuple): + def failure_handler(self, record: dict, exception: _OptExcInfo) -> FailureCallback: """ Failure callback @@ -196,17 +202,17 @@ def _prepare(self): self.fail_messages.clear() self.batch_response = self.DEFAULT_RESPONSE - def _process_record(self, record) -> Tuple: + def _process_record(self, record: dict) -> Union[SuccessCallback, FailureCallback]: """ Process a record with instance's handler Parameters ---------- - record: Any - An object to be processed. + record: dict + A batch record to be processed. 
""" try: - data = self._DATA_CLASS_MAPPING[self.event_type](record) + data = self._to_batch_type(record, event_type=self.event_type) result = self.handler(record=data) return self.success_handler(record=record, result=result) except Exception: @@ -244,3 +250,8 @@ def _collect_kinesis_failures(self): def _collect_dynamodb_failures(self): return {"itemIdentifier": msg["dynamodb"]["SequenceNumber"] for msg in self.fail_messages} + + def _to_batch_type( + self, record: dict, event_type: EventType + ) -> Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]: + return self._DATA_CLASS_MAPPING[event_type](record) # type: ignore # since DictWrapper inference is incorrect From 09257baa752fb444d1463a8dbd6281effdd83d4b Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 12 Dec 2021 20:52:51 +0100 Subject: [PATCH 10/32] feat: draft implementation --- aws_lambda_powertools/utilities/batch/base.py | 32 ++++++++++++++++--- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index db17b74b915..d1435f31f31 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -8,7 +8,7 @@ from abc import ABC, abstractmethod from enum import Enum from types import TracebackType -from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union +from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload from aws_lambda_powertools.middleware_factory import lambda_handler_decorator from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord @@ -16,12 +16,20 @@ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord logger = logging.getLogger(__name__) +has_pydantic = "pydantic" in sys.modules + SuccessCallback = Tuple[str, Any, dict] FailureCallback = Tuple[str, str, dict] - _ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType] _OptExcInfo = Union[_ExcInfo, Tuple[None, None, None]] +if has_pydantic: + from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel + from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord as KinesisDataStreamRecordModel + from aws_lambda_powertools.utilities.parser.models import SqsRecordModel + + BatchTypeModels = Union[SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecordModel] + class EventType(Enum): SQS = "SQS" @@ -167,15 +175,18 @@ def batch_processor( class BatchProcessor(BasePartialProcessor): DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []} - def __init__(self, event_type: EventType): + def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None): """Process batch and partially report failed items Parameters ---------- event_type: EventType Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event + model: Optional["BatchTypeModels"] + Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord """ self.event_type = event_type + self.model = model self.batch_response = self.DEFAULT_RESPONSE self._COLLECTOR_MAPPING = { EventType.SQS: self._collect_sqs_failures, @@ -212,7 +223,7 @@ def _process_record(self, record: dict) -> Union[SuccessCallback, FailureCallbac A batch record to be processed. 
""" try: - data = self._to_batch_type(record, event_type=self.event_type) + data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) result = self.handler(record=data) return self.success_handler(record=record, result=result) except Exception: @@ -251,7 +262,18 @@ def _collect_kinesis_failures(self): def _collect_dynamodb_failures(self): return {"itemIdentifier": msg["dynamodb"]["SequenceNumber"] for msg in self.fail_messages} + @overload + def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels": + ... + + @overload def _to_batch_type( self, record: dict, event_type: EventType ) -> Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]: - return self._DATA_CLASS_MAPPING[event_type](record) # type: ignore # since DictWrapper inference is incorrect + ... + + def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None): + if model: + return model.parse_obj(record) + else: + return self._DATA_CLASS_MAPPING[event_type](record) From 251541c79479cdb169253913b8bc0cd66a7aef53 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Mon, 13 Dec 2021 14:32:49 +0100 Subject: [PATCH 11/32] fix: mypy typing --- aws_lambda_powertools/utilities/batch/base.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index d1435f31f31..358b8df760c 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -28,7 +28,9 @@ from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord as KinesisDataStreamRecordModel from aws_lambda_powertools.utilities.parser.models import SqsRecordModel - BatchTypeModels = Union[SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecordModel] + BatchTypeModels = Optional[ + Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]] + ] class EventType(Enum): From 4c95d39fb829ef5c10097523e597dd52de4f5cf0 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Mon, 13 Dec 2021 17:22:02 +0100 Subject: [PATCH 12/32] chore: improve mypy support on success/failure --- .pre-commit-config.yaml | 4 -- aws_lambda_powertools/utilities/batch/base.py | 46 +++++++++++-------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f42337d5c5b..61e98378017 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -11,10 +11,6 @@ repos: - id: trailing-whitespace - id: end-of-file-fixer - id: check-toml - - repo: https://github.com/pre-commit/pygrep-hooks - rev: v1.5.1 - hooks: - - id: python-use-type-annotations - repo: local hooks: - id: black diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 358b8df760c..b5011b033ad 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -16,13 +16,23 @@ from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord logger = logging.getLogger(__name__) -has_pydantic = "pydantic" in sys.modules -SuccessCallback = Tuple[str, Any, dict] -FailureCallback = Tuple[str, str, dict] + +class EventType(Enum): + SQS = "SQS" + KinesisDataStreams = "KinesisDataStreams" + DynamoDBStreams = "DynamoDBStreams" + + +# +# type specifics +# +has_pydantic = "pydantic" in sys.modules _ExcInfo = 
Tuple[Type[BaseException], BaseException, TracebackType] _OptExcInfo = Union[_ExcInfo, Tuple[None, None, None]] +# For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses +# We need them as subclasses as we must access their message ID or sequence number metadata via dot notation if has_pydantic: from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamRecordModel from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord as KinesisDataStreamRecordModel @@ -32,11 +42,13 @@ Union[Type[SqsRecordModel], Type[DynamoDBStreamRecordModel], Type[KinesisDataStreamRecordModel]] ] - -class EventType(Enum): - SQS = "SQS" - KinesisDataStreams = "KinesisDataStreams" - DynamoDBStreams = "DynamoDBStreams" +# When using processor with default arguments, records will carry EventSourceDataClassTypes +# and depending on what EventType it's passed it'll correctly map to the right record +# When using Pydantic Models, it'll accept any +EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord] +BatchEventTypes = Union[EventSourceDataClassTypes, "BatchTypeModels"] +SuccessCallback = Tuple[str, Any, BatchEventTypes] +FailureCallback = Tuple[str, str, BatchEventTypes] class BasePartialProcessor(ABC): @@ -45,8 +57,8 @@ class BasePartialProcessor(ABC): """ def __init__(self): - self.success_messages: List = [] - self.fail_messages: List = [] + self.success_messages: List[BatchEventTypes] = [] + self.fail_messages: List[BatchEventTypes] = [] self.exceptions: List = [] @abstractmethod @@ -98,7 +110,7 @@ def __call__(self, records: List[dict], handler: Callable): self.handler = handler return self - def success_handler(self, record: dict, result: Any) -> SuccessCallback: + def success_handler(self, record, result: Any) -> SuccessCallback: """ Success callback @@ -111,7 +123,7 @@ def success_handler(self, record: dict, result: Any) -> SuccessCallback: self.success_messages.append(record) return entry - def failure_handler(self, record: dict, exception: _OptExcInfo) -> FailureCallback: + def failure_handler(self, record, exception: _OptExcInfo) -> FailureCallback: """ Failure callback @@ -256,22 +268,20 @@ def _get_messages_to_report(self) -> Dict[str, str]: return self._COLLECTOR_MAPPING[self.event_type]() def _collect_sqs_failures(self): - return {"itemIdentifier": msg["messageId"] for msg in self.fail_messages} + return {"itemIdentifier": msg.messageId for msg in self.fail_messages} def _collect_kinesis_failures(self): - return {"itemIdentifier": msg["kinesis"]["sequenceNumber"] for msg in self.fail_messages} + return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages} def _collect_dynamodb_failures(self): - return {"itemIdentifier": msg["dynamodb"]["SequenceNumber"] for msg in self.fail_messages} + return {"itemIdentifier": msg.dynamodb.sequence_number for msg in self.fail_messages} @overload def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels": ... @overload - def _to_batch_type( - self, record: dict, event_type: EventType - ) -> Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord]: + def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceDataClassTypes: ... 
def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None): From 7756d2c338875c2eb3103319b04221ee016a78c8 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Mon, 13 Dec 2021 17:36:43 +0100 Subject: [PATCH 13/32] fix: failure handler record types --- aws_lambda_powertools/utilities/batch/base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index b5011b033ad..77e63ea6c8c 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -236,12 +236,12 @@ def _process_record(self, record: dict) -> Union[SuccessCallback, FailureCallbac record: dict A batch record to be processed. """ + data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) try: - data = self._to_batch_type(record=record, event_type=self.event_type, model=self.model) result = self.handler(record=data) return self.success_handler(record=record, result=result) except Exception: - return self.failure_handler(record=record, exception=sys.exc_info()) + return self.failure_handler(record=data, exception=sys.exc_info()) def _clean(self): """ @@ -268,7 +268,7 @@ def _get_messages_to_report(self) -> Dict[str, str]: return self._COLLECTOR_MAPPING[self.event_type]() def _collect_sqs_failures(self): - return {"itemIdentifier": msg.messageId for msg in self.fail_messages} + return {"itemIdentifier": msg.message_id for msg in self.fail_messages} def _collect_kinesis_failures(self): return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages} @@ -285,7 +285,7 @@ def _to_batch_type(self, record: dict, event_type: EventType) -> EventSourceData ... 
def _to_batch_type(self, record: dict, event_type: EventType, model: Optional["BatchTypeModels"] = None): - if model: + if model is not None: return model.parse_obj(record) else: return self._DATA_CLASS_MAPPING[event_type](record) From 1b242eb540e29bbb698d6f0378328009b188d006 Mon Sep 17 00:00:00 2001 From: Heitor Lessa <lessa@amazon.co.uk> Date: Fri, 17 Dec 2021 10:53:49 +0100 Subject: [PATCH 14/32] fix: copy data not pointer if one subclasses it Co-authored-by: Guilherme Martins Crocetti <gmcrocetti@gmail.com> --- aws_lambda_powertools/utilities/batch/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 77e63ea6c8c..4edab87a242 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -201,7 +201,7 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N """ self.event_type = event_type self.model = model - self.batch_response = self.DEFAULT_RESPONSE + self.batch_response = deepcopy(self.DEFAULT_RESPONSE) self._COLLECTOR_MAPPING = { EventType.SQS: self._collect_sqs_failures, EventType.KinesisDataStreams: self._collect_kinesis_failures, From 4a7ceeae7ea12ee8332d1a063ae0d8460525beff Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Fri, 17 Dec 2021 10:59:59 +0100 Subject: [PATCH 15/32] chore: linting --- aws_lambda_powertools/utilities/batch/base.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 4edab87a242..5b3c2a4459b 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -3,6 +3,7 @@ """ Batch processing utilities """ +import copy import logging import sys from abc import ABC, abstractmethod @@ -201,7 +202,7 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N """ self.event_type = event_type self.model = model - self.batch_response = deepcopy(self.DEFAULT_RESPONSE) + self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE) self._COLLECTOR_MAPPING = { EventType.SQS: self._collect_sqs_failures, EventType.KinesisDataStreams: self._collect_kinesis_failures, From 53b8e755d856dbb034ae81cb740628adbf4be504 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Fri, 17 Dec 2021 11:01:35 +0100 Subject: [PATCH 16/32] chore: address Tom's feedback on type name --- aws_lambda_powertools/utilities/batch/base.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 5b3c2a4459b..b45b57ec4fc 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -48,8 +48,8 @@ class EventType(Enum): # When using Pydantic Models, it'll accept any EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord] BatchEventTypes = Union[EventSourceDataClassTypes, "BatchTypeModels"] -SuccessCallback = Tuple[str, Any, BatchEventTypes] -FailureCallback = Tuple[str, str, BatchEventTypes] +SuccessResponse = Tuple[str, Any, BatchEventTypes] +FailureResponse = Tuple[str, str, BatchEventTypes] class BasePartialProcessor(ABC): @@ -111,7 +111,7 @@ def __call__(self, records: List[dict], handler: Callable): self.handler = handler return self - def success_handler(self, record, result: Any) -> 
SuccessCallback: + def success_handler(self, record, result: Any) -> SuccessResponse: """ Success callback @@ -124,7 +124,7 @@ def success_handler(self, record, result: Any) -> SuccessCallback: self.success_messages.append(record) return entry - def failure_handler(self, record, exception: _OptExcInfo) -> FailureCallback: + def failure_handler(self, record, exception: _OptExcInfo) -> FailureResponse: """ Failure callback @@ -228,7 +228,7 @@ def _prepare(self): self.fail_messages.clear() self.batch_response = self.DEFAULT_RESPONSE - def _process_record(self, record: dict) -> Union[SuccessCallback, FailureCallback]: + def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]: """ Process a record with instance's handler From b0f170e04622a6efdf301aba3f98f3b76ff812f6 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Fri, 17 Dec 2021 15:11:46 +0100 Subject: [PATCH 17/32] chore: test model support --- aws_lambda_powertools/utilities/batch/base.py | 19 +- tests/functional/test_utilities_batch.py | 272 +++++++++++++++++- 2 files changed, 281 insertions(+), 10 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index b45b57ec4fc..56d0bf755f5 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -213,6 +213,7 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N EventType.KinesisDataStreams: KinesisStreamRecord, EventType.DynamoDBStreams: DynamoDBRecord, } + self._EVENT_ID_MAPPING = {} super().__init__() @@ -268,14 +269,26 @@ def _get_messages_to_report(self) -> Dict[str, str]: """ return self._COLLECTOR_MAPPING[self.event_type]() + # Event Source Data Classes follow python idioms for fields + # while Parser/Pydantic follows the event field names to the latter def _collect_sqs_failures(self): - return {"itemIdentifier": msg.message_id for msg in self.fail_messages} + if self.model: + return {"itemIdentifier": msg.messageId for msg in self.fail_messages} + else: + return {"itemIdentifier": msg.message_id for msg in self.fail_messages} def _collect_kinesis_failures(self): - return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages} + if self.model: + # Pydantic model uses int but Lambda poller expects str + return {"itemIdentifier": msg.kinesis.sequenceNumber for msg in self.fail_messages} + else: + return {"itemIdentifier": msg.kinesis.sequence_number for msg in self.fail_messages} def _collect_dynamodb_failures(self): - return {"itemIdentifier": msg.dynamodb.sequence_number for msg in self.fail_messages} + if self.model: + return {"itemIdentifier": msg.dynamodb.SequenceNumber for msg in self.fail_messages} + else: + return {"itemIdentifier": msg.dynamodb.sequence_number for msg in self.fail_messages} @overload def _to_batch_type(self, record: dict, event_type: EventType, model: "BatchTypeModels") -> "BatchTypeModels": diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index d7a8838883d..929a5bbff19 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -1,5 +1,6 @@ +import json from random import randint -from typing import Callable +from typing import Callable, Dict, Optional from unittest.mock import patch import pytest @@ -12,6 +13,11 @@ from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord from 
aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord +from aws_lambda_powertools.utilities.parser import BaseModel, validator +from aws_lambda_powertools.utilities.parser.models import DynamoDBStreamChangedRecordModel, DynamoDBStreamRecordModel +from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecord as KinesisDataStreamRecordModel +from aws_lambda_powertools.utilities.parser.models import KinesisDataStreamRecordPayload, SqsRecordModel +from aws_lambda_powertools.utilities.parser.types import Literal from tests.functional.utils import b64_to_str, str_to_b64 @@ -22,7 +28,12 @@ def factory(body: str): "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", "body": body, - "attributes": {}, + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185", + }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", @@ -66,7 +77,7 @@ def factory(body: str): "eventVersion": "1.0", "dynamodb": { "Keys": {"Id": {"N": "101"}}, - "NewImage": {"message": {"S": body}}, + "NewImage": {"Message": {"S": body}}, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": seq, "SizeBytes": 26, @@ -105,7 +116,7 @@ def handler(record: KinesisStreamRecord): @pytest.fixture(scope="module") def dynamodb_record_handler() -> Callable: def handler(record: DynamoDBRecord): - body = record.dynamodb.new_image.get("message").get_value + body = record.dynamodb.new_image.get("Message").get_value if "fail" in body: raise Exception("Failed to process record.") return body @@ -142,6 +153,14 @@ def stubbed_partial_processor_suppressed(config) -> PartialSQSProcessor: yield stubber, processor +@pytest.fixture(scope="module") +def order_event_factory() -> Callable: + def factory(item: Dict) -> str: + return json.dumps({"item": item}) + + return factory + + def test_partial_sqs_processor_context_with_failure(sqs_event_factory, record_handler, partial_processor): """ Test processor with one failing record @@ -513,8 +532,8 @@ def test_batch_processor_dynamodb_context_success_only(dynamodb_event_factory, d # THEN assert processed_messages == [ - ("success", first_record["dynamodb"]["NewImage"]["message"]["S"], first_record), - ("success", second_record["dynamodb"]["NewImage"]["message"]["S"], second_record), + ("success", first_record["dynamodb"]["NewImage"]["Message"]["S"], first_record), + ("success", second_record["dynamodb"]["NewImage"]["Message"]["S"], second_record), ] assert batch.response() == {"batchItemFailures": []} @@ -532,7 +551,7 @@ def test_batch_processor_dynamodb_context_with_failure(dynamodb_event_factory, d processed_messages = batch.process() # THEN - assert processed_messages[1] == ("success", second_record["dynamodb"]["NewImage"]["message"]["S"], second_record) + assert processed_messages[1] == ("success", second_record["dynamodb"]["NewImage"]["Message"]["S"], second_record) assert len(batch.fail_messages) == 1 assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]}]} @@ -554,3 +573,242 @@ def lambda_handler(event, context): # THEN assert len(result["batchItemFailures"]) == 1 + + +def test_batch_processor_context_model(sqs_event_factory, order_event_factory): + # GIVEN + class Order(BaseModel): + item: 
dict + + class OrderSqs(SqsRecordModel): + body: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("body", pre=True) + def transform_body_to_dict(cls, value: str): + return json.loads(value) + + def record_handler(record: OrderSqs): + return record.body.item + + order_event = order_event_factory({"type": "success"}) + first_record = sqs_event_factory(order_event) + second_record = sqs_event_factory(order_event) + records = [first_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqs) + with processor(records, record_handler) as batch: + processed_messages = batch.process() + + # THEN + order_item = json.loads(order_event)["item"] + assert processed_messages == [ + ("success", order_item, first_record), + ("success", order_item, second_record), + ] + + assert batch.response() == {"batchItemFailures": []} + + +def test_batch_processor_context_model_with_failure(sqs_event_factory, order_event_factory): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderSqs(SqsRecordModel): + body: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("body", pre=True) + def transform_body_to_dict(cls, value: str): + return json.loads(value) + + def record_handler(record: OrderSqs): + if "fail" in record.body.item["type"]: + raise Exception("Failed to process record.") + return record.body.item + + order_event = order_event_factory({"type": "success"}) + order_event_fail = order_event_factory({"type": "fail"}) + first_record = sqs_event_factory(order_event_fail) + second_record = sqs_event_factory(order_event) + records = [first_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqs) + with processor(records, record_handler) as batch: + batch.process() + + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["messageId"]}]} + + +def test_batch_processor_dynamodb_context_model(dynamodb_event_factory, order_event_factory): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderDynamoDB(BaseModel): + Message: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("Message", pre=True) + def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): + return json.loads(value["S"]) + + class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): + NewImage: Optional[OrderDynamoDB] + OldImage: Optional[OrderDynamoDB] + + class OrderDynamoDBRecord(DynamoDBStreamRecordModel): + dynamodb: OrderDynamoDBChangeRecord + + def record_handler(record: OrderDynamoDBRecord): + return record.dynamodb.NewImage.Message.item + + order_event = order_event_factory({"type": "success"}) + first_record = dynamodb_event_factory(order_event) + second_record = dynamodb_event_factory(order_event) + records = [first_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) + with processor(records, record_handler) as batch: + processed_messages = batch.process() + + # THEN + order_item = json.loads(order_event)["item"] + assert processed_messages == [ + ("success", order_item, first_record), + ("success", order_item, second_record), + ] + + assert batch.response() == {"batchItemFailures": []} + + +def test_batch_processor_dynamodb_context_model_with_failure(dynamodb_event_factory, order_event_factory): + # 
GIVEN + class Order(BaseModel): + item: dict + + class OrderDynamoDB(BaseModel): + Message: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("Message", pre=True) + def transform_message_to_dict(cls, value: Dict[Literal["S"], str]): + return json.loads(value["S"]) + + class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel): + NewImage: Optional[OrderDynamoDB] + OldImage: Optional[OrderDynamoDB] + + class OrderDynamoDBRecord(DynamoDBStreamRecordModel): + dynamodb: OrderDynamoDBChangeRecord + + def record_handler(record: OrderDynamoDBRecord): + if "fail" in record.dynamodb.NewImage.Message.item["type"]: + raise Exception("Failed to process record.") + return record.dynamodb.NewImage.Message.item + + order_event = order_event_factory({"type": "success"}) + order_event_fail = order_event_factory({"type": "fail"}) + first_record = dynamodb_event_factory(order_event_fail) + second_record = dynamodb_event_factory(order_event) + records = [first_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord) + with processor(records, record_handler) as batch: + batch.process() + + # THEN + assert len(batch.fail_messages) == 1 + assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["dynamodb"]["SequenceNumber"]}]} + + +def test_batch_processor_kinesis_context_parser_model(kinesis_event_factory, order_event_factory): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): + data: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("data", pre=True) + def transform_message_to_dict(cls, value: str): + # Powertools KinesisDataStreamRecordModel + return json.loads(value) + + class OrderKinesisRecord(KinesisDataStreamRecordModel): + kinesis: OrderKinesisPayloadRecord + + def record_handler(record: OrderKinesisRecord): + return record.kinesis.data.item + + order_event = order_event_factory({"type": "success"}) + first_record = kinesis_event_factory(order_event) + second_record = kinesis_event_factory(order_event) + records = [first_record, second_record] + + # WHEN + processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord) + with processor(records, record_handler) as batch: + processed_messages = batch.process() + + # THEN + order_item = json.loads(order_event)["item"] + assert processed_messages == [ + ("success", order_item, first_record), + ("success", order_item, second_record), + ] + + assert batch.response() == {"batchItemFailures": []} + + +def test_batch_processor_kinesis_context_parser_model_with_failure(kinesis_event_factory, order_event_factory): + # GIVEN + class Order(BaseModel): + item: dict + + class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload): + data: Order + + # auto transform json string + # so Pydantic can auto-initialize nested Order model + @validator("data", pre=True) + def transform_message_to_dict(cls, value: str): + # Powertools KinesisDataStreamRecordModel + return json.loads(value) + + class OrderKinesisRecord(KinesisDataStreamRecordModel): + kinesis: OrderKinesisPayloadRecord + + def record_handler(record: OrderKinesisRecord): + if "fail" in record.kinesis.data.item["type"]: + raise Exception("Failed to process record.") + return record.kinesis.data.item + + order_event = order_event_factory({"type": "success"}) + order_event_fail = 
order_event_factory({"type": "fail"})
+
+    first_record = kinesis_event_factory(order_event_fail)
+    second_record = kinesis_event_factory(order_event)
+    records = [first_record, second_record]
+
+    # WHEN
+    processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)
+    with processor(records, record_handler) as batch:
+        batch.process()
+
+    # THEN
+    assert len(batch.fail_messages) == 1
+    assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["kinesis"]["sequenceNumber"]}]}

From 77a7ab5fb6fb3f83e0a37434ed42ccc15a9b3915 Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Fri, 17 Dec 2021 15:33:12 +0100
Subject: [PATCH 18/32] chore: remove leftovers

---
 aws_lambda_powertools/utilities/batch/base.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py
index 56d0bf755f5..582068b01f7 100644
--- a/aws_lambda_powertools/utilities/batch/base.py
+++ b/aws_lambda_powertools/utilities/batch/base.py
@@ -213,7 +213,6 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N
             EventType.KinesisDataStreams: KinesisStreamRecord,
             EventType.DynamoDBStreams: DynamoDBRecord,
         }
-        self._EVENT_ID_MAPPING = {}

         super().__init__()

From 11ab82518a6b6c6bf95c8d5548c75fa1c6b79c5c Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Sun, 19 Dec 2021 13:20:48 +0100
Subject: [PATCH 19/32] docs: new BatchProcessor for SQS, Kinesis, DynamoDB

---
 docs/utilities/batch.md | 645 ++++++++++++++++++++++++++++++++++++++--
 1 file changed, 619 insertions(+), 26 deletions(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 56ab160e9f9..023edd8c0bb 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -1,33 +1,626 @@
 ---
-title: SQS Batch Processing
+title: Batch Processing
 description: Utility
 ---

-The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS.
+The batch processing utility handles partial failures when processing batches from Amazon SQS, Amazon Kinesis Data Streams, and Amazon DynamoDB Streams.

 ## Key Features

-* Prevent successfully processed messages being returned to SQS
-* Simple interface for individually processing messages from a batch
-* Build your own batch processor using the base classes
+* Reports batch item failures to reduce the number of retries for a record upon errors
+* Simple interface to process each batch record
+* Integrates with [Event Source Data Classes](./data_classes.md){target="_blank"} and [Parser (Pydantic)](parser.md){target="_blank"} for self-documenting record schema
+* Build your own batch processor by extending primitives

 ## Background

-When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS.
+When using SQS, Kinesis Data Streams, or DynamoDB Streams as a Lambda event source, your Lambda functions are triggered with a batch of messages.

-If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function is triggered with the same batch one more time.
+If your function fails to process any message from the batch, the entire batch returns to your queue or stream. This same batch is then retried until one of the following conditions happens first: **a)** your Lambda function returns a successful response, **b)** the record reaches its maximum retry attempts, or **c)** the records expire.
-With this utility, messages within a batch are handled individually - only messages that were not successfully processed
-are returned to the queue.
+With this utility, batch records are processed individually – only messages that failed to be processed return to the queue or stream for a further retry. This works when two mechanisms are in place:

-!!! warning
-    While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible.
+1. `ReportBatchItemFailures` is set in your SQS, Kinesis, or DynamoDB event source properties
+2. [A specific response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sqs-batchfailurereporting-syntax){target="_blank"} is returned so Lambda knows which records should not be deleted during partial responses
+
+!!! warning "This utility lowers the chance of processing records more than once; it does not guarantee it"
+    We recommend implementing processing logic in an [idempotent manner](idempotency.md){target="_blank"} wherever possible.

-    More details on how Lambda works with SQS can be found in the [AWS documentation](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html)
+    You can find more details on how Lambda works with either [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html){target="_blank"}, [Kinesis](https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html){target="_blank"}, or [DynamoDB](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html){target="_blank"} in the AWS Documentation.

## Getting started

Regardless of whether you're using SQS, Kinesis Data Streams, or DynamoDB Streams, you must configure your Lambda function event source to use `ReportBatchItemFailures`.

### Required resources

The remaining sections of the documentation will rely on these samples. For completeness, they demonstrate the IAM permissions required and a Dead Letter Queue (DLQ) where batch records are sent after 2 retries have been attempted.
+ + +=== "SQS" + + ```yaml title="template.yaml" hl_lines="31-32" + AWSTemplateFormatVersion: '2010-09-09' + Transform: AWS::Serverless-2016-10-31 + Description: partial batch response sample + + Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.8 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + + Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + - SQSPollerPolicy: + QueueName: !GetAtt SampleQueue.QueueName + Events: + Batch: + Type: SQS + Properties: + Queue: !GetAtt SampleQueue.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleQueue: + Type: AWS::SQS::Queue + Properties: + VisibilityTimeout: 30 # Fn timeout * 6 + RedrivePolicy: + maxReceiveCount: 2 + deadLetterTargetArn: !GetAtt SampleDLQ.Arn + ``` + +=== "Kinesis Data Streams" + + ```yaml title="template.yaml" hl_lines="44-45" + AWSTemplateFormatVersion: '2010-09-09' + Transform: AWS::Serverless-2016-10-31 + Description: partial batch response sample + + Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.8 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + + Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records to DLQ from Kinesis/DynamoDB + - Version: "2012-10-17" + Statement: + Effect: "Allow" + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + KinesisStream: + Type: Kinesis + Properties: + Stream: !GetAtt SampleStream.Arn + BatchSize: 100 + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleStream: + Type: AWS::Kinesis::Stream + Properties: + ShardCount: 1 + ``` + +=== "DynamoDB Streams" + + ```yaml title="template.yaml" hl_lines="43-44" + AWSTemplateFormatVersion: '2010-09-09' + Transform: AWS::Serverless-2016-10-31 + Description: partial batch response sample + + Globals: + Function: + Timeout: 5 + MemorySize: 256 + Runtime: python3.8 + Tracing: Active + Environment: + Variables: + LOG_LEVEL: INFO + POWERTOOLS_SERVICE_NAME: hello + + Resources: + HelloWorldFunction: + Type: AWS::Serverless::Function + Properties: + Handler: app.lambda_handler + CodeUri: hello_world + Policies: + # Lambda Destinations require additional permissions + # to send failure records from Kinesis/DynamoDB + - Version: "2012-10-17" + Statement: + Effect: "Allow" + Action: + - sqs:GetQueueAttributes + - sqs:GetQueueUrl + - sqs:SendMessage + Resource: !GetAtt SampleDLQ.Arn + Events: + DynamoDBStream: + Type: DynamoDB + Properties: + Stream: !GetAtt SampleTable.StreamArn + StartingPosition: LATEST + MaximumRetryAttempts: 2 + DestinationConfig: + OnFailure: + Destination: !GetAtt SampleDLQ.Arn + FunctionResponseTypes: + - ReportBatchItemFailures + + SampleDLQ: + Type: AWS::SQS::Queue + + SampleTable: + Type: AWS::DynamoDB::Table + Properties: + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: pk + AttributeType: S + - AttributeName: sk + AttributeType: S + KeySchema: + - AttributeName: pk + KeyType: HASH + - AttributeName: sk + KeyType: 
RANGE
+        SSESpecification:
+          SSEEnabled: yes
+        StreamSpecification:
+          StreamViewType: NEW_AND_OLD_IMAGES
+
+    ```
+
+### Processing SQS batch
+
+Processing batches from SQS works in four stages:
+
+1. Instantiate **`BatchProcessor`** and choose **`EventType.SQS`** for the event type
+2. Define your function to handle each batch record, and use [`SQSRecord`](data_classes.md#sqs){target="_blank"} type annotation for autocompletion
+3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing
+4. Return the appropriate response contract to Lambda via **`.response()`** processor method
+
+!!! info "This code example optionally uses Tracer and Logger for completion"
+
+=== "As a decorator"
+
+    ```python hl_lines="4-5 9 15 23 25"
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.SQS)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: SQSRecord):
+        payload: str = record.body
+        if payload:
+            item: dict = json.loads(payload)
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```

+=== "As a context manager"
+
+    ```python hl_lines="4-5 9 15 24-26 28"
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.SQS)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: SQSRecord):
+        payload: str = record.body
+        if payload:
+            item: dict = json.loads(payload)
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    def lambda_handler(event, context: LambdaContext):
+        batch = event["Records"]
+        with processor(records=batch, handler=record_handler):
+            processed_messages = processor.process()  # kick off processing, return list[tuple]
+
+        return processor.response()
+    ```

+=== "Sample response"
+
+    The second record failed to be processed, therefore the processor added its message ID in the response.
+ + ```python + { + 'batchItemFailures': [ + { + 'itemIdentifier': '244fc6b4-87a3-44ab-83d2-361172410c3a' + } + ] + } + ``` + +=== "Sample event" + + ```json + { + "Records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "{\"Message\": \"success\"}", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", + "awsRegion": "us-east-1" + }, + { + "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a", + "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2: 123456789012:my-queue", + "awsRegion": "us-east-1" + } + ] + } + ``` + +### Processing Kinesis batch + +Processing batches from Kinesis works in four stages: + +1. Instantiate **`BatchProcessor`** and choose **`EventType.KinesisDataStreams`** for the event type +2. Define your function to handle each batch record, and use [`KinesisStreamRecord`](data_classes.md#kinesis-streams){target="_blank"} type annotation for autocompletion +3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing +4. Return the appropriate response contract to Lambda via **`.response()`** processor method + +!!! info "This code example optionally uses Tracer and Logger for completion" + +=== "As a decorator" + + ```python hl_lines="4-5 9 15 22 24" + import json + + from aws_lambda_powertools import Logger, Tracer + from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor + from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord + from aws_lambda_powertools.utilities.typing import LambdaContext + + + processor = BatchProcessor(event_type=EventType.KinesisDataStreams) + tracer = Tracer() + logger = Logger() + + + @tracer.capture_method + def record_handler(record: KinesisStreamRecord): + logger.info(record.kinesis.data_as_text) + payload: dict = record.kinesis.data_as_json() + ... 
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```

+=== "As a context manager"
+
+    ```python hl_lines="4-5 9 15 23-25 27"
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: KinesisStreamRecord):
+        logger.info(record.kinesis.data_as_text)
+        payload: dict = record.kinesis.data_as_json()
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    def lambda_handler(event, context: LambdaContext):
+        batch = event["Records"]
+        with processor(records=batch, handler=record_handler):
+            processed_messages = processor.process()  # kick off processing, return list[tuple]
+
+        return processor.response()
+    ```

+=== "Sample response"
+
+    The second record failed to be processed, therefore the processor added its sequence number in the response.
+
+    ```python
+    {
+        'batchItemFailures': [
+            {
+                'itemIdentifier': '6006958808509702859251049540584488075644979031228738'
+            }
+        ]
+    }
+    ```


+=== "Sample event"
+
+    ```json
+    {
+        "Records": [
+            {
+                "kinesis": {
+                    "kinesisSchemaVersion": "1.0",
+                    "partitionKey": "1",
+                    "sequenceNumber": "4107859083838847772757075850904226111829882106684065",
+                    "data": "eyJNZXNzYWdlIjogInN1Y2Nlc3MifQ==",
+                    "approximateArrivalTimestamp": 1545084650.987
+                },
+                "eventSource": "aws:kinesis",
+                "eventVersion": "1.0",
+                "eventID": "shardId-000000000006:4107859083838847772757075850904226111829882106684065",
+                "eventName": "aws:kinesis:record",
+                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+                "awsRegion": "us-east-2",
+                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+            },
+            {
+                "kinesis": {
+                    "kinesisSchemaVersion": "1.0",
+                    "partitionKey": "1",
+                    "sequenceNumber": "6006958808509702859251049540584488075644979031228738",
+                    "data": "c3VjY2Vzcw==",
+                    "approximateArrivalTimestamp": 1545084650.987
+                },
+                "eventSource": "aws:kinesis",
+                "eventVersion": "1.0",
+                "eventID": "shardId-000000000006:6006958808509702859251049540584488075644979031228738",
+                "eventName": "aws:kinesis:record",
+                "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
+                "awsRegion": "us-east-2",
+                "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
+            }
+        ]
+    }
+    ```


+### Processing DynamoDB batch
+
+Processing batches from DynamoDB Streams works in four stages:
+
+1. Instantiate **`BatchProcessor`** and choose **`EventType.DynamoDBStreams`** for the event type
+2. Define your function to handle each batch record, and use [`DynamoDBRecord`](data_classes.md#dynamodb-streams){target="_blank"} type annotation for autocompletion
+3. Use either **`batch_processor`** decorator or your instantiated processor as a context manager to kick off processing
+4. Return the appropriate response contract to Lambda via **`.response()`** processor method
+
+!!! 
info "This code example optionally uses Tracer and Logger for completion"

+=== "As a decorator"
+
+    ```python hl_lines="4-5 9 15 25 27"
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: DynamoDBRecord):
+        logger.info(record.dynamodb.new_image)
+        payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value)
+        # alternatively:
+        # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
+        # payload = change.get("Message").raw_event -> {"S": "<payload>"}
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```

+=== "As a context manager"
+
+    ```python hl_lines="4-5 9 15 26-28 30"
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: DynamoDBRecord):
+        logger.info(record.dynamodb.new_image)
+        payload: dict = json.loads(record.dynamodb.new_image.get("Message").get_value)
+        # alternatively:
+        # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image
+        # payload = change.get("Message").raw_event -> {"S": "<payload>"}
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    def lambda_handler(event, context: LambdaContext):
+        batch = event["Records"]
+        with processor(records=batch, handler=record_handler):
+            processed_messages = processor.process()  # kick off processing, return list[tuple]
+
+        return processor.response()
+    ```

+=== "Sample response"
+
+    The second record failed to be processed, therefore the processor added its sequence number in the response.
+ + ```python + { + 'batchItemFailures': [ + { + 'itemIdentifier': '8640712661' + } + ] + } + ``` + + +=== "Sample event" + + ```json + { + "Records": [ + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "Message": { + "S": "failure" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "3275880929", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + }, + { + "eventID": "1", + "eventVersion": "1.0", + "dynamodb": { + "Keys": { + "Id": { + "N": "101" + } + }, + "NewImage": { + "SomethingElse": { + "S": "success" + } + }, + "StreamViewType": "NEW_AND_OLD_IMAGES", + "SequenceNumber": "8640712661", + "SizeBytes": 26 + }, + "awsRegion": "us-west-2", + "eventName": "INSERT", + "eventSourceARN": "eventsource_arn", + "eventSource": "aws:dynamodb" + } + ] + } + ``` + +<!-- ### IAM Permissions Before your use this utility, your AWS Lambda function must have `sqs:DeleteMessageBatch` permission to delete successful messages directly from the queue. @@ -93,9 +686,9 @@ You need to create a function to handle each record from the batch - We call it !!! tip **Any non-exception/successful return from your record handler function** will instruct both decorator and context manager to queue up each individual message for deletion. - If the entire batch succeeds, we let Lambda to proceed in deleting the records from the queue for cost reasons. + If the entire batch succeeds, we let Lambda to proceed in deleting the records from the queue for cost reasons. --> -### Partial failure mechanics +<!-- ### Partial failure mechanics All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: @@ -105,11 +698,11 @@ All records in the batch will be passed to this handler for processing, even if !!! warning You will not have accessed to the **processed messages** within the Lambda Handler. - All processing logic will and should be performed by the `record_handler` function. + All processing logic will and should be performed by the `record_handler` function. --> ## Advanced -### Choosing between decorator and context manager +<!-- ### Choosing between decorator and context manager They have nearly the same behaviour when it comes to processing messages from the batch: @@ -118,9 +711,9 @@ They have nearly the same behaviour when it comes to processing messages from th * **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch` * **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue -The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need. +The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need. --> -### Accessing processed messages +<!-- ### Accessing processed messages Use `PartialSQSProcessor` context manager to access a list of all return values from your `record_handler` function. 
@@ -141,9 +734,9 @@ Use `PartialSQSProcessor` context manager to access a list of all return values result = proc.process() # Returns a list of all results from record_handler return result - ``` + ``` --> -### Customizing boto configuration +<!-- ### Customizing boto configuration The **`config`** and **`boto3_session`** parameters enable you to pass in a custom [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) or a custom [boto3 session](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html) when using the `sqs_batch_processor` @@ -241,9 +834,9 @@ decorator or `PartialSQSProcessor` class. result = processor.process() return result - ``` + ``` --> -### Suppressing exceptions +<!-- ### Suppressing exceptions If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument. @@ -266,9 +859,9 @@ If you want to disable the default behavior where `SQSBatchProcessingError` is r with processor(records, record_handler): result = processor.process() - ``` + ``` --> -### Create your own partial processor +<!-- ### Create your own partial processor You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`. @@ -361,4 +954,4 @@ When using Sentry.io for error monitoring, you can override `failure_handler` to capture_exception() # send exception to Sentry logger.exception("got exception while processing SQS message") return super().failure_handler(record, exception) # type: ignore - ``` + ``` --> From 0d5d24ef284cf5e4b38415f12fab6fdc8a3a509c Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 19 Dec 2021 13:42:01 +0100 Subject: [PATCH 20/32] fix: ensure BatchProcessorError is raised when entire batch fails --- aws_lambda_powertools/utilities/batch/base.py | 16 ++++++++++ .../utilities/batch/exceptions.py | 21 ++++++++++++ tests/functional/test_utilities_batch.py | 32 ++++++++++++++++--- 3 files changed, 64 insertions(+), 5 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index 582068b01f7..e5c96b39129 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -12,6 +12,7 @@ from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union, overload from aws_lambda_powertools.middleware_factory import lambda_handler_decorator +from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord @@ -199,6 +200,11 @@ def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = N Whether this is a SQS, DynamoDB Streams, or Kinesis Data Stream event model: Optional["BatchTypeModels"] Parser's data model using either SqsRecordModel, DynamoDBStreamRecordModel, KinesisDataStreamRecord + + Exceptions + ---------- + BatchProcessingError + Raised when the entire batch has failed processing """ self.event_type = event_type self.model = model @@ -252,6 +258,13 @@ def _clean(self): if not self._has_messages_to_report(): return + if self._entire_batch_failed(): + raise 
BatchProcessingError(
+                msg=f"All records failed processing. {len(self.exceptions)} individual errors logged "
+                f"separately below.",
+                child_exceptions=self.exceptions,
+            )
+
         messages = self._get_messages_to_report()
         self.batch_response = {"batchItemFailures": [messages]}

@@ -262,6 +275,9 @@ def _has_messages_to_report(self) -> bool:
         logger.debug(f"All {len(self.success_messages)} records successfully processed")
         return False

+    def _entire_batch_failed(self) -> bool:
+        return len(self.exceptions) == len(self.records)
+
     def _get_messages_to_report(self) -> Dict[str, str]:
         """
         Format messages to use in batch deletion
diff --git a/aws_lambda_powertools/utilities/batch/exceptions.py b/aws_lambda_powertools/utilities/batch/exceptions.py
index c2ead04a7b1..7c5b6edb45e 100644
--- a/aws_lambda_powertools/utilities/batch/exceptions.py
+++ b/aws_lambda_powertools/utilities/batch/exceptions.py
@@ -23,3 +23,24 @@ def __str__(self):
             exception_list.append(formatted)

         return "\n".join(exception_list)
+
+
+class BatchProcessingError(Exception):
+    """When batch messages could not be processed"""
+
+    def __init__(self, msg="", child_exceptions=()):
+        super().__init__(msg)
+        self.msg = msg
+        self.child_exceptions = child_exceptions
+
+    # Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent
+    # errors being lost. See https://github.com/awslabs/aws-lambda-powertools-python/issues/275
+    def __str__(self):
+        parent_exception_str = super(BatchProcessingError, self).__str__()
+        exception_list = [f"{parent_exception_str}\n"]
+        for exception in self.child_exceptions:
+            extype, ex, tb = exception
+            formatted = "".join(traceback.format_exception(extype, ex, tb))
+            exception_list.append(formatted)
+
+        return "\n".join(exception_list)
diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py
index 929a5bbff19..fa2b27f0ec3 100644
--- a/tests/functional/test_utilities_batch.py
+++ b/tests/functional/test_utilities_batch.py
@@ -7,9 +7,14 @@
 from botocore.config import Config
 from botocore.stub import Stubber

-from aws_lambda_powertools.utilities.batch import PartialSQSProcessor, batch_processor, sqs_batch_processor
-from aws_lambda_powertools.utilities.batch.base import BatchProcessor, EventType
-from aws_lambda_powertools.utilities.batch.exceptions import SQSBatchProcessingError
+from aws_lambda_powertools.utilities.batch import (
+    BatchProcessor,
+    EventType,
+    PartialSQSProcessor,
+    batch_processor,
+    sqs_batch_processor,
+)
+from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError, SQSBatchProcessingError
 from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
 from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
 from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
@@ -559,7 +564,7 @@ def test_batch_processor_dynamodb_context_with_failure(dynamodb_event_factory, d
 def test_batch_processor_dynamodb_middleware_with_failure(dynamodb_event_factory, dynamodb_record_handler):
     # GIVEN
     first_record = dynamodb_event_factory("failure")
-    second_record = dynamodb_event_factory("success")
+    second_record = dynamodb_event_factory("failure")
     event = {"Records": [first_record, second_record]}

     processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
@@ -745,7 +750,7 @@ class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload):
         # so Pydantic can auto-initialize nested Order model
@validator("data", pre=True) def transform_message_to_dict(cls, value: str): - # Powertools KinesisDataStreamRecordModel + # Powertools KinesisDataStreamRecordModel already decodes b64 to str here return json.loads(value) class OrderKinesisRecord(KinesisDataStreamRecordModel): @@ -812,3 +817,20 @@ def record_handler(record: OrderKinesisRecord): # THEN assert len(batch.fail_messages) == 1 assert batch.response() == {"batchItemFailures": [{"itemIdentifier": first_record["kinesis"]["sequenceNumber"]}]} + + +def test_batch_processor_error_when_entire_batch_fails(sqs_event_factory, record_handler): + # GIVEN + first_record = SQSRecord(sqs_event_factory("fail")) + second_record = SQSRecord(sqs_event_factory("fail")) + event = {"Records": [first_record.raw_event, second_record.raw_event]} + + processor = BatchProcessor(event_type=EventType.SQS) + + @batch_processor(record_handler=record_handler, processor=processor) + def lambda_handler(event, context): + return processor.response() + + # WHEN/THEN + with pytest.raises(BatchProcessingError): + lambda_handler(event, {}) From e1dc4cfa47a0f0b8f8980512144e771a97fb9af9 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 19 Dec 2021 13:58:29 +0100 Subject: [PATCH 21/32] fix: exception leftover --- tests/functional/test_utilities_batch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/functional/test_utilities_batch.py b/tests/functional/test_utilities_batch.py index fa2b27f0ec3..cd6fc67ea15 100644 --- a/tests/functional/test_utilities_batch.py +++ b/tests/functional/test_utilities_batch.py @@ -564,7 +564,7 @@ def test_batch_processor_dynamodb_context_with_failure(dynamodb_event_factory, d def test_batch_processor_dynamodb_middleware_with_failure(dynamodb_event_factory, dynamodb_record_handler): # GIVEN first_record = dynamodb_event_factory("failure") - second_record = dynamodb_event_factory("failure") + second_record = dynamodb_event_factory("success") event = {"Records": [first_record, second_record]} processor = BatchProcessor(event_type=EventType.DynamoDBStreams) From cf3b01a12f829e8e136918eed0fd5ab26d33bf05 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 19 Dec 2021 14:30:18 +0100 Subject: [PATCH 22/32] chore: cleanup exceptions --- .../utilities/batch/exceptions.py | 39 +++++++++---------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/exceptions.py b/aws_lambda_powertools/utilities/batch/exceptions.py index 7c5b6edb45e..fe51433a5d6 100644 --- a/aws_lambda_powertools/utilities/batch/exceptions.py +++ b/aws_lambda_powertools/utilities/batch/exceptions.py @@ -2,20 +2,16 @@ Batch processing exceptions """ import traceback +from typing import Optional, Tuple -class SQSBatchProcessingError(Exception): - """When at least one message within a batch could not be processed""" - +class BaseBatchProcessingError(Exception): def __init__(self, msg="", child_exceptions=()): super().__init__(msg) self.msg = msg self.child_exceptions = child_exceptions - # Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent - # errors being lost. 
See https://github.com/awslabs/aws-lambda-powertools-python/issues/275
-    def __str__(self):
-        parent_exception_str = super(SQSBatchProcessingError, self).__str__()
+    def format_exceptions(self, parent_exception_str):
         exception_list = [f"{parent_exception_str}\n"]
         for exception in self.child_exceptions:
             extype, ex, tb = exception
@@ -25,22 +21,25 @@ def __str__(self):
         return "\n".join(exception_list)


-class BatchProcessingError(Exception):
-    """When batch messages could not be processed"""
+class SQSBatchProcessingError(BaseBatchProcessingError):
+    """When at least one message within a batch could not be processed"""

-    def __init__(self, msg="", child_exceptions=()):
-        super().__init__(msg)
-        self.msg = msg
-        self.child_exceptions = child_exceptions
+    def __init__(self, msg="", child_exceptions: Optional[Tuple[Exception]] = None):
+        super().__init__(msg, child_exceptions)

     # Overriding this method so we can output all child exception tracebacks when we raise this exception to prevent
     # errors being lost. See https://github.com/awslabs/aws-lambda-powertools-python/issues/275
     def __str__(self):
-        parent_exception_str = super(BatchProcessingError, self).__str__()
-        exception_list = [f"{parent_exception_str}\n"]
-        for exception in self.child_exceptions:
-            extype, ex, tb = exception
-            formatted = "".join(traceback.format_exception(extype, ex, tb))
-            exception_list.append(formatted)
+        parent_exception_str = super(SQSBatchProcessingError, self).__str__()
+        return self.format_exceptions(parent_exception_str)

-        return "\n".join(exception_list)
+
+class BatchProcessingError(BaseBatchProcessingError):
+    """When all batch records failed to be processed"""
+
+    def __init__(self, msg="", child_exceptions: Optional[Tuple[Exception]] = None):
+        super().__init__(msg, child_exceptions)
+
+    def __str__(self):
+        parent_exception_str = super(BatchProcessingError, self).__str__()
+        return self.format_exceptions(parent_exception_str)

From 3fc3e40c3ad34869e5900522cc6022f9e275ea9a Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Sun, 19 Dec 2021 14:40:19 +0100
Subject: [PATCH 23/32] docs: update mechanics section

---
 docs/utilities/batch.md | 25 ++++++++++++++-----------
 1 file changed, 14 insertions(+), 11 deletions(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 023edd8c0bb..594c45aae57 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -620,6 +620,20 @@ Processing batches from DynamoDB Streams works in four stages:
     }
     ```

+### Partial failure mechanics
+
+All records in the batch will be passed to this handler for processing, even if exceptions are thrown. Here's the behaviour after completing the batch:
+
+* **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`
+* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers `{'batchItemFailures': [{'itemIdentifier': '<id>'}]}`
+* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions captured
+
+!!! warning
+    You will not have access to the **processed messages** within the Lambda Handler; use context manager for that.
+
+    All processing logic will and should be performed by the `record_handler` function.
+
+
 <!-- ### IAM Permissions

Before your use this utility, your AWS Lambda function must have `sqs:DeleteMessageBatch` permission to delete successful messages directly from the queue.
@@ -688,17 +702,6 @@ You need to create a function to handle each record from the batch - We call it If the entire batch succeeds, we let Lambda to proceed in deleting the records from the queue for cost reasons. --> -<!-- ### Partial failure mechanics - -All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: - -* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch` -* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue - -!!! warning - You will not have accessed to the **processed messages** within the Lambda Handler. - - All processing logic will and should be performed by the `record_handler` function. --> ## Advanced From 9ceb74b0171e9a7adce42591a6e788fd42c78adb Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 19 Dec 2021 14:42:28 +0100 Subject: [PATCH 24/32] docs: update IAM permission --- docs/utilities/batch.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 594c45aae57..739bcdfe1ea 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -32,6 +32,8 @@ With this utility, batch records are processed individually – only messages th Regardless whether you're using SQS, Kinesis Data Streams or DynamoDB Streams, you must configure your Lambda function event source to use ``ReportBatchItemFailures`. +You do not need any additional IAM permissions to use this utility, except for what each event source requires. + ### Required resources The remaining sections of the documentation will rely on these samples. For completeness, this demonstrates IAM permissions and Dead Letter Queue where batch records will be sent after 2 retries were attempted. From be8ab0374e0f67aa35d90f3e45a940cd06477437 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 19 Dec 2021 14:43:44 +0100 Subject: [PATCH 25/32] docs: keep old section name for stats --- docs/utilities/batch.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 739bcdfe1ea..4b50e63b6c7 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -216,7 +216,7 @@ The remaining sections of the documentation will rely on these samples. 
For comp ``` -### Processing SQS batch +### Processing messages from SQS Processing batches from SQS works in four stages: @@ -345,7 +345,7 @@ Processing batches from SQS works in four stages: } ``` -### Processing Kinesis batch +### Processing messages from Kinesis Processing batches from Kinesis works in four stages: @@ -474,7 +474,7 @@ Processing batches from Kinesis works in four stages: ``` -### Processing DynamoDB batch +### Processing messages from DynamoDB Processing batches from Kinesis works in four stages: From 27f29374b792d3c1a2f46e95fd38e65c28a5faa3 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 19 Dec 2021 16:34:46 +0100 Subject: [PATCH 26/32] docs: update accessing processed messages section --- .../utilities/batch/__init__.py | 11 ++- aws_lambda_powertools/utilities/batch/base.py | 2 +- docs/utilities/batch.md | 71 +++++++++++++------ 3 files changed, 61 insertions(+), 23 deletions(-) diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py index 1ad1cae7141..d708d5f480b 100644 --- a/aws_lambda_powertools/utilities/batch/__init__.py +++ b/aws_lambda_powertools/utilities/batch/__init__.py @@ -4,14 +4,23 @@ Batch processing utility """ -from aws_lambda_powertools.utilities.batch.base import BasePartialProcessor, BatchProcessor, EventType, batch_processor +from aws_lambda_powertools.utilities.batch.base import ( + BasePartialProcessor, + BatchProcessor, + EventType, + FailureResponse, + SuccessResponse, + batch_processor, +) from aws_lambda_powertools.utilities.batch.sqs import PartialSQSProcessor, sqs_batch_processor __all__ = ( "BatchProcessor", "BasePartialProcessor", "EventType", + "FailureResponse", "PartialSQSProcessor", + "SuccessResponse", "batch_processor", "sqs_batch_processor", ) diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py index e5c96b39129..943bf7fe1af 100644 --- a/aws_lambda_powertools/utilities/batch/base.py +++ b/aws_lambda_powertools/utilities/batch/base.py @@ -46,7 +46,7 @@ class EventType(Enum): # When using processor with default arguments, records will carry EventSourceDataClassTypes # and depending on what EventType it's passed it'll correctly map to the right record -# When using Pydantic Models, it'll accept any +# When using Pydantic Models, it'll accept any subclass from SQS, DynamoDB and Kinesis EventSourceDataClassTypes = Union[SQSRecord, KinesisStreamRecord, DynamoDBRecord] BatchEventTypes = Union[EventSourceDataClassTypes, "BatchTypeModels"] SuccessResponse = Tuple[str, Any, BatchEventTypes] diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 4b50e63b6c7..9c5d902fe48 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -707,39 +707,68 @@ You need to create a function to handle each record from the batch - We call it ## Advanced -<!-- ### Choosing between decorator and context manager +### Accessing processed messages -They have nearly the same behaviour when it comes to processing messages from the batch: +Use the context manager to access a list of all returned values from your `record_handler` function. 
-* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost
-* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `record handler`, we will:
-    * **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch`
-    * **2)** Raise `SQSBatchProcessingError` to ensure failed messages return to your SQS queue
+> Signature: List[Union[SuccessResponse, FailureResponse]]

-The only difference is that **PartialSQSProcessor** will give you access to processed messages if you need. -->
+* **When successful**. We will include a tuple with `success`, the result of `record_handler`, and the batch record
+* **When failed**. We will include a tuple with `fail`, the exception as a string, and the batch record


+=== "app.py"
+
+    ```python hl_lines="31-38"
+    import json
+
+    from typing import Any, List, Literal, Union
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import (BatchProcessor,
+                                                       EventType,
+                                                       FailureResponse,
+                                                       SuccessResponse,
+                                                       batch_processor)
+    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.SQS)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: SQSRecord):
+        payload: str = record.body
+        if payload:
+            item: dict = json.loads(payload)
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    def lambda_handler(event, context: LambdaContext):
+        batch = event["Records"]
+        with processor(records=batch, handler=record_handler):
+            processed_messages: List[Union[SuccessResponse, FailureResponse]] = processor.process()
+
+        for message in processed_messages:
+            status: Union[Literal["success"], Literal["fail"]] = message[0]
+            result: Any = message[1]
+            record: SQSRecord = message[2]
+
+        return processor.response()
+    ```

+## FAQ
+
+### Choosing between decorator and context manager
+
+Use the context manager when you want access to the processed messages, or to handle the `BatchProcessingError` exception raised when all records within the batch fail to be processed, as the sketch below shows.
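+
+A minimal sketch of the context manager route, reusing the `processor` and `record_handler` names defined in the examples above (re-raising is illustrative; you may prefer to alert or swallow the error instead):
+
+```python
+from aws_lambda_powertools.utilities.batch.exceptions import BatchProcessingError
+
+def lambda_handler(event, context):
+    try:
+        with processor(records=event["Records"], handler=record_handler):
+            processor.process()
+    except BatchProcessingError:
+        # raised on context exit only when every record failed processing;
+        # re-raising lets Lambda mark the invocation as failed and retry the batch
+        raise
+
+    return processor.response()
+```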
- return result - ``` --> <!-- ### Customizing boto configuration From 1496b6ff4f595cacb23f05331f1562cd6660dbb8 Mon Sep 17 00:00:00 2001 From: heitorlessa <lessa@amazon.co.uk> Date: Sun, 19 Dec 2021 16:45:03 +0100 Subject: [PATCH 27/32] docs: update sentry section Signed-off-by: heitorlessa <lessa@amazon.co.uk> --- docs/utilities/batch.md | 56 ++++++++++++++++++++++------------------- 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md index 9c5d902fe48..99234657378 100644 --- a/docs/utilities/batch.md +++ b/docs/utilities/batch.md @@ -769,8 +769,33 @@ Use the context manager to access a list of all returned values from your `recor Use context manager when you want access to the processed messages or handle `BatchProcessingError` exception when all records within the batch fail to be processed. +### Integrating exception handling with Sentry.io + +When using Sentry.io for error monitoring, you can override `failure_handler` to capture each processing exception with Sentry SDK: + +> Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732) + +=== "sentry_integration.py" + + ```python hl_lines="4 7-8" + from typing import Tuple + + from aws_lambda_powertools.utilities.batch import BatchProcessor, FailureResponse + from sentry_sdk import capture_exception + + + class MyProcessor(BatchProcessor): + def failure_handler(self, record, exception) -> FailureResponse: + capture_exception() # send exception to Sentry + return super().failure_handler(record, exception) + ``` + + +## Legacy + +!!! tip "This is kept for historical purposes. Use the new [BatchProcessor](#processing-messages-from-sqs) instead. " -<!-- ### Customizing boto configuration +### Customizing boto configuration The **`config`** and **`boto3_session`** parameters enable you to pass in a custom [botocore config object](https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html) or a custom [boto3 session](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html) when using the `sqs_batch_processor` @@ -868,9 +893,9 @@ decorator or `PartialSQSProcessor` class. result = processor.process() return result - ``` --> + ``` -<!-- ### Suppressing exceptions +### Suppressing exceptions If you want to disable the default behavior where `SQSBatchProcessingError` is raised if there are any errors, you can pass the `suppress_exception` boolean argument. @@ -893,9 +918,9 @@ If you want to disable the default behavior where `SQSBatchProcessingError` is r with processor(records, record_handler): result = processor.process() - ``` --> + ``` -<!-- ### Create your own partial processor +### Create your own partial processor You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`. 
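As a quick reference, a skeleton of those three hooks could look like this (method bodies elided; the full worked example later in this section shows a complete implementation):

```python
from aws_lambda_powertools.utilities.batch import BasePartialProcessor


class MyPartialProcessor(BasePartialProcessor):
    def _prepare(self):
        # runs once before processing starts; reset any per-invocation state
        ...

    def _process_record(self, record):
        # runs per record; call self.handler(record) and return a
        # ("success" | "fail", result, record) tuple, e.g. via
        # self.success_handler() / self.failure_handler()
        ...

    def _clean(self):
        # runs once after all records are processed; persist results
        # or raise if failures should bubble up
        ...
```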
@@ -968,24 +993,3 @@ You can then use this class as a context manager, or pass it to `batch_processor
     def lambda_handler(event, context):
         return {"statusCode": 200}
     ```
-
-### Integrating exception handling with Sentry.io
-
-When using Sentry.io for error monitoring, you can override `failure_handler` to include to capture each processing exception:
-
-> Credits to [Charles-Axel Dein](https://github.com/awslabs/aws-lambda-powertools-python/issues/293#issuecomment-781961732)
-
-=== "sentry_integration.py"
-
-    ```python hl_lines="4 7-8"
-    from typing import Tuple
-
-    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
-    from sentry_sdk import capture_exception
-
-    class SQSProcessor(PartialSQSProcessor):
-        def failure_handler(self, record: Event, exception: Tuple) -> Tuple:  # type: ignore
-            capture_exception()  # send exception to Sentry
-            logger.exception("got exception while processing SQS message")
-            return super().failure_handler(record, exception)  # type: ignore
-    ``` -->

From 9057791cadf7f779bd6899ad690415b9d58d6fd4 Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Sun, 19 Dec 2021 17:40:45 +0100
Subject: [PATCH 28/32] docs: add extension, update create own processor

---
 .../utilities/batch/__init__.py               |   2 +
 aws_lambda_powertools/utilities/batch/base.py | 138 +++++++++++-
 docs/utilities/batch.md                       | 201 +++++++++++-------
 3 files changed, 257 insertions(+), 84 deletions(-)

diff --git a/aws_lambda_powertools/utilities/batch/__init__.py b/aws_lambda_powertools/utilities/batch/__init__.py
index d708d5f480b..584342e5fd0 100644
--- a/aws_lambda_powertools/utilities/batch/__init__.py
+++ b/aws_lambda_powertools/utilities/batch/__init__.py
@@ -8,6 +8,7 @@
     BasePartialProcessor,
     BatchProcessor,
     EventType,
+    ExceptionInfo,
     FailureResponse,
     SuccessResponse,
     batch_processor,
@@ -17,6 +18,7 @@
 __all__ = (
     "BatchProcessor",
     "BasePartialProcessor",
+    "ExceptionInfo",
     "EventType",
     "FailureResponse",
     "PartialSQSProcessor",
diff --git a/aws_lambda_powertools/utilities/batch/base.py b/aws_lambda_powertools/utilities/batch/base.py
index 943bf7fe1af..02eb00ffaed 100644
--- a/aws_lambda_powertools/utilities/batch/base.py
+++ b/aws_lambda_powertools/utilities/batch/base.py
@@ -30,8 +30,8 @@ class EventType(Enum):
 # type specifics
 #
 has_pydantic = "pydantic" in sys.modules
 
-_ExcInfo = Tuple[Type[BaseException], BaseException, TracebackType]
-_OptExcInfo = Union[_ExcInfo, Tuple[None, None, None]]
+ExceptionInfo = Tuple[Type[BaseException], BaseException, TracebackType]
+OptExcInfo = Union[ExceptionInfo, Tuple[None, None, None]]
 
 # For IntelliSense and Mypy to work, we need to account for possible SQS, Kinesis and DynamoDB subclasses
 # We need them as subclasses as we must access their message ID or sequence number metadata via dot notation
@@ -114,24 +114,38 @@ def __call__(self, records: List[dict], handler: Callable):
 
     def success_handler(self, record, result: Any) -> SuccessResponse:
         """
-        Success callback
+        Keeps track of batch records that were processed successfully
+
+        Parameters
+        ----------
+        record: Any
+            record that was processed successfully
+        result: Any
+            result from record handler
 
         Returns
         -------
-        tuple
+        SuccessResponse
             "success", result, original record
         """
         entry = ("success", result, record)
         self.success_messages.append(record)
         return entry
 
-    def failure_handler(self, record, exception: _OptExcInfo) -> FailureResponse:
+    def failure_handler(self, record, exception: OptExcInfo) -> FailureResponse:
         """
-        Failure callback
+        Keeps track of batch records that failed processing
+
+        Parameters
+        ----------
+        record: Any
+            record that failed processing
+        exception: OptExcInfo
+            Exception information containing type, value, and traceback (sys.exc_info())
 
         Returns
         -------
-        tuple
+        FailureResponse
             "fail", exceptions args, original record
         """
         exception_string = f"{exception[0]}:{exception[1]}"
@@ -189,6 +203,114 @@ def batch_processor(
 
 
 class BatchProcessor(BasePartialProcessor):
+    """Process native partial responses from SQS, Kinesis Data Streams, and DynamoDB.
+
+
+    Example
+    -------
+
+    ## Process batch triggered by SQS
+
+    ```python
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.SQS)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: SQSRecord):
+        payload: str = record.body
+        if payload:
+            item: dict = json.loads(payload)
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```
+
+    ## Process batch triggered by Kinesis Data Streams
+
+    ```python
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import KinesisStreamRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: KinesisStreamRecord):
+        logger.info(record.kinesis.data_as_text)
+        payload: dict = record.kinesis.data_as_json()
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```
+
+
+    ## Process batch triggered by DynamoDB Streams
+
+    ```python
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.DynamoDBStreams)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: DynamoDBRecord):
+        logger.info(record.dynamodb.new_image)
+        payload: dict = json.loads(record.dynamodb.new_image.get("item").s_value)
+        # alternatively:
+        # changes: Dict[str, dynamo_db_stream_event.AttributeValue] = record.dynamodb.new_image  # noqa: E800
+        # payload = changes.get("Message").raw_event -> {"S": "<payload>"}
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    def lambda_handler(event, context: LambdaContext):
+        batch = event["Records"]
+        with processor(records=batch, handler=record_handler):
+            processed_messages = processor.process()  # kick off processing, returns a list of tuples
+
+        return processor.response()
+    ```
+
+
+    Raises
+    ------
+    BatchProcessingError
+        When all batch records fail processing
+    """
+
     DEFAULT_RESPONSE: Dict[str, List[Optional[dict]]] = {"batchItemFailures": []}
 
     def __init__(self, event_type: EventType, model: Optional["BatchTypeModels"] = None):
@@ -232,7 +354,7 @@ def _prepare(self):
         """
         self.success_messages.clear()
         self.fail_messages.clear()
-        self.batch_response = self.DEFAULT_RESPONSE
+        self.batch_response = copy.deepcopy(self.DEFAULT_RESPONSE)
 
     def _process_record(self, record: dict) -> Union[SuccessResponse, FailureResponse]:
         """
diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 99234657378..7ced7e59950 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -627,8 +627,8 @@ Processing batches from Kinesis works in four stages:
     All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch:
 
 * **All records successfully processed**. We will return an empty list of item failures `{'batchItemFailures': []}`
-* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers ``{'batchItemFailures': [{"itemIdentifier": "<id>}]}`
-* **All records failed to be processed**. We will raise `BatchProcessingError` exception with a list of all exceptions captured
+* **Partial success with some exceptions**. We will return a list of all item IDs/sequence numbers that failed processing
+* **All records failed to be processed**. We will raise a `BatchProcessingError` exception with a list of all exceptions raised during processing
 
 !!! warning
     You will not have access to the **processed messages** within the Lambda Handler; use context manager for that.
@@ -763,6 +763,129 @@ Use the context manager to access a list of all returned values from your `recor
 
     return processor.response()
     ```
+
+### Extending BatchProcessor
+
+You might want to bring custom logic to the existing `BatchProcessor` to customize how successes and failures are handled.
+
+For these scenarios, you can subclass `BatchProcessor` and override the `success_handler` and `failure_handler` methods:
+
+* **`success_handler()`** – Keeps track of successful batch records
+* **`failure_handler()`** – Keeps track of failed batch records
+
+**Example**
+
+Let's suppose you'd like to add a metric named `BatchRecordFailures` for each batch record that failed processing:
+
+=== "app.py"

+    ```python
+    import json
+
+    from aws_lambda_powertools import Metrics, Tracer
+    from aws_lambda_powertools.metrics import MetricUnit
+    from aws_lambda_powertools.utilities.batch import batch_processor, BatchProcessor, ExceptionInfo, EventType, FailureResponse
+    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    class MyProcessor(BatchProcessor):
+        def failure_handler(self, record: SQSRecord, exception: ExceptionInfo) -> FailureResponse:
+            metrics.add_metric(name="BatchRecordFailures", unit=MetricUnit.Count, value=1)
+            return super().failure_handler(record, exception)
+
+    processor = MyProcessor(event_type=EventType.SQS)
+    metrics = Metrics(namespace="test")
+    tracer = Tracer()
+
+
+    @tracer.capture_method
+    def record_handler(record: SQSRecord):
+        payload: str = record.body
+        if payload:
+            item: dict = json.loads(payload)
+        ...
+
+    @metrics.log_metrics(capture_cold_start_metric=True)
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```
+
+### Create your own partial processor
+
+You can create your own partial batch processor from scratch by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
+
+* **`_process_record()`** – handles all processing logic for each individual message of a batch, including calling the `record_handler` (self.handler)
+* **`_prepare()`** – called once as part of the processor initialization
+* **`_clean()`** – teardown logic called once after `_process_record` completes
+
+You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
+
+=== "custom_processor.py"
+
+    ```python hl_lines="7 12 27 33 40 60"
+    import os
+    import sys
+    from random import randint
+
+    import boto3
+
+    from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
+
+    table_name = os.getenv("TABLE_NAME", "table_not_found")
+
+    class MyPartialProcessor(BasePartialProcessor):
+        """
+        Process a record and store successful results in an Amazon DynamoDB table
+
+        Parameters
+        ----------
+        table_name: str
+            DynamoDB table name to write results to
+        """
+
+        def __init__(self, table_name: str):
+            self.table_name = table_name
+
+            super().__init__()
+
+        def _prepare(self):
+            # It's called once, *before* processing
+            # Creates table resource and clean previous results
+            self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
+            self.success_messages.clear()
+
+        def _clean(self):
+            # It's called once, *after* processing all records (closing the context manager)
+            # Here we're sending, at once, all successful messages to a ddb table
+            with self.ddb_table.batch_writer() as batch:
+                for result in self.success_messages:
+                    batch.put_item(Item=result)
+
+        def _process_record(self, record):
+            # It handles how your record is processed
+            # Here we're keeping the status of each run
+            # where self.handler is the record_handler function passed as an argument
+            try:
+                result = self.handler(record)  # record_handler passed to decorator/context manager
+                return self.success_handler(record, result)
+            except Exception:
+                return self.failure_handler(record, sys.exc_info())
+
+        def success_handler(self, record, result):
+            entry = ("success", result, record)
+            message = {"age": result}
+            self.success_messages.append(message)
+            return entry
+
+
+    def record_handler(record):
+        return randint(0, 100)
+
+    @batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
+    def lambda_handler(event, context):
+        return {"statusCode": 200}
+    ```
+
 ## FAQ
 
 ### Choosing between decorator and context manager
@@ -919,77 +1042,3 @@ If you want to disable the default behavior where `SQSBatchProcessingError` is r
 
         with processor(records, record_handler):
             result = processor.process()
     ```
-
-### Create your own partial processor
-
-You can create your own partial batch processor by inheriting the `BasePartialProcessor` class, and implementing `_prepare()`, `_clean()` and `_process_record()`.
-
-* **`_process_record()`** - Handles all processing logic for each individual message of a batch, including calling the `record_handler` (self.handler)
-* **`_prepare()`** - Called once as part of the processor initialization
-* **`clean()`** - Teardown logic called once after `_process_record` completes
-
-You can then use this class as a context manager, or pass it to `batch_processor` to use as a decorator on your Lambda handler function.
-
-=== "custom_processor.py"
-
-    ```python hl_lines="3 9 24 30 37 57"
-    from random import randint
-
-    from aws_lambda_powertools.utilities.batch import BasePartialProcessor, batch_processor
-    import boto3
-    import os
-
-    table_name = os.getenv("TABLE_NAME", "table_not_found")
-
-    class MyPartialProcessor(BasePartialProcessor):
-        """
-        Process a record and stores successful results at a Amazon DynamoDB Table
-
-        Parameters
-        ----------
-        table_name: str
-            DynamoDB table name to write results to
-        """
-
-        def __init__(self, table_name: str):
-            self.table_name = table_name
-
-            super().__init__()
-
-        def _prepare(self):
-            # It's called once, *before* processing
-            # Creates table resource and clean previous results
-            self.ddb_table = boto3.resource("dynamodb").Table(self.table_name)
-            self.success_messages.clear()
-
-        def _clean(self):
-            # It's called once, *after* closing processing all records (closing the context manager)
-            # Here we're sending, at once, all successful messages to a ddb table
-            with ddb_table.batch_writer() as batch:
-                for result in self.success_messages:
-                    batch.put_item(Item=result)
-
-        def _process_record(self, record):
-            # It handles how your record is processed
-            # Here we're keeping the status of each run
-            # where self.handler is the record_handler function passed as an argument
-            try:
-                result = self.handler(record)  # record_handler passed to decorator/context manager
-                return self.success_handler(record, result)
-            except Exception as exc:
-                return self.failure_handler(record, exc)
-
-        def success_handler(self, record):
-            entry = ("success", result, record)
-            message = {"age": result}
-            self.success_messages.append(message)
-            return entry
-
-
-    def record_handler(record):
-        return randint(0, 100)
-
-    @batch_processor(record_handler=record_handler, processor=MyPartialProcessor(table_name))
-    def lambda_handler(event, context):
-        return {"statusCode": 200}
-    ```

From 580eeaef01f144431b06eeef65ea7248e2629e76 Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Sun, 19 Dec 2021 17:59:45 +0100
Subject: [PATCH 29/32] docs: add pydantic section

---
 docs/utilities/batch.md | 141 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 141 insertions(+)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 7ced7e59950..5f06791d412 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -707,6 +707,147 @@ You need to create a function to handle each record from the batch - We call it
 
 ## Advanced
 
+### Pydantic integration
+
+You can bring your own Pydantic models via the **`model`** parameter when inheriting from **`SqsRecordModel`**, **`KinesisDataStreamRecord`**, or **`DynamoDBStreamRecordModel`**.
+
+Inheritance is important because we need to access message IDs and sequence numbers from these records in the event of failure. Mypy is fully integrated with this utility, so it should identify whether you're passing an incorrect model.
+
+
+=== "SQS"
+
+    ```python hl_lines="5-6 10-11 13-20 22 28"
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.parser import BaseModel, validator
+    from aws_lambda_powertools.utilities.parser.models import SqsRecordModel
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    class Order(BaseModel):
+        item: dict
+
+    class OrderSqsRecord(SqsRecordModel):
+        body: Order
+
+        # auto transform json string
+        # so Pydantic can auto-initialize nested Order model
+        @validator("body", pre=True)
+        def transform_body_to_dict(cls, value: str):
+            return json.loads(value)
+
+    processor = BatchProcessor(event_type=EventType.SQS, model=OrderSqsRecord)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: OrderSqsRecord):
+        return record.body.item
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```
+
+=== "Kinesis Data Streams"
+
+    ```python hl_lines="5-7 11-12 14-22 24-25 28 34"
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.parser import BaseModel, validator
+    from aws_lambda_powertools.utilities.parser.models import (KinesisDataStreamRecord,
+                                                               KinesisDataStreamRecordPayload)
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    class Order(BaseModel):
+        item: dict
+
+    class OrderKinesisPayloadRecord(KinesisDataStreamRecordPayload):
+        data: Order
+
+        # auto transform json string
+        # so Pydantic can auto-initialize nested Order model
+        @validator("data", pre=True)
+        def transform_message_to_dict(cls, value: str):
+            # Powertools KinesisDataStreamRecord already decodes b64 to str here
+            return json.loads(value)
+
+    class OrderKinesisRecord(KinesisDataStreamRecord):
+        kinesis: OrderKinesisPayloadRecord
+
+
+    processor = BatchProcessor(event_type=EventType.KinesisDataStreams, model=OrderKinesisRecord)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: OrderKinesisRecord):
+        return record.kinesis.data.item
+
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```
+
+=== "DynamoDB Streams"
+
+    ```python hl_lines="7-9 13-14 16-23 25-27 29-30 33 39"
+    import json
+
+    from typing import Dict, Literal, Optional
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.parser import BaseModel, validator
+    from aws_lambda_powertools.utilities.parser.models import (DynamoDBStreamChangedRecordModel,
+                                                               DynamoDBStreamRecordModel)
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    class Order(BaseModel):
+        item: dict
+
+    class OrderDynamoDB(BaseModel):
+        Message: Order
+
+        # auto transform json string
+        # so Pydantic can auto-initialize nested Order model
+        @validator("Message", pre=True)
+        def transform_message_to_dict(cls, value: Dict[Literal["S"], str]):
+            return json.loads(value["S"])
+
+    class OrderDynamoDBChangeRecord(DynamoDBStreamChangedRecordModel):
+        NewImage: Optional[OrderDynamoDB]
+        OldImage: Optional[OrderDynamoDB]
+
+    class OrderDynamoDBRecord(DynamoDBStreamRecordModel):
+        dynamodb: OrderDynamoDBChangeRecord
+
+
+    processor = BatchProcessor(event_type=EventType.DynamoDBStreams, model=OrderDynamoDBRecord)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: OrderDynamoDBRecord):
+        return record.dynamodb.NewImage.Message.item
+
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```
+
 ### Accessing processed messages
 
 Use the context manager to access a list of all returned values from your `record_handler` function.

From f380f5738db3f5556e0de51ff49955c6dcdffea0 Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Sun, 19 Dec 2021 18:23:51 +0100
Subject: [PATCH 30/32] docs: add migration guide

---
 docs/utilities/batch.md | 97 ++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 96 insertions(+), 1 deletion(-)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 5f06791d412..c44d8d670e7 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -1057,7 +1057,102 @@ When using Sentry.io for error monitoring, you can override `failure_handler` to
 
 ## Legacy
 
-!!! tip "This is kept for historical purposes. Use the new [BatchProcessor](#processing-messages-from-sqs) instead. "
+!!! tip "This is kept for historical purposes. Use the new [BatchProcessor](#processing-messages-from-sqs) instead."
+
+
+### Migration guide
+
+!!! info "Keep reading if you are using `sqs_batch_processor` or `PartialSQSProcessor`"
+
+[As of Nov 2021](https://aws.amazon.com/about-aws/whats-new/2021/11/aws-lambda-partial-batch-response-sqs-event-source/){target="_blank"}, this is no longer needed, as SQS, Kinesis Data Streams, and DynamoDB Streams all offer this capability natively, with one caveat: it's an [opt-in feature](#required-resources).
+
+Being a native feature, we no longer need to instantiate boto3 or rely on customizations like exception suppression – this also lowers the cost of your Lambda function, as you can delegate the deletion of failed messages to Lambda itself.
+
+!!! tip "It's also easier to test, since it's mostly a [contract based response](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#sqs-batchfailurereporting-syntax){target="_blank"}."
+
+You can migrate in three steps:
+
+1. If you are using **`sqs_batch_processor`**, you can now use the **`batch_processor`** decorator
+2. If you are using **`PartialSQSProcessor`**, you can now use **`BatchProcessor`**
+3. Change your Lambda handler to return the new response format
+
+
+=== "Decorator: Before"
+
+    ```python hl_lines="1 6"
+    from aws_lambda_powertools.utilities.batch import sqs_batch_processor
+
+    def record_handler(record):
+        return do_something_with(record["body"])
+
+    @sqs_batch_processor(record_handler=record_handler)
+    def lambda_handler(event, context):
+        return {"statusCode": 200}
+    ```
+
+=== "Decorator: After"
+
+    ```python hl_lines="1 3 9 11"
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+
+    processor = BatchProcessor(event_type=EventType.SQS)
+
+
+    def record_handler(record):
+        return do_something_with(record["body"])
+
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context):
+        return processor.response()
+    ```
+
+
+=== "Context manager: Before"
+
+    ```python hl_lines="1-2 4 14 19"
+    from aws_lambda_powertools.utilities.batch import PartialSQSProcessor
+    from botocore.config import Config
+
+    config = Config(region_name="us-east-1")
+
+    def record_handler(record):
+        return_value = do_something_with(record["body"])
+        return return_value
+
+
+    def lambda_handler(event, context):
+        records = event["Records"]
+
+        processor = PartialSQSProcessor(config=config)
+
+        with processor(records, record_handler):
+            result = processor.process()
+
+        return result
+    ```
+
+=== "Context manager: After"
+
+    ```python hl_lines="1 11 16"
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType
+
+
+    def record_handler(record):
+        return_value = do_something_with(record["body"])
+        return return_value
+
+    def lambda_handler(event, context):
+        records = event["Records"]
+
+        processor = BatchProcessor(event_type=EventType.SQS)
+
+        with processor(records, record_handler):
+            processor.process()
+
+        return processor.response()
+    ```

From 58d78ca0c7e69c0b8811376e7adbe74dbf58c04b Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Sun, 19 Dec 2021 18:28:57 +0100
Subject: [PATCH 31/32] docs: add caveat section

---
 docs/utilities/batch.md | 37 +++++++++++++++++++++++++++++++++++++
 1 file changed, 37 insertions(+)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index c44d8d670e7..02464f4a14c 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -1026,6 +1026,43 @@ You can then use this class as a context manager, or pass it to `batch_processor
         return {"statusCode": 200}
     ```
 
+### Caveats
+
+#### Tracer response auto-capture for large batch sizes
+
+When using Tracer to capture the response for each processed batch record, you might exceed the 64K tracing data limit, depending on what your `record_handler` returns and how large your batch size is.
+
+If that's the case, you can configure [Tracer to disable response auto-capturing](../core/tracer.md#disabling-response-auto-capture){target="_blank"}.
+
+
+```python hl_lines="14" title="Disabling Tracer response auto-capturing"
+import json
+
+from aws_lambda_powertools import Logger, Tracer
+from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+processor = BatchProcessor(event_type=EventType.SQS)
+tracer = Tracer()
+logger = Logger()
+
+
+@tracer.capture_method(capture_response=False)
+def record_handler(record: SQSRecord):
+    payload: str = record.body
+    if payload:
+        item: dict = json.loads(payload)
+    ...
+
+@logger.inject_lambda_context
+@tracer.capture_lambda_handler
+@batch_processor(record_handler=record_handler, processor=processor)
+def lambda_handler(event, context: LambdaContext):
+    return processor.response()
+
+```

From 95715d005aef2b036de6fda2f852e2c84d955c0b Mon Sep 17 00:00:00 2001
From: heitorlessa <lessa@amazon.co.uk>
Date: Sun, 19 Dec 2021 18:55:41 +0100
Subject: [PATCH 32/32] docs: add testing section

---
 docs/utilities/batch.md | 135 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 135 insertions(+)

diff --git a/docs/utilities/batch.md b/docs/utilities/batch.md
index 02464f4a14c..3ea9413749e 100644
--- a/docs/utilities/batch.md
+++ b/docs/utilities/batch.md
@@ -1064,6 +1064,141 @@ def lambda_handler(event, context: LambdaContext):
     return processor.response()
 
 ```
 
+## Testing your code
+
+As there are no external calls, you can unit test your code with `BatchProcessor` quite easily.
+
+**Example**: Given an SQS batch where the first batch record succeeds and the second fails processing, we should have a single item reported in the function response.
+
+=== "test_app.py"
+
+    ```python
+    import json
+
+    from pathlib import Path
+    from dataclasses import dataclass
+
+    import pytest
+    from src import app
+
+
+    def load_event(path: Path):
+        with path.open() as f:
+            return json.load(f)
+
+
+    @pytest.fixture
+    def lambda_context():
+        @dataclass
+        class LambdaContext:
+            function_name: str = "test"
+            memory_limit_in_mb: int = 128
+            invoked_function_arn: str = "arn:aws:lambda:eu-west-1:809313241:function:test"
+            aws_request_id: str = "52fdfc07-2182-154f-163f-5f0f9a621d72"
+
+        return LambdaContext()
+
+    @pytest.fixture()
+    def sqs_event():
+        """Generates SQS event"""
+        return load_event(path=Path("events/sqs_event.json"))
+
+
+    def test_app_batch_partial_response(sqs_event, lambda_context):
+        # GIVEN
+        processor = app.processor  # access processor for additional assertions
+        successful_record = sqs_event["Records"][0]
+        failed_record = sqs_event["Records"][1]
+        expected_response = {
+            "batchItemFailures": [
+                {
+                    "itemIdentifier": failed_record["messageId"]
+                }
+            ]
+        }
+
+        # WHEN
+        ret = app.lambda_handler(sqs_event, lambda_context)
+
+        # THEN
+        assert ret == expected_response
+        assert len(processor.fail_messages) == 1
+        assert processor.success_messages[0] == successful_record
+    ```
+
+=== "src/app.py"
+
+    ```python
+    import json
+
+    from aws_lambda_powertools import Logger, Tracer
+    from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, batch_processor
+    from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
+    from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+    processor = BatchProcessor(event_type=EventType.SQS)
+    tracer = Tracer()
+    logger = Logger()
+
+
+    @tracer.capture_method
+    def record_handler(record: SQSRecord):
+        payload: str = record.body
+        if payload:
+            item: dict = json.loads(payload)
+        ...
+
+    @logger.inject_lambda_context
+    @tracer.capture_lambda_handler
+    @batch_processor(record_handler=record_handler, processor=processor)
+    def lambda_handler(event, context: LambdaContext):
+        return processor.response()
+    ```
+
+=== "Sample SQS event"
+
+    ```json title="events/sqs_event.json"
+    {
+        "Records": [
+            {
+                "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
+                "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+                "body": "{\"Message\": \"success\"}",
+                "attributes": {
+                    "ApproximateReceiveCount": "1",
+                    "SentTimestamp": "1545082649183",
+                    "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+                    "ApproximateFirstReceiveTimestamp": "1545082649185"
+                },
+                "messageAttributes": {},
+                "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+                "eventSource": "aws:sqs",
+                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
+                "awsRegion": "us-east-1"
+            },
+            {
+                "messageId": "244fc6b4-87a3-44ab-83d2-361172410c3a",
+                "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a",
+                "body": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
+                "attributes": {
+                    "ApproximateReceiveCount": "1",
+                    "SentTimestamp": "1545082649183",
+                    "SenderId": "AIDAIENQZJOLO23YVJ4VO",
+                    "ApproximateFirstReceiveTimestamp": "1545082649185"
+                },
+                "messageAttributes": {},
+                "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
+                "eventSource": "aws:sqs",
+                "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue",
+                "awsRegion": "us-east-1"
+            }
+        ]
+    }
+    ```
+
+
 
 ## FAQ
 
 ### Choosing between decorator and context manager