feat(event_source): Add support for S3 batch operations #3572


Merged

20 commits
9d8e9a4
Add support for S3 Batch Operations event and response along with uni…
sbailliez Dec 29, 2023
319a048
Add documentation with example based on the AWS S3 documentation
sbailliez Dec 29, 2023
d834c96
Use unquote_plus and add unit test for key encoded with space
sbailliez Dec 30, 2023
0045c06
Merge branch 'aws-powertools:develop' into feat/s3-batch-operations
sbailliez Dec 30, 2023
7af9a73
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena Jan 2, 2024
f3955eb
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena Jan 2, 2024
da5105a
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena Jan 4, 2024
eee9536
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena Jan 10, 2024
688e746
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena Jan 15, 2024
a401507
Initial refactor
leandrodamascena Jan 16, 2024
bd29d0a
Changing the DX to improve usability
leandrodamascena Jan 16, 2024
dfb4618
Documentation
leandrodamascena Jan 16, 2024
b778609
Adding parser
leandrodamascena Jan 16, 2024
fe5424f
Small refactor
leandrodamascena Jan 16, 2024
b4996a3
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena Jan 16, 2024
c49adb7
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena Jan 17, 2024
deb270a
Addressing Ruben's feedback - Docs and examples
leandrodamascena Jan 17, 2024
12e81d4
Addressing Ruben's feedback - Docs and examples
leandrodamascena Jan 17, 2024
57681c4
Addressing Ruben's feedback - Code
leandrodamascena Jan 17, 2024
a99594f
Addressing Ruben's feedback - Code
leandrodamascena Jan 17, 2024
8 changes: 8 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/__init__.py
@@ -23,6 +23,11 @@
)
from .kinesis_stream_event import KinesisStreamEvent
from .lambda_function_url_event import LambdaFunctionUrlEvent
from .s3_batch_operation_event import (
S3BatchOperationEvent,
S3BatchOperationResponse,
S3BatchOperationResult,
)
from .s3_event import S3Event, S3EventBridgeNotificationEvent
from .secrets_manager_event import SecretsManagerEvent
from .ses_event import SESEvent
@@ -52,6 +57,9 @@
"LambdaFunctionUrlEvent",
"S3Event",
"S3EventBridgeNotificationEvent",
"S3BatchOperationEvent",
"S3BatchOperationResponse",
"S3BatchOperationResult",
"SESEvent",
"SNSEvent",
"SQSEvent",
220 changes: 220 additions & 0 deletions aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
@@ -0,0 +1,220 @@
from dataclasses import dataclass, field
from typing import Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import unquote_plus

from typing_extensions import Literal

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper


class S3BatchOperationJob(DictWrapper):
@property
def id(self) -> str: # noqa: A003
return self["id"]

@property
def user_arguments(self) -> Optional[Dict[str, str]]:
"""Get user arguments provided for this job (only for invocation schema 2.0)"""
return self.get("userArguments")


class S3BatchOperationTask(DictWrapper):
@property
def task_id(self) -> str:
"""Get the task id"""
return self["taskId"]

@property
def s3_key(self) -> str:
"""Get the object key unquote_plus using strict utf-8 encoding"""
# note: AWS documentation example is using unquote but this actually
# contradicts what happens in practice. The key is url encoded with %20
# in the inventory file but in the event it is sent with +. So use unquote_plus
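        # e.g. unquote_plus("dataset/2023%2B12/report+final.json") returns
        # "dataset/2023+12/report final.json" ('+' decodes to a space, '%2B' to '+')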
return unquote_plus(self["s3Key"], encoding="utf-8", errors="strict")

@property
def s3_version_id(self) -> Optional[str]:
"""Object version if bucket is versioning-enabled, otherwise null"""
return self.get("s3VersionId")

@property
def s3_bucket_arn(self) -> Optional[str]:
"""Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')"""
return self.get("s3BucketArn")

@property
def s3_bucket(self) -> str:
""" "
Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
or from 's3BucketArn' (invocationSchemaVersion '1.0')
"""
if self.s3_bucket_arn:
return self.s3_bucket_arn.split(":::")[-1]
return self["s3Bucket"]


class S3BatchOperationEvent(DictWrapper):
"""Amazon S3BatchOperation Event

Documentation:
--------------
- https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html
"""

@property
def invocation_id(self) -> str:
"""Get the identifier of the invocation request"""
return self["invocationId"]

@property
def invocation_schema_version(self) -> str:
""" "
Get the schema version for the payload that Batch Operations sends when invoking an
AWS Lambda function. Either '1.0' or '2.0'.
"""
return self["invocationSchemaVersion"]

@property
def tasks(self) -> Iterator[S3BatchOperationTask]:
for task in self["tasks"]:
yield S3BatchOperationTask(task)

@property
def task(self) -> S3BatchOperationTask:
"""Get the first s3 batch operation task"""
return next(self.tasks)

@property
def job(self) -> S3BatchOperationJob:
"""Get the s3 batch operation job"""
return S3BatchOperationJob(self["job"])


# Tuple of valid result codes, used both in S3BatchOperationResult and S3BatchOperationResponse
VALID_RESULT_CODE_TYPES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")


@dataclass(repr=False, order=False)
class S3BatchOperationResult:
task_id: str
result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
result_string: Optional[str] = None

def __post_init__(self):
if self.result_code not in VALID_RESULT_CODE_TYPES:
raise ValueError(f"Invalid result_code: {self.result_code}")

def asdict(self) -> Dict[str, Any]:
return {
"taskId": self.task_id,
"resultCode": self.result_code,
"resultString": self.result_string,
}

@classmethod
def as_succeeded(cls, task: S3BatchOperationTask, result_string: Optional[str] = None) -> "S3BatchOperationResult":
"""Create a `Succeeded` result for a given task"""
return S3BatchOperationResult(task.task_id, "Succeeded", result_string)

@classmethod
def as_permanent_failure(
cls,
task: S3BatchOperationTask,
result_string: Optional[str] = None,
) -> "S3BatchOperationResult":
"""Create a `PermanentFailure` result for a given task"""
return S3BatchOperationResult(task.task_id, "PermanentFailure", result_string)

@classmethod
def as_temporary_failure(
cls,
task: S3BatchOperationTask,
result_string: Optional[str] = None,
) -> "S3BatchOperationResult":
"""Create a `TemporaryFailure` result for a given task"""
return S3BatchOperationResult(task.task_id, "TemporaryFailure", result_string)


@dataclass(repr=False, order=False)
class S3BatchOperationResponse:
"""S3 Batch Operations response object

Documentation:
--------------
- https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
- https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
- https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion

Parameters
----------
invocation_schema_version : str
        Specifies the schema version for the payload that Batch Operations sends when invoking
        an AWS Lambda function, either '1.0' or '2.0'. This must be copied from the event.

invocation_id : str
The identifier of the invocation request. This must be copied from the event.

treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
        Undocumented parameter; defaults to "PermanentFailure"

results : List[S3BatchOperationResult]
        Results of each S3 Batch Operations task; optional at construction time
        and can be added later using the `add_result` method.

Examples
--------

**S3 Batch Operations**

```python
from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools.utilities.data_classes import (
S3BatchOperationEvent,
S3BatchOperationResponse,
S3BatchOperationResult
)

def lambda_handler(event: dict, context: LambdaContext):
s3_event = S3BatchOperationEvent(event)
response = S3BatchOperationResponse(s3_event.invocation_schema_version, s3_event.invocation_id)
result = None

task = s3_event.task
try:
do_work(task.s3_bucket, task.s3_key)
result = S3BatchOperationResult.as_succeeded(task)
except TimeoutError as e:
result = S3BatchOperationResult.as_temporary_failure(task, str(e))
except Exception as e:
result = S3BatchOperationResult.as_permanent_failure(task, str(e))
finally:
response.add_result(result)

return response.asdict()
```
"""

invocation_schema_version: str
invocation_id: str
treat_missing_keys_as: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "PermanentFailure"
results: List[S3BatchOperationResult] = field(default_factory=list)

def __post_init__(self):
if self.treat_missing_keys_as not in VALID_RESULT_CODE_TYPES:
raise ValueError(f"Invalid treat_missing_keys_as: {self.treat_missing_keys_as}")

def add_result(self, result: S3BatchOperationResult):
self.results.append(result)

def asdict(self) -> Dict:
if not self.results:
raise ValueError("Response must have one result")
if len(self.results) > 1:
raise ValueError("Response cannot have more than one result")

return {
"invocationSchemaVersion": self.invocation_schema_version,
"treatMissingKeysAs": self.treat_missing_keys_as,
"invocationId": self.invocation_id,
"results": [result.asdict() for result in self.results],
}
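
For reference, a minimal sketch (placeholder values, not part of the diff) of how the new classes serialize into the response shape S3 Batch Operations expects back from the handler:

```python
from aws_lambda_powertools.utilities.data_classes import (
    S3BatchOperationResponse,
    S3BatchOperationResult,
)

# Build a response for a fictitious invocation and a single task result
response = S3BatchOperationResponse("1.0", "example-invocation-id")
response.add_result(S3BatchOperationResult("example-task-id", "Succeeded", "ok"))

assert response.asdict() == {
    "invocationSchemaVersion": "1.0",
    "treatMissingKeysAs": "PermanentFailure",  # the default
    "invocationId": "example-invocation-id",
    "results": [
        {"taskId": "example-task-id", "resultCode": "Succeeded", "resultString": "ok"},
    ],
}
```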
89 changes: 88 additions & 1 deletion docs/utilities/data_classes.md
@@ -75,7 +75,7 @@ Log Data Event for Troubleshooting
## Supported event sources

| Event Source | Data_class |
| ------------------------------------------------------------------------- | -------------------------------------------------- |
|---------------------------------------------------------------------------|----------------------------------------------------|
| [Active MQ](#active-mq) | `ActiveMQEvent` |
| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
@@ -99,6 +99,7 @@ Log Data Event for Troubleshooting
| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
| [S3](#s3) | `S3Event` |
| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` |
| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
| [S3 EventBridge Notification](#s3-eventbridge-notification) | `S3EventBridgeNotificationEvent` |
| [SES](#ses) | `SESEvent` |
@@ -1076,6 +1077,92 @@ for more details.
do_something_with(f"{bucket_name}/{object_key}")
```

### S3 Batch Operations

This example is based on the Amazon S3 Batch Operations documentation [Example Lambda function for S3 Batch Operations](https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html){target="_blank"}.

=== "app.py"

```python hl_lines="4-8 17-19 24-26 38-40 55-57 61-63 72 75 77"
    import logging

    import boto3
    from botocore.exceptions import ClientError

    from aws_lambda_powertools.utilities.data_classes import (
        S3BatchOperationEvent,
        S3BatchOperationResponse,
        S3BatchOperationResult,
        event_source,
    )

    logger = logging.getLogger(__name__)
    logger.setLevel("INFO")

s3 = boto3.client("s3")

    @event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context):
response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id)
result = None
task = event.task

try:
obj_key = task.s3_key
obj_version_id = task.s3_version_id
bucket_name = task.s3_bucket

logger.info(
"Got task: remove delete marker %s from object %s.", obj_version_id, obj_key
)

try:
# If this call does not raise an error, the object version is not a delete
# marker and should not be deleted.
head_response = s3.head_object(
Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id
)
                result = S3BatchOperationResult.as_permanent_failure(
                    task, f"Object {obj_key}, ID {obj_version_id} is not a delete marker."
                )

logger.debug(head_response)
except ClientError as error:
delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get(
"x-amz-delete-marker", "false"
)
if delete_marker == "true":
logger.info(
"Object %s, version %s is a delete marker.", obj_key, obj_version_id
)
try:
s3.delete_object(
Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id
)
                        result = S3BatchOperationResult.as_succeeded(
                            task, f"Successfully removed delete marker {obj_version_id} from object {obj_key}."
                        )
except ClientError as error:
# Mark request timeout as a temporary failure so it will be retried.
if error.response["Error"]["Code"] == "RequestTimeout":
result = S3BatchOperationResult.as_temporary_failure(
task, f"Attempt to remove delete marker from object {obj_key} timed out."
)
else:
raise
else:
                    raise ValueError(
                        "The x-amz-delete-marker header is either not present or is not 'true'."
                    )
except Exception as error:
# Mark all other exceptions as permanent failures.
result = S3BatchOperationResult.as_permanent_failure(task, str(error))
logger.exception(error)
finally:
response.add_result(result)

return response.asdict()
```
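
A hypothetical unit-test sketch for the handler above (the module name `app.py`, the use of `pytest`, and the bundled sample event path are assumptions, not part of this PR):

```python
import json

from app import lambda_handler  # hypothetical module name for the handler above


def test_lambda_handler_returns_single_result():
    with open("tests/events/s3BatchOperationEventSchemaV1.json") as f:
        event = json.load(f)

    response = lambda_handler(event, None)  # context is unused by this handler

    assert response["invocationSchemaVersion"] == "1.0"
    assert len(response["results"]) == 1
    assert response["results"][0]["resultCode"] in ("Succeeded", "TemporaryFailure", "PermanentFailure")
```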

### S3 Object Lambda

This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3](https://aws.amazon.com/blogs/aws/introducing-amazon-s3-object-lambda-use-your-code-to-process-data-as-it-is-being-retrieved-from-s3/){target="_blank"}.
Expand Down
15 changes: 15 additions & 0 deletions tests/events/s3BatchOperationEventSchemaV1.json
@@ -0,0 +1,15 @@
{
"invocationSchemaVersion": "1.0",
"invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
"job": {
"id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce"
},
"tasks": [
{
"taskId": "dGFza2lkZ29lc2hlcmUK",
"s3Key": "prefix/dataset/dataset.20231222.json.gz",
"s3VersionId": "1",
"s3BucketArn": "arn:aws:s3:::powertools-dataset"
}
]
}
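
A quick sketch of how this schema 1.0 fixture maps onto the new data class (assertions derived from the JSON above):

```python
import json

from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent

with open("tests/events/s3BatchOperationEventSchemaV1.json") as f:
    event = S3BatchOperationEvent(json.load(f))

task = event.task
assert task.s3_bucket_arn == "arn:aws:s3:::powertools-dataset"
assert task.s3_bucket == "powertools-dataset"  # derived from the ARN under schema 1.0
assert task.s3_version_id == "1"
```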
19 changes: 19 additions & 0 deletions tests/events/s3BatchOperationEventSchemaV2.json
@@ -0,0 +1,19 @@
{
"invocationSchemaVersion": "2.0",
"invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo",
"job": {
"id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce",
"userArguments": {
"k1": "v1",
"k2": "v2"
}
},
"tasks": [
{
"taskId": "dGFza2lkZ29lc2hlcmUK",
"s3Key": "prefix/dataset/dataset.20231222.json.gz",
"s3VersionId": null,
"s3Bucket": "powertools-dataset"
}
]
}
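
And the schema 2.0 counterpart, showing the fields that only exist in the newer schema (again derived from the fixture above):

```python
import json

from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent

with open("tests/events/s3BatchOperationEventSchemaV2.json") as f:
    event = S3BatchOperationEvent(json.load(f))

assert event.invocation_schema_version == "2.0"
assert event.job.user_arguments == {"k1": "v1", "k2": "v2"}  # schema 2.0 only

task = event.task
assert task.s3_bucket == "powertools-dataset"  # read directly from 's3Bucket'
assert task.s3_version_id is None  # 's3VersionId' is null in the fixture
```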