-
Notifications
You must be signed in to change notification settings - Fork 429
feat(event_source): Add support for S3 batch operations #3572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
leandrodamascena
merged 20 commits into
aws-powertools:develop
from
sbailliez:feat/s3-batch-operations
Jan 17, 2024
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
9d8e9a4
Add support for S3 Batch Operations event and response along with uni…
sbailliez 319a048
Add documentation with example based on the AWS S3 documentation
sbailliez d834c96
Use unquote_plus and add unit test for key encoded with space
sbailliez 0045c06
Merge branch 'aws-powertools:develop' into feat/s3-batch-operations
sbailliez 7af9a73
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena f3955eb
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena da5105a
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena eee9536
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena 688e746
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena a401507
Initial refactor
leandrodamascena bd29d0a
Changing the DX to improve usability
leandrodamascena dfb4618
Documentation
leandrodamascena b778609
Adding parser
leandrodamascena fe5424f
Small refactor
leandrodamascena b4996a3
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena c49adb7
Merge branch 'develop' into feat/s3-batch-operations
leandrodamascena deb270a
Addressing Ruben's feedback - Docs and examples
leandrodamascena 12e81d4
Addressing Ruben's feedback - Docs and examples
leandrodamascena 57681c4
Addressing Ruben's feedback - Code
leandrodamascena a99594f
Addressing Ruben's feedback - Code
leandrodamascena File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
242 changes: 242 additions & 0 deletions
242
aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,242 @@ | ||
import warnings | ||
from dataclasses import dataclass, field | ||
from typing import Any, Dict, Iterator, List, Optional, Tuple | ||
from urllib.parse import unquote_plus | ||
|
||
from aws_lambda_powertools.shared.types import Literal | ||
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper | ||
|
||
# Result codes accepted by S3 Batch Operations; shared by
# S3BatchOperationResponse and S3BatchOperationResponseRecord.
VALID_RESULT_CODES: Tuple[str, str, str] = ("Succeeded", "TemporaryFailure", "PermanentFailure")
RESULT_CODE_TYPE = Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]


@dataclass(repr=False, order=False)
class S3BatchOperationResponseRecord:
    """Outcome of a single S3 Batch Operations task.

    Attributes
    ----------
    task_id : str
        Identifier of the task this record answers.
    result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
        Task result code reported back to S3 Batch Operations.
    result_string : Optional[str]
        Free-form string surfaced in the completion report.
    """

    task_id: str
    result_code: RESULT_CODE_TYPE
    result_string: Optional[str] = None

    def asdict(self) -> Dict[str, Any]:
        """Serialize this record into the camelCase dict shape the service expects.

        An unsupported ``result_code`` only emits a warning (it does not raise),
        so the record is still serialized as-is.
        """
        if self.result_code not in VALID_RESULT_CODES:
            valid_codes = ", ".join(map(repr, VALID_RESULT_CODES))
            warnings.warn(
                message=f"The resultCode {self.result_code} is not valid. " f"Choose from {valid_codes}.",
                stacklevel=2,
            )

        return {
            "taskId": self.task_id,
            "resultCode": self.result_code,
            "resultString": self.result_string,
        }
|
||
|
||
@dataclass(repr=False, order=False)
class S3BatchOperationResponse:
    """S3 Batch Operations response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/lambda/latest/dg/services-s3-batch.html
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html#batch-ops-invoke-lambda-custom-functions
    - https://docs.aws.amazon.com/AmazonS3/latest/API/API_control_LambdaInvokeOperation.html#AmazonS3-Type-control_LambdaInvokeOperation-InvocationSchemaVersion

    Parameters
    ----------
    invocation_schema_version : str
        Specifies the schema version for the payload that Batch Operations sends when invoking
        an AWS Lambda function, either '1.0' or '2.0'. This must be copied from the event.

    invocation_id : str
        The identifier of the invocation request. This must be copied from the event.

    treat_missing_keys_as : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"]
        Undocumented parameter, defaults to "Succeeded"

    results : List[S3BatchOperationResponseRecord]
        Results of each S3 Batch Operations task,
        optional parameter at start. Can be added later using `add_result` function.

    Examples
    --------

    **S3 Batch Operations**

    ```python
    import boto3

    from botocore.exceptions import ClientError

    from aws_lambda_powertools.utilities.data_classes import (
        S3BatchOperationEvent,
        S3BatchOperationResponse,
        event_source
    )
    from aws_lambda_powertools.utilities.typing import LambdaContext


    @event_source(data_class=S3BatchOperationEvent)
    def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
        response = S3BatchOperationResponse(
            event.invocation_schema_version,
            event.invocation_id,
            "PermanentFailure"
        )

        result = None
        task = event.task
        src_key: str = task.s3_key
        src_bucket: str = task.s3_bucket

        s3 = boto3.client("s3", region_name='us-east-1')

        try:
            dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
            result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
        except ClientError as e:
            error_code = e.response['Error']['Code']
            error_message = e.response['Error']['Message']
            if error_code == 'RequestTimeout':
                result = task.build_task_batch_response("TemporaryFailure", "Timeout - trying again")
            else:
                result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
        except Exception as e:
            result = task.build_task_batch_response("PermanentFailure", str(e))
        finally:
            response.add_result(result)

        return response.asdict()
    ```
    """

    invocation_schema_version: str
    invocation_id: str
    treat_missing_keys_as: RESULT_CODE_TYPE = "Succeeded"
    results: List[S3BatchOperationResponseRecord] = field(default_factory=list)

    def __post_init__(self):
        # Warn (rather than raise) on an unsupported value so the response can
        # still be constructed; the service will reject it if truly invalid.
        if self.treat_missing_keys_as not in VALID_RESULT_CODES:
            warnings.warn(
                stacklevel=2,
                message=f"The value {self.treat_missing_keys_as} is not valid for treat_missing_keys_as, "
                f"Choose from {', '.join(map(repr, VALID_RESULT_CODES))}.",
            )

    def add_result(self, result: S3BatchOperationResponseRecord):
        """Append a task result record to this response."""
        self.results.append(result)

    def asdict(self) -> Dict[str, Any]:
        """Serialize this response into the dict payload expected by S3 Batch Operations.

        Returns
        -------
        Dict[str, Any]
            Payload with `invocationSchemaVersion`, `treatMissingKeysAs`,
            `invocationId` and the serialized `results`.

        Raises
        ------
        ValueError
            When the response does not hold exactly one result — each Lambda
            invocation processes exactly one task and must answer with one record.
        """
        result_count = len(self.results)

        if result_count != 1:
            raise ValueError(f"Response must have exactly one result, but got {result_count}")

        return {
            "invocationSchemaVersion": self.invocation_schema_version,
            "treatMissingKeysAs": self.treat_missing_keys_as,
            "invocationId": self.invocation_id,
            "results": [result.asdict() for result in self.results],
        }
|
||
|
||
class S3BatchOperationJob(DictWrapper):
    """Job section of an S3 Batch Operations event."""

    @property
    def get_id(self) -> str:
        """Get the job id.

        Note: exposed as `get_id` because a property named `id` would shadow the
        Python builtin of the same name.
        """
        return self["id"]

    @property
    def user_arguments(self) -> Optional[Dict[str, str]]:
        """Get user arguments provided for this job (only for invocation schema 2.0)"""
        return self.get("userArguments")
|
||
|
||
class S3BatchOperationTask(DictWrapper):
    """A single task entry from an S3 Batch Operations event."""

    @property
    def task_id(self) -> str:
        """Get the task id"""
        return self["taskId"]

    @property
    def s3_key(self) -> str:
        """Get the object key, URL-decoded via unquote_plus"""
        return unquote_plus(self["s3Key"])

    @property
    def s3_version_id(self) -> Optional[str]:
        """Object version if bucket is versioning-enabled, otherwise null"""
        return self.get("s3VersionId")

    @property
    def s3_bucket_arn(self) -> Optional[str]:
        """Get the s3 bucket arn (present only for invocationSchemaVersion '1.0')"""
        return self.get("s3BucketArn")

    @property
    def s3_bucket(self) -> str:
        """
        Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
        or from 's3BucketArn' (invocationSchemaVersion '1.0')
        """
        bucket_arn = self.s3_bucket_arn
        if not bucket_arn:
            return self["s3Bucket"]
        # ARN format is arn:aws:s3:::bucket-name — keep everything after ':::'
        return bucket_arn.split(":::")[-1]

    def build_task_batch_response(
        self,
        result_code: Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded",
        result_string: str = "",
    ) -> S3BatchOperationResponseRecord:
        """Create a S3BatchOperationResponseRecord directly using the task_id and given values

        Parameters
        ----------
        result_code : Literal["Succeeded", "TemporaryFailure", "PermanentFailure"] = "Succeeded"
            task result, supported value: "Succeeded", "TemporaryFailure", "PermanentFailure"
        result_string : str
            string to identify in the report
        """
        return S3BatchOperationResponseRecord(
            task_id=self.task_id,
            result_code=result_code,
            result_string=result_string,
        )
|
||
|
||
class S3BatchOperationEvent(DictWrapper):
    """Amazon S3 Batch Operations event

    Documentation:
    --------------
    - https://docs.aws.amazon.com/AmazonS3/latest/userguide/batch-ops-invoke-lambda.html
    """

    @property
    def invocation_id(self) -> str:
        """Get the identifier of the invocation request"""
        return self["invocationId"]

    @property
    def invocation_schema_version(self) -> Literal["1.0", "2.0"]:
        """
        Get the schema version for the payload that Batch Operations sends when invoking an
        AWS Lambda function. Either '1.0' or '2.0'.
        """
        return self["invocationSchemaVersion"]

    @property
    def tasks(self) -> Iterator[S3BatchOperationTask]:
        """Lazily yield each s3 batch operation task wrapped in its data class"""
        for raw_task in self["tasks"]:
            yield S3BatchOperationTask(raw_task)

    @property
    def task(self) -> S3BatchOperationTask:
        """Get the first s3 batch operation task (events carry one task per invocation)"""
        return next(self.tasks)

    @property
    def job(self) -> S3BatchOperationJob:
        """Get the s3 batch operation job"""
        return S3BatchOperationJob(self["job"])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
34 changes: 34 additions & 0 deletions
34
aws_lambda_powertools/utilities/parser/models/s3_batch_operation.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
from typing import Any, Dict, List, Optional | ||
|
||
from pydantic import BaseModel, validator | ||
|
||
from aws_lambda_powertools.utilities.parser.types import Literal | ||
|
||
|
||
class S3BatchOperationTaskModel(BaseModel):
    """Pydantic model for a single S3 Batch Operations task."""

    # Identifier of the task within the job
    taskId: str
    # Object key; not URL-decoded here
    s3Key: str
    # Object version if the bucket is versioning-enabled, otherwise None
    s3VersionId: Optional[str] = None
    # Present only for invocationSchemaVersion '1.0'
    s3BucketArn: Optional[str] = None
    # Present for invocationSchemaVersion '2.0'; backfilled from s3BucketArn below
    s3Bucket: Optional[str] = None

    @validator("s3Bucket", pre=True, always=True)
    def validate_bucket(cls, current_value, values):
        # Get the s3 bucket, either from 's3Bucket' property (invocationSchemaVersion '2.0')
        # or from 's3BucketArn' (invocationSchemaVersion '1.0')
        if values.get("s3BucketArn") and not current_value:
            # Replace s3Bucket value with the value from s3BucketArn
            # (ARN format arn:aws:s3:::bucket-name — keep the part after ':::')
            return values["s3BucketArn"].split(":::")[-1]
        return current_value
|
||
|
||
class S3BatchOperationJobModel(BaseModel):
    """Pydantic model for the job section of an S3 Batch Operations event."""

    # Job identifier
    id: str
    # User arguments provided for this job (only for invocation schema 2.0)
    userArguments: Optional[Dict[str, Any]] = None
|
||
|
||
class S3BatchOperationModel(BaseModel):
    """Pydantic model for the S3 Batch Operations Lambda invocation event."""

    # Identifier of the invocation request; must be echoed back in the response
    invocationId: str
    # Payload schema version sent by Batch Operations
    invocationSchemaVersion: Literal["1.0", "2.0"]
    job: S3BatchOperationJobModel
    tasks: List[S3BatchOperationTaskModel]
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
import boto3 | ||
from botocore.exceptions import ClientError | ||
|
||
from aws_lambda_powertools.utilities.data_classes import S3BatchOperationEvent, S3BatchOperationResponse, event_source | ||
from aws_lambda_powertools.utilities.typing import LambdaContext | ||
|
||
|
||
@event_source(data_class=S3BatchOperationEvent)
def lambda_handler(event: S3BatchOperationEvent, context: LambdaContext):
    """Process the single S3 Batch Operations task and return the response payload."""
    # Schema version and invocation id must be copied from the incoming event;
    # missing keys default to "PermanentFailure"
    response = S3BatchOperationResponse(event.invocation_schema_version, event.invocation_id, "PermanentFailure")

    task = event.task
    src_key: str = task.s3_key
    src_bucket: str = task.s3_bucket

    s3 = boto3.client("s3", region_name="us-east-1")

    try:
        dest_bucket, dest_key = do_some_work(s3, src_bucket, src_key)
        result = task.build_task_batch_response("Succeeded", f"s3://{dest_bucket}/{dest_key}")
    except ClientError as e:
        error_code = e.response["Error"]["Code"]
        error_message = e.response["Error"]["Message"]
        if error_code == "RequestTimeout":
            # Timeouts are reported as temporary so the task can be retried
            result = task.build_task_batch_response("TemporaryFailure", "Retry request to Amazon S3 due to timeout.")
        else:
            result = task.build_task_batch_response("PermanentFailure", f"{error_code}: {error_message}")
    except Exception as e:
        # Any other failure is reported as permanent with its message in the report
        result = task.build_task_batch_response("PermanentFailure", str(e))
    finally:
        # Exactly one result must be attached — response.asdict() enforces this
        response.add_result(result)

    return response.asdict()
|
||
|
||
def do_some_work(s3_client, src_bucket: str, src_key: str):
    """Placeholder for the per-object work; expected to return a (dest_bucket, dest_key) pair."""
    ...
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.