diff --git a/docs/media/utilities_data_classes.png b/docs/media/utilities_data_classes.png
index 94ed83bde97..bb224772355 100644
Binary files a/docs/media/utilities_data_classes.png and b/docs/media/utilities_data_classes.png differ
diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md
index 4cd738fad70..2574d119acd 100644
--- a/docs/utilities/data_classes.md
+++ b/docs/utilities/data_classes.md
@@ -5,115 +5,105 @@ description: Utility
-Event Source Data Classes utility provides classes self-describing Lambda event sources.
+Event Source Data Classes provides self-describing and strongly typed classes for various AWS Lambda event sources.
## Key features
* Type hinting and code completion for common event types
* Helper functions for decoding/deserializing nested fields
* Docstrings for fields contained in event schemas
-
-**Background**
-
-When authoring Lambda functions, you often need to understand the schema of the event dictionary which is passed to the
-handler. There are several common event types which follow a specific schema, depending on the service triggering the
-Lambda function.
+* Standardized attribute-based access to event properties
## Getting started
-### Utilizing the data classes
+???+ tip
+ All examples shared in this documentation are available within the [project repository](https://github.com/aws-powertools/powertools-lambda-python/tree/develop/examples){target="_blank"}.
+
+There are two ways to use Event Source Data Classes in your Lambda functions.
-The classes are initialized by passing in the Lambda event object into the constructor of the appropriate data class or
-by using the `event_source` decorator.
+**Method 1: Direct Initialization**
-For example, if your Lambda function is being triggered by an API Gateway proxy integration, you can use the
-`APIGatewayProxyEvent` class.
+You can initialize the appropriate data class by passing the Lambda event object to its constructor.
=== "app.py"
```python hl_lines="1 4"
- from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
-
- def lambda_handler(event: dict, context):
- event = APIGatewayProxyEvent(event)
- if 'helloworld' in event.path and event.http_method == 'GET':
- do_something_with(event.body, user)
+ --8<-- "examples/event_sources/src/getting_started_data_classes.py"
```
-Same example as above, but using the `event_source` decorator
-
-=== "app.py"
-
- ```python hl_lines="1 3"
- from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
+=== "API Gateway Proxy Example Event"
- @event_source(data_class=APIGatewayProxyEvent)
- def lambda_handler(event: APIGatewayProxyEvent, context):
- if 'helloworld' in event.path and event.http_method == 'GET':
- do_something_with(event.body, user)
+ ```json hl_lines="3-4"
+ --8<-- "examples/event_sources/events/apigw_event.json"
```
-Log Data Event for Troubleshooting
+**Method 2: Using the event_source Decorator**
+
+Alternatively, you can use the `event_source` decorator to automatically parse the event.
=== "app.py"
- ```python hl_lines="4 8"
- from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
- from aws_lambda_powertools.logging.logger import Logger
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/apigw_proxy_decorator.py"
+ ```
- logger = Logger(service="hello_logs", level="DEBUG")
+=== "API Gateway Proxy Example Event"
- @event_source(data_class=APIGatewayProxyEvent)
- def lambda_handler(event: APIGatewayProxyEvent, context):
- logger.debug(event)
+ ```json hl_lines="3-4"
+ --8<-- "examples/event_sources/events/apigw_event.json"
```
-**Autocomplete with self-documented properties and methods**
+### Autocomplete with self-documented properties and methods
+
+Event Source Data Classes leverages your IDE's autocompletion and inline documentation.
+When using the `APIGatewayProxyEvent` class, for example, the IDE will offer autocomplete suggestions for its properties and methods.
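+
+As a quick illustration of what autocompletion surfaces, here is a minimal sketch using `APIGatewayProxyEvent` (`path`, `http_method`, and `body` are real properties of this class; the handler logic itself is illustrative only):
+
+```python
+from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
+
+def lambda_handler(event: dict, context):
+    event = APIGatewayProxyEvent(event)
+    # typed properties such as `path` and `http_method` show up in autocomplete
+    if "helloworld" in event.path and event.http_method == "GET":
+        return {"statusCode": 200, "body": event.body}
+```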

## Supported event sources
-| Event Source | Data_class |
-|-------------------------------------------------------------------------------|----------------------------------------------------|
-| [Active MQ](#active-mq) | `ActiveMQEvent` |
-| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
-| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
-| [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` |
-| [API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` |
-| [Application Load Balancer](#application-load-balancer) | `ALBEvent` |
-| [AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` |
-| [AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` |
-| [AWS Config Rule](#aws-config-rule) | `AWSConfigRuleEvent` |
-| [Bedrock Agent](#bedrock-agent) | `BedrockAgent` |
-| [CloudFormation Custom Resource](#cloudformation-custom-resource) | `CloudFormationCustomResourceEvent` |
-| [CloudWatch Alarm State Change Action](#cloudwatch-alarm-state-change-action) | `CloudWatchAlarmEvent` |
-| [CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent` |
-| [CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` |
-| [CodeDeploy Lifecycle Hook](#codedeploy-lifecycle-hook) | `CodeDeployLifecycleHookEvent` |
-| [CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` |
-| [Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` |
-| [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` |
-| [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` |
-| [EventBridge](#eventbridge) | `EventBridgeEvent` |
-| [Kafka](#kafka) | `KafkaEvent` |
-| [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` |
-| [Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` |
-| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
-| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
-| [S3](#s3) | `S3Event` |
-| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` |
-| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
-| [S3 EventBridge Notification](#s3-eventbridge-notification) | `S3EventBridgeNotificationEvent` |
-| [SES](#ses) | `SESEvent` |
-| [SNS](#sns) | `SNSEvent` |
-| [SQS](#sqs) | `SQSEvent` |
-| [VPC Lattice V2](#vpc-lattice-v2) | `VPCLatticeV2Event` |
-| [VPC Lattice V1](#vpc-lattice-v1) | `VPCLatticeEvent` |
+Each event source is linked to its corresponding GitHub file with the full set of properties, methods, and docstrings specific to each event type.
+
+| Event Source | Data class | Properties |
+|--------------|------------|------------|
+| [Active MQ](#active-mq) | `ActiveMQEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/active_mq_event.py) |
+| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py) |
+| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/api_gateway_authorizer_event.py) |
+| [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py) |
+| [API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/api_gateway_proxy_event.py) |
+| [Application Load Balancer](#application-load-balancer) | `ALBEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/alb_event.py) |
+| [AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/appsync_authorizer_event.py) |
+| [AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/appsync_resolver_event.py) |
+| [AWS Config Rule](#aws-config-rule) | `AWSConfigRuleEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/aws_config_rule_event.py) |
+| [Bedrock Agent](#bedrock-agent) | `BedrockAgentEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/bedrock_agent_event.py) |
+| [CloudFormation Custom Resource](#cloudformation-custom-resource) | `CloudFormationCustomResourceEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/cloudformation_custom_resource_event.py) |
+| [CloudWatch Alarm State Change Action](#cloudwatch-alarm-state-change-action) | `CloudWatchAlarmEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/cloud_watch_alarm_event.py) |
+| [CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/cloud_watch_custom_widget_event.py) |
+| [CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/cloud_watch_logs_event.py) |
+| [CodeDeploy Lifecycle Hook](#codedeploy-lifecycle-hook) | `CodeDeployLifecycleHookEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/code_deploy_lifecycle_hook_event.py) |
+| [CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/code_pipeline_job_event.py) |
+| [Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/cognito_user_pool_event.py) |
+| [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/connect_contact_flow_event.py) |
+| [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py) |
+| [EventBridge](#eventbridge) | `EventBridgeEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/event_bridge_event.py) |
+| [Kafka](#kafka) | `KafkaEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/kafka_event.py) |
+| [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/kinesis_stream_event.py) |
+| [Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py) |
+| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/lambda_function_url_event.py) |
+| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/rabbit_mq_event.py) |
+| [S3](#s3) | `S3Event` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/s3_event.py) |
+| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/s3_batch_operation_event.py) |
+| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/s3_object_event.py) |
+| [S3 EventBridge Notification](#s3-eventbridge-notification) | `S3EventBridgeNotificationEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/s3_event.py) |
+| [SES](#ses) | `SESEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/ses_event.py) |
+| [SNS](#sns) | `SNSEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/sns_event.py) |
+| [SQS](#sqs) | `SQSEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/sqs_event.py) |
+| [VPC Lattice V2](#vpc-lattice-v2) | `VPCLatticeV2Event` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/vpc_lattice.py) |
+| [VPC Lattice V1](#vpc-lattice-v1) | `VPCLatticeEvent` | [GitHub](https://github.com/aws-powertools/powertools-lambda-python/blob/develop/aws_lambda_powertools/utilities/data_classes/vpc_lattice.py) |
???+ info
- The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of
- documentation inherently (via autocompletion, types and docstrings).
+    The examples showcase a subset of Event Source Data Classes capabilities. For comprehensive details, use your IDE's
+    autocompletion, refer to the type hints and docstrings, and explore the [full API reference](https://docs.powertools.aws.dev/lambda/python/latest/api/utilities/data_classes/) for the complete property listing of each event source.
### Active MQ
@@ -123,155 +113,61 @@ for more details.
=== "app.py"
- ```python hl_lines="4-5 9-10"
- from typing import Dict
-
- from aws_lambda_powertools import Logger
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent
+ ```python hl_lines="5 10"
+ --8<-- "examples/event_sources/src/active_mq_example.py"
+ ```
- logger = Logger()
+=== "Active MQ Example Event"
- @event_source(data_class=ActiveMQEvent)
- def lambda_handler(event: ActiveMQEvent, context):
- for message in event.messages:
- logger.debug(f"MessageID: {message.message_id}")
- data: Dict = message.json_data
- logger.debug("Process json in base64 encoded data str", data)
+ ```json hl_lines="6 9 18 21"
+ --8<-- "tests/events/activeMQEvent.json"
```
### API Gateway Authorizer
-> New in 1.20.0
-
It is used for [API Gateway Rest API Lambda Authorizer payload](https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-use-lambda-authorizer.html){target="_blank"}.
Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayAuthorizerTokenEvent`** for type `TOKEN`.
-=== "app_type_request.py"
-
- This example uses the `APIGatewayAuthorizerResponse` to decline a given request if the user is not found.
-
- When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.
-
- ```python hl_lines="2-6 29 36-42 47 49"
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
- DENY_ALL_RESPONSE,
- APIGatewayAuthorizerRequestEvent,
- APIGatewayAuthorizerResponse,
- HttpVerb,
- )
- from secrets import compare_digest
-
-
- def get_user_by_token(token):
- if compare_digest(token, "admin-foo"):
- return {"id": 0, "name": "Admin", "isAdmin": True}
- elif compare_digest(token, "regular-foo"):
- return {"id": 1, "name": "Joe"}
- else:
- return None
-
-
- @event_source(data_class=APIGatewayAuthorizerRequestEvent)
- def handler(event: APIGatewayAuthorizerRequestEvent, context):
- user = get_user_by_token(event.headers["Authorization"])
-
- if user is None:
- # No user was found
- # to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics
- # raise Exception("Unauthorized")
- # to return 403 - `{"message":"Forbidden"}`
- return DENY_ALL_RESPONSE
-
- # parse the `methodArn` as an `APIGatewayRouteArn`
- arn = event.parsed_arn
-
- # Create the response builder from parts of the `methodArn`
- # and set the logged in user id and context
- policy = APIGatewayAuthorizerResponse(
- principal_id=user["id"],
- context=user,
- region=arn.region,
- aws_account_id=arn.aws_account_id,
- api_id=arn.api_id,
- stage=arn.stage,
- )
-
- # Conditional IAM Policy
- if user.get("isAdmin", False):
- policy.allow_all_routes()
- else:
- policy.allow_route(HttpVerb.GET.value, "/user-profile")
-
- return policy.asdict()
- ```
-=== "app_type_token.py"
-
- ```python hl_lines="2-5 12-18 21 23-24"
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
- APIGatewayAuthorizerTokenEvent,
- APIGatewayAuthorizerResponse,
- )
-
-
- @event_source(data_class=APIGatewayAuthorizerTokenEvent)
- def handler(event: APIGatewayAuthorizerTokenEvent, context):
- arn = event.parsed_arn
-
- policy = APIGatewayAuthorizerResponse(
- principal_id="user",
- region=arn.region,
- aws_account_id=arn.aws_account_id,
- api_id=arn.api_id,
- stage=arn.stage
- )
-
- if event.authorization_token == "42":
- policy.allow_all_routes()
- else:
- policy.deny_all_routes()
- return policy.asdict()
+=== "app.py"
+
+ ```python hl_lines="2-4 8"
+ --8<-- "examples/event_sources/src/apigw_authorizer_request.py"
```
-### API Gateway Authorizer V2
+=== "API Gateway Authorizer Request Example Event"
-> New in 1.20.0
+ ```json hl_lines="3 11"
+ --8<-- "tests/events/apiGatewayAuthorizerRequestEvent.json"
+ ```
-It is used for [API Gateway HTTP API Lambda Authorizer payload version 2](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html){target="_blank"}.
-See also [this blog post](https://aws.amazon.com/blogs/compute/introducing-iam-and-lambda-authorizers-for-amazon-api-gateway-http-apis/){target="_blank"} for more details.
+=== "app_token.py"
-=== "app.py"
+ ```python hl_lines="2-4 8"
+ --8<-- "examples/event_sources/src/apigw_authorizer_token.py"
+ ```
- This example looks up user details via `x-token` header. It uses `APIGatewayAuthorizerResponseV2` to return a deny policy when user is not found or authorized.
+=== "API Gateway Authorizer Token Example Event"
- ```python hl_lines="2-5 21 24"
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
- APIGatewayAuthorizerEventV2,
- APIGatewayAuthorizerResponseV2,
- )
- from secrets import compare_digest
+ ```json hl_lines="2 3"
+ --8<-- "tests/events/apiGatewayAuthorizerTokenEvent.json"
+ ```
+### API Gateway Authorizer V2
- def get_user_by_token(token):
- if compare_digest(token, "Foo"):
- return {"name": "Foo"}
- return None
+It is used for [API Gateway HTTP API Lambda Authorizer payload version 2](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html){target="_blank"}.
+See also [this blog post](https://aws.amazon.com/blogs/compute/introducing-iam-and-lambda-authorizers-for-amazon-api-gateway-http-apis/){target="_blank"} for more details.
+=== "app.py"
- @event_source(data_class=APIGatewayAuthorizerEventV2)
- def handler(event: APIGatewayAuthorizerEventV2, context):
- user = get_user_by_token(event.headers["x-token"])
+ ```python hl_lines="4-6 16"
+ --8<-- "examples/event_sources/src/apigw_auth_v2.py"
+ ```
- if user is None:
- # No user was found, so we return not authorized
- return APIGatewayAuthorizerResponseV2().asdict()
+=== "API Gateway Authorizer V2 Example Event"
- # Found the user and setting the details in the context
- return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict()
+ ```json
+ --8<-- "tests/events/apiGatewayAuthorizerV2Event.json"
```
### API Gateway Proxy
@@ -280,16 +176,14 @@ It is used for either API Gateway REST API or HTTP API using v1 proxy event.
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/apigw_proxy_decorator.py"
+ ```
+
+=== "API Gateway Proxy Example Event"
- @event_source(data_class=APIGatewayProxyEvent)
- def lambda_handler(event: APIGatewayProxyEvent, context):
- if "helloworld" in event.path and event.http_method == "GET":
- request_context = event.request_context
- identity = request_context.identity
- user = identity.user
- do_something_with(event.json_body, user)
+ ```json hl_lines="3 4"
+ --8<-- "examples/event_sources/events/apigw_event.json"
```
### API Gateway Proxy V2
@@ -298,245 +192,126 @@ It is used for HTTP API using v2 proxy event.
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEventV2
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/apigw_proxy_v2.py"
+ ```
+
+=== "API Gateway Proxy V2 Example Event"
- @event_source(data_class=APIGatewayProxyEventV2)
- def lambda_handler(event: APIGatewayProxyEventV2, context):
- if "helloworld" in event.path and event.http_method == "POST":
- do_something_with(event.json_body, event.query_string_parameters)
+ ```json
+ --8<-- "tests/events/apiGatewayProxyV2Event.json"
```
### Application Load Balancer
-Is it used for Application load balancer event.
+It is used for [Application Load Balancer](https://docs.aws.amazon.com/elasticloadbalancing/latest/application/introduction.html){target="_blank"} events.
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, ALBEvent
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/albEvent.py"
+ ```
+
+=== "Application Load Balancer Example Event"
- @event_source(data_class=ALBEvent)
- def lambda_handler(event: ALBEvent, context):
- if "helloworld" in event.path and event.http_method == "POST":
- do_something_with(event.json_body, event.query_string_parameters)
+ ```json hl_lines="7 8"
+ --8<-- "tests/events/albEvent.json"
```
### AppSync Authorizer
-> New in 1.20.0
-
Used when building an [AWS_LAMBDA Authorization](https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html#aws-lambda-authorization){target="_blank"} with AppSync.
See blog post [Introducing Lambda authorization for AWS AppSync GraphQL APIs](https://aws.amazon.com/blogs/mobile/appsync-lambda-auth/){target="_blank"}
or read the Amplify documentation on using [AWS Lambda for authorization](https://docs.amplify.aws/lib/graphqlapi/authz/q/platform/js#aws-lambda){target="_blank"} with AppSync.
-In this example extract the `requestId` as the `correlation_id` for logging, used `@event_source` decorator and builds the AppSync authorizer using the `AppSyncAuthorizerResponse` helper.
-
=== "app.py"
- ```python
- from typing import Dict
-
- from aws_lambda_powertools.logging import correlation_paths
- from aws_lambda_powertools.logging.logger import Logger
- from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
- AppSyncAuthorizerEvent,
- AppSyncAuthorizerResponse,
- )
- from aws_lambda_powertools.utilities.data_classes.event_source import event_source
-
- logger = Logger()
-
-
- def get_user_by_token(token: str):
- """Look a user by token"""
- ...
-
-
- @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
- @event_source(data_class=AppSyncAuthorizerEvent)
- def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
- user = get_user_by_token(event.authorization_token)
+ ```python hl_lines="5-7 20"
+ --8<-- "examples/event_sources/src/appSyncAuthorizer.py"
+ ```
- if not user:
- # No user found, return not authorized
- return AppSyncAuthorizerResponse().asdict()
+=== "AppSync Authorizer Example Event"
- return AppSyncAuthorizerResponse(
- authorize=True,
- resolver_context={"id": user.id},
- # Only allow admins to delete events
- deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
- ).asdict()
+ ```json
+ --8<-- "tests/events/appSyncAuthorizerEvent.json"
```
### AppSync Resolver
-> New in 1.12.0
-
Used when building Lambda GraphQL Resolvers with [Amplify GraphQL Transform Library](https://docs.amplify.aws/cli/graphql-transformer/function){target="_blank"} (`@function`),
and [AppSync Direct Lambda Resolvers](https://aws.amazon.com/blogs/mobile/appsync-direct-lambda/){target="_blank"}.
-In this example, we also use the new Logger `correlation_id` and built-in `correlation_paths` to extract, if available, X-Ray Trace ID in AppSync request headers:
+The example serves as an AppSync resolver for the `locations` field of the `Merchant` type. It uses the `@event_source` decorator to parse the AppSync event, handles pagination and filtering for locations, and demonstrates working with `AppSyncIdentityCognito` identities.
=== "app.py"
- ```python hl_lines="2-5 12 14 19 21 29-30"
- from aws_lambda_powertools.logging import Logger, correlation_paths
- from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
- AppSyncResolverEvent,
- AppSyncIdentityCognito
- )
-
- logger = Logger()
-
- def get_locations(name: str = None, size: int = 0, page: int = 0):
- """Your resolver logic here"""
-
- @logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
- def lambda_handler(event, context):
- event: AppSyncResolverEvent = AppSyncResolverEvent(event)
-
- # Case insensitive look up of request headers
- x_forwarded_for = event.headers.get("x-forwarded-for")
-
- # Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types
- assert isinstance(event.identity, AppSyncIdentityCognito)
- identity: AppSyncIdentityCognito = event.identity
-
- # Logging with correlation_id
- logger.debug({
- "x-forwarded-for": x_forwarded_for,
- "username": identity.username
- })
-
- if event.type_name == "Merchant" and event.field_name == "locations":
- return get_locations(**event.arguments)
-
- raise ValueError(f"Unsupported field resolver: {event.field_name}")
-
- ```
-
-=== "Example AppSync Event"
-
- ```json hl_lines="2-8 14 19 20"
- {
- "typeName": "Merchant",
- "fieldName": "locations",
- "arguments": {
- "page": 2,
- "size": 1,
- "name": "value"
- },
- "identity": {
- "claims": {
- "iat": 1615366261
- ...
- },
- "username": "mike",
- ...
- },
- "request": {
- "headers": {
- "x-amzn-trace-id": "Root=1-60488877-0b0c4e6727ab2a1c545babd0",
- "x-forwarded-for": "127.0.0.1"
- ...
- }
- },
- ...
- }
- ```
-
-=== "Example CloudWatch Log"
-
- ```json hl_lines="5 6 16"
- {
- "level":"DEBUG",
- "location":"lambda_handler:22",
- "message":{
- "x-forwarded-for":"127.0.0.1",
- "username":"mike"
- },
- "timestamp":"2021-03-10 12:38:40,062",
- "service":"service_undefined",
- "sampling_rate":0.0,
- "cold_start":true,
- "function_name":"func_name",
- "function_memory_size":512,
- "function_arn":"func_arn",
- "function_request_id":"6735a29c-c000-4ae3-94e6-1f1c934f7f94",
- "correlation_id":"Root=1-60488877-0b0c4e6727ab2a1c545babd0"
- }
+ ```python hl_lines="2-4 9"
+ --8<-- "examples/event_sources/src/appSyncResolver.py"
+ ```
+
+=== "AppSync Resolver Example Event"
+
+ ```json
+ --8<-- "tests/events/appSyncResolverEvent.json"
```
### AWS Config Rule
-=== "aws_config_rule.py"
- ```python hl_lines="3 11"
+The example uses `AWSConfigRuleEvent` to parse the incoming event. The function logs the message type of the invoking event and returns a simple success response. The example event is a Scheduled Notification, but the data class also handles ItemChanged and Oversized notifications.
+
+=== "app.py"
+ ```python hl_lines="2-3 10"
--8<-- "examples/event_sources/src/aws_config_rule.py"
```
-=== "Event - ItemChanged"
+=== "ScheduledNotification Example Event"
```json
- --8<-- "examples/event_sources/src/aws_config_rule_item_changed.json"
- ```
-=== "Event - Oversized"
- ```json
- --8<-- "examples/event_sources/src/aws_config_rule_oversized.json"
- ```
-=== "Event - ScheduledNotification"
- ```json
- --8<-- "examples/event_sources/src/aws_config_rule_scheduled.json"
+ --8<-- "tests/events/awsConfigRuleScheduled.json"
```
### Bedrock Agent
+The example handles a [Bedrock Agent event](https://aws.amazon.com/bedrock/agents/){target="_blank"} using `BedrockAgentEvent` to parse the incoming event. The function logs the action group and input text, then returns a structured response in the format Bedrock Agents expect, including a mock response body.
+
=== "app.py"
- ```python hl_lines="2 8 10"
- --8<-- "examples/event_sources/src/bedrock_agent_event.py"
+ ```python hl_lines="2 7"
+ --8<-- "examples/event_sources/src/bedrock_agent.py"
+ ```
+
+=== "Bedrock Agent Example Event"
+ ```json
+ --8<-- "tests/events/bedrockAgentEvent.json"
```
### CloudFormation Custom Resource
+The example focuses on the `Create` request type, generating a unique physical resource ID and logging the process. The function is structured to potentially handle `Update` and `Delete` operations as well.
+
=== "app.py"
- ```python hl_lines="11 13 15 17 19"
+ ```python hl_lines="2-3 11 15 21"
--8<-- "examples/event_sources/src/cloudformation_custom_resource_handler.py"
```
-### CloudWatch Dashboard Custom Widget
-
-=== "app.py"
-
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchDashboardCustomWidgetEvent
-
- const DOCS = `
- ## Echo
- A simple echo script. Anything passed in \`\`\`echo\`\`\` parameter is returned as the content of custom widget.
+=== "CloudFormation Custom Resource Example Event"
+ ```json
+ --8<-- "tests/events/cloudformationCustomResourceCreate.json"
+ ```
- ### Widget parameters
- | Param | Description |
- | -------- | ------------------------ |
- | **echo** | The content to echo back |
+### CloudWatch Dashboard Custom Widget
- ### Example parameters
- \`\`\` yaml
- echo:
Hello world
- \`\`\`
- `
+This example for `CloudWatchDashboardCustomWidgetEvent` logs the dashboard name, extracts key information such as the widget ID and time range, and returns a formatted response with a title and markdown content. Read more about [custom widgets for CloudWatch dashboards](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/add_custom_widget_samples.html){target="_blank"}.
- @event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
- def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):
+=== "app.py"
- if event.describe:
- return DOCS
+ ```python hl_lines="2 7"
+ --8<-- "examples/event_sources/src/cloudWatchDashboard.py"
+ ```
- # You can directly return HTML or JSON content
- # Alternatively, you can return markdown that will be rendered by CloudWatch
- echo = event.widget_context.params["echo"]
- return { "markdown": f"# {echo}" }
+=== "CloudWatch Dashboard Example Event"
+ ```json
+ --8<-- "tests/events/cloudWatchDashboardEvent.json"
```
### CloudWatch Alarm State Change Action
@@ -550,6 +325,11 @@ You can use the `CloudWatchAlarmEvent` data class to access the fields containing
--8<-- "examples/event_sources/src/cloudwatch_alarm_event.py"
```
+=== "CloudWatch Alarm Example Event"
+ ```json
+ --8<-- "tests/events/cloudWatchAlarmEventSingleMetric.json"
+ ```
+
### CloudWatch Logs
CloudWatch Logs events by default are compressed and base64 encoded. You can use the helper function provided to decode,
@@ -557,16 +337,13 @@ decompress and parse json data from the event.
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchLogsEvent
- from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
+ ```python hl_lines="2-3 8"
+ --8<-- "examples/event_sources/src/cloudwatch_logs.py"
+ ```
- @event_source(data_class=CloudWatchLogsEvent)
- def lambda_handler(event: CloudWatchLogsEvent, context):
- decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data()
- log_events = decompressed_log.log_events
- for event in log_events:
- do_something_with(event.timestamp, event.message)
+=== "CloudWatch Logs Example Event"
+ ```json
+ --8<-- "tests/events/cloudWatchLogEvent.json"
```
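Under the hood, the decode step amounts to base64-decoding and gunzipping the `awslogs.data` payload before parsing it as JSON. A minimal stdlib-only sketch (the field names follow the CloudWatch Logs subscription format; `decode_cloudwatch_logs` is an illustrative helper, not the library implementation):

```python
import base64
import gzip
import json


def decode_cloudwatch_logs(event: dict) -> dict:
    # The subscription payload is gzip-compressed, then base64-encoded
    compressed = base64.b64decode(event["awslogs"]["data"])
    return json.loads(gzip.decompress(compressed))


# Build a sample payload the same way CloudWatch Logs would deliver it
payload = {"logGroup": "/demo", "logEvents": [{"timestamp": 0, "message": "hello"}]}
encoded = base64.b64encode(gzip.compress(json.dumps(payload).encode())).decode()

decoded = decode_cloudwatch_logs({"awslogs": {"data": encoded}})
```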
#### Kinesis integration
@@ -575,45 +352,26 @@ decompress and parse json data from the event.
=== "app.py"
- ```python hl_lines="5-6 11"
- from typing import List
-
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
- from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
- KinesisStreamEvent, extract_cloudwatch_logs_from_event)
-
+ ```python hl_lines="5-7 11"
+ --8<-- "examples/event_sources/src/kinesisStreamCloudWatchLogs.py"
+ ```
- @event_source(data_class=KinesisStreamEvent)
- def simple_handler(event: KinesisStreamEvent, context):
- logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
- for log in logs:
- if log.message_type == "DATA_MESSAGE":
- return "success"
- return "nothing to be processed"
+=== "Kinesis Stream CloudWatch Logs Example Event"
+ ```json
+ --8<-- "tests/events/kinesisStreamCloudWatchLogsEvent.json"
```
Alternatively, you can use `extract_cloudwatch_logs_from_record` to seamlessly integrate with the [Batch utility](./batch.md){target="_blank"} for more robust log processing.
=== "app.py"
- ```python hl_lines="3-4 10"
- from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
- batch_processor)
- from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
- KinesisStreamRecord, extract_cloudwatch_logs_from_record)
-
- processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
-
-
- def record_handler(record: KinesisStreamRecord):
- log = extract_cloudwatch_logs_from_record(record)
- return log.message_type == "DATA_MESSAGE"
-
+ ```python hl_lines="7-9 18"
+ --8<-- "examples/event_sources/src/kinesis_batch_example.py"
+ ```
- @batch_processor(record_handler=record_handler, processor=processor)
- def lambda_handler(event, context):
- return processor.response()
+=== "Kinesis Stream CloudWatch Logs Example Event"
+ ```json
+ --8<-- "tests/events/kinesisStreamCloudWatchLogsEvent.json"
```
### CodeDeploy LifeCycle Hook
@@ -623,77 +381,27 @@ CodeDeploy triggers Lambdas with this event when defined in
to test applications at different stages of deployment.
=== "app.py"
- ```python
- from aws_lambda_powertools import Logger
- from aws_lambda_powertools.utilities.data_classes import (
- event_source,
- CodeDeployLifecycleHookEvent,
- )
- logger = Logger()
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/codedeploy_lifecycle_hook.py"
+ ```
- def lambda_handler(
- event: CodeDeployLifecycleHookEvent, context: LambdaContext
- ) -> None:
- deployment_id = event.deployment_id
- lifecycle_event_hook_execution_id = event.lifecycle_event_hook_execution_id
+=== "CodeDeploy LifeCycle Hook Example Event"
+ ```json
+ --8<-- "tests/events/codeDeployLifecycleHookEvent.json"
```
### CodePipeline Job
-Data classes and utility functions to help create continuous delivery pipelines tasks with AWS Lambda
+Data classes and utility functions to help create continuous delivery pipeline tasks with AWS Lambda.
=== "app.py"
-
- ```python
- from aws_lambda_powertools import Logger
- from aws_lambda_powertools.utilities.data_classes import event_source, CodePipelineJobEvent
-
- logger = Logger()
-
- @event_source(data_class=CodePipelineJobEvent)
- def lambda_handler(event, context):
- """The Lambda function handler
-
- If a continuing job then checks the CloudFormation stack status
- and updates the job accordingly.
-
- If a new job then kick of an update or creation of the target
- CloudFormation stack.
- """
-
- # Extract the Job ID
- job_id = event.get_id
-
- # Extract the params
- params: dict = event.decoded_user_parameters
- stack = params["stack"]
- artifact_name = params["artifact"]
- template_file = params["file"]
-
- try:
- if event.data.continuation_token:
- # If we're continuing then the create/update has already been triggered
- # we just need to check if it has finished.
- check_stack_update_status(job_id, stack)
- else:
- template = event.get_artifact(artifact_name, template_file)
- # Kick off a stack update or create
- result = start_update_or_create(job_id, stack, template)
- artifact: io.BytesIO = zip_data(result)
- event.put_artifact(
- artifact_name=event.data.output_artifacts[0].name,
- body=artifact,
- content_type="application/zip"
- )
- except Exception as e:
- # If any other exceptions which we didn't expect are raised
- # then fail the job and log the exception message.
- logger.exception("Function failed due to exception.")
- put_job_failure(job_id, "Function exception: " + str(e))
-
- logger.debug("Function complete.")
- return "Complete."
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/code_pipeline_job.py"
+ ```
+=== "CodePipeline Job Example Event"
+ ```json hl_lines="3 19"
+ --8<-- "tests/events/codePipelineEvent.json"
```
### Cognito User Pool
@@ -717,18 +425,19 @@ can be imported from `aws_lambda_powertools.data_classes.cognito_user_pool_event
| Custom Email Sender | `data_classes.cognito_user_pool_event.CustomEmailSenderTriggerEvent` |
| Custom SMS Sender | `data_classes.cognito_user_pool_event.CustomSMSSenderTriggerEvent` |
+Some examples for the Cognito User Pools Lambda trigger sources:
+
#### Post Confirmation Example
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent
-
- def lambda_handler(event, context):
- event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)
+ ```python hl_lines="1 5"
+ --8<-- "examples/event_sources/src/cognito_post_confirmation.py"
+ ```
- user_attributes = event.request.user_attributes
- do_something_with(user_attributes)
+=== "Cognito Post Confirmation Example Event"
+ ```json hl_lines="12-14"
+ --8<-- "tests/events/cognitoPostConfirmationEvent.json"
```
#### Define Auth Challenge Example
@@ -740,152 +449,13 @@ This example is based on the AWS Cognito docs for [Define Auth Challenge Lambda
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent
-
- def handler(event: dict, context) -> dict:
- event: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)
- if (
- len(event.request.session) == 1
- and event.request.session[0].challenge_name == "SRP_A"
- ):
- event.response.issue_tokens = False
- event.response.fail_authentication = False
- event.response.challenge_name = "PASSWORD_VERIFIER"
- elif (
- len(event.request.session) == 2
- and event.request.session[1].challenge_name == "PASSWORD_VERIFIER"
- and event.request.session[1].challenge_result
- ):
- event.response.issue_tokens = False
- event.response.fail_authentication = False
- event.response.challenge_name = "CUSTOM_CHALLENGE"
- elif (
- len(event.request.session) == 3
- and event.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
- and event.request.session[2].challenge_result
- ):
- event.response.issue_tokens = True
- event.response.fail_authentication = False
- else:
- event.response.issue_tokens = False
- event.response.fail_authentication = True
-
- return event.raw_event
- ```
-=== "SPR_A response"
-
- ```json hl_lines="25-27"
- {
- "version": "1",
- "region": "us-east-1",
- "userPoolId": "us-east-1_example",
- "userName": "UserName",
- "callerContext": {
- "awsSdkVersion": "awsSdkVersion",
- "clientId": "clientId"
- },
- "triggerSource": "DefineAuthChallenge_Authentication",
- "request": {
- "userAttributes": {
- "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
- "email_verified": "true",
- "name": "First Last",
- "email": "define-auth@mail.com"
- },
- "session": [
- {
- "challengeName": "SRP_A",
- "challengeResult": true
- }
- ]
- },
- "response": {
- "issueTokens": false,
- "failAuthentication": false,
- "challengeName": "PASSWORD_VERIFIER"
- }
- }
- ```
-=== "PASSWORD_VERIFIER success response"
-
- ```json hl_lines="30-32"
- {
- "version": "1",
- "region": "us-east-1",
- "userPoolId": "us-east-1_example",
- "userName": "UserName",
- "callerContext": {
- "awsSdkVersion": "awsSdkVersion",
- "clientId": "clientId"
- },
- "triggerSource": "DefineAuthChallenge_Authentication",
- "request": {
- "userAttributes": {
- "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
- "email_verified": "true",
- "name": "First Last",
- "email": "define-auth@mail.com"
- },
- "session": [
- {
- "challengeName": "SRP_A",
- "challengeResult": true
- },
- {
- "challengeName": "PASSWORD_VERIFIER",
- "challengeResult": true
- }
- ]
- },
- "response": {
- "issueTokens": false,
- "failAuthentication": false,
- "challengeName": "CUSTOM_CHALLENGE"
- }
- }
-
- ```
-=== "CUSTOM_CHALLENGE success response"
-
- ```json hl_lines="34 35"
- {
- "version": "1",
- "region": "us-east-1",
- "userPoolId": "us-east-1_example",
- "userName": "UserName",
- "callerContext": {
- "awsSdkVersion": "awsSdkVersion",
- "clientId": "clientId"
- },
- "triggerSource": "DefineAuthChallenge_Authentication",
- "request": {
- "userAttributes": {
- "sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
- "email_verified": "true",
- "name": "First Last",
- "email": "define-auth@mail.com"
- },
- "session": [
- {
- "challengeName": "SRP_A",
- "challengeResult": true
- },
- {
- "challengeName": "PASSWORD_VERIFIER",
- "challengeResult": true
- },
- {
- "challengeName": "CUSTOM_CHALLENGE",
- "challengeResult": true
- }
- ]
- },
- "response": {
- "issueTokens": true,
- "failAuthentication": false
- }
- }
+ ```python hl_lines="1 5"
+ --8<-- "examples/event_sources/src/cognito_define_auth.py"
+ ```
+
+=== "Cognito Define Auth Challengen Example Event"
+ ```json
+ --8<-- "tests/events/cognitoDefineAuthChallengeEvent.json"
```
#### Create Auth Challenge Example
@@ -894,17 +464,13 @@ This example is based on the AWS Cognito docs for [Create Auth Challenge Lambda
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent
+ ```python hl_lines="2 5"
+ --8<-- "examples/event_sources/src/cognito_create_auth.py"
+ ```
- @event_source(data_class=CreateAuthChallengeTriggerEvent)
- def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
- if event.request.challenge_name == "CUSTOM_CHALLENGE":
- event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
- event.response.private_challenge_parameters = {"answer": "5"}
- event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
- return event.raw_event
+=== "Cognito Create Auth Challengen Example Event"
+ ```json
+ --8<-- "tests/events/cognitoCreateAuthChallengeEvent.json"
```
#### Verify Auth Challenge Response Example
@@ -913,38 +479,28 @@ This example is based on the AWS Cognito docs for [Verify Auth Challenge Respons
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent
+ ```python hl_lines="2 5"
+ --8<-- "examples/event_sources/src/cognito_verify_auth.py"
+ ```
- @event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
- def handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
- event.response.answer_correct = (
- event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
- )
- return event.raw_event
+=== "Cognito Verify Auth Challengen Example Event"
+ ```json
+ --8<-- "tests/events/cognitoVerifyAuthChallengeResponseEvent.json"
```
### Connect Contact Flow
-> New in 1.11.0
+The example integrates with [Amazon Connect](https://docs.aws.amazon.com/connect/latest/adminguide/what-is-amazon-connect.html) by handling contact flow events. The function converts the event into a `ConnectContactFlowEvent` object, providing a structured representation of the contact flow data.
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
- ConnectContactFlowChannel,
- ConnectContactFlowEndpointType,
- ConnectContactFlowEvent,
- ConnectContactFlowInitiationMethod,
- )
+ ```python hl_lines="1-5 10"
+ --8<-- "examples/event_sources/src/connect_contact_flow.py"
+ ```
- def lambda_handler(event, context):
- event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
- assert event.contact_data.attributes == {"Language": "en-US"}
- assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
- assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
- assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API
+=== "Connect Contact Flow Example Event"
+ ```json
+ --8<-- "tests/events/connectContactFlowEventAll.json"
```
### DynamoDB Streams
@@ -954,49 +510,31 @@ The DynamoDB data class utility provides the base class for `DynamoDBStreamEvent
The class automatically deserializes DynamoDB types into their equivalent Python types.
=== "app.py"
-
- ```python
- from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
- DynamoDBStreamEvent,
- DynamoDBRecordEventName
- )
-
- def lambda_handler(event, context):
- event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)
-
- # Multiple records can be delivered in a single event
- for record in event.records:
- if record.event_name == DynamoDBRecordEventName.MODIFY:
- do_something_with(record.dynamodb.new_image)
- do_something_with(record.dynamodb.old_image)
+ ```python hl_lines="1-3 8"
+ --8<-- "examples/event_sources/src/dynamodb_stream.py"
```
-
-=== "multiple_records_types.py"
-
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
- from aws_lambda_powertools.utilities.typing import LambdaContext
-
-
- @event_source(data_class=DynamoDBStreamEvent)
- def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
- for record in event.records:
- # {"N": "123.45"} => Decimal("123.45")
- key: str = record.dynamodb.keys["id"]
- print(key)
+=== "app_multiple_records.py"
+ ```python hl_lines="1 5"
+ --8<-- "examples/event_sources/src/dynamodb_multiple_records.py"
+ ```
+=== "DynamoDB Streams Example Event"
+ ```json
+ --8<-- "tests/events/dynamoStreamEvent.json"
```
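The automatic deserialization converts DynamoDB attribute values such as `{"N": "123.45"}` into their Python equivalents. The simplified `deserialize` helper below is a stdlib-only illustration covering a handful of type descriptors; the data class handles the full set:

```python
from decimal import Decimal


def deserialize(attribute: dict):
    # Each DynamoDB attribute value is a single {type_descriptor: value} pair
    (tag, value), = attribute.items()
    if tag == "N":
        return Decimal(value)  # numbers arrive as strings
    if tag == "S" or tag == "BOOL":
        return value
    if tag == "M":
        return {k: deserialize(v) for k, v in value.items()}
    if tag == "L":
        return [deserialize(v) for v in value]
    raise TypeError(f"Unsupported type descriptor: {tag}")


keys = {"id": {"N": "123.45"}}
deserialized = {k: deserialize(v) for k, v in keys.items()}
```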
### EventBridge
-=== "app.py"
+When an event matching a defined rule occurs in EventBridge, it can [automatically trigger a Lambda function](https://docs.aws.amazon.com/lambda/latest/dg/with-eventbridge-scheduler.html), passing the event data as input.
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, EventBridgeEvent
+=== "app.py"
- @event_source(data_class=EventBridgeEvent)
- def lambda_handler(event: EventBridgeEvent, context):
- do_something_with(event.detail)
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/eventBridgeEvent.py"
+ ```
+=== "EventBridge Example Event"
+ ```json
+ --8<-- "tests/events/eventBridgeEvent.json"
```
### Kafka
@@ -1005,14 +543,13 @@ This example is based on the AWS docs for [Amazon MSK](https://docs.aws.amazon.c
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, KafkaEvent
-
- @event_source(data_class=KafkaEvent)
- def lambda_handler(event: KafkaEvent, context):
- for record in event.records:
- do_something_with(record.decoded_key, record.json_value)
+ ```python hl_lines="1 8"
+ --8<-- "examples/event_sources/src/kafka_event.py"
+ ```
+=== "Kafka Example Event"
+ ```json
+ --8<-- "tests/events/kafkaEventMsk.json"
```
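Conceptually, `decoded_key` and `json_value` base64-decode the record's key and value and, for JSON payloads, parse them. A stdlib-only sketch (`decode_record_value` is an illustrative helper, not the library implementation):

```python
import base64
import json


def decode_record_value(record: dict):
    # Kafka record keys and values arrive base64-encoded in the Lambda event
    raw = base64.b64decode(record["value"])
    return json.loads(raw)


# Build a record the way it would appear in the event payload
record = {"value": base64.b64encode(json.dumps({"order_id": 12}).encode()).decode()}
value = decode_record_value(record)
```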
### Kinesis streams
@@ -1022,20 +559,13 @@ or plain text, depending on the original payload.
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, KinesisStreamEvent
-
- @event_source(data_class=KinesisStreamEvent)
- def lambda_handler(event: KinesisStreamEvent, context):
- kinesis_record = next(event.records).kinesis
-
- # if data was delivered as text
- data = kinesis_record.data_as_text()
-
- # if data was delivered as json
- data = kinesis_record.data_as_json()
+ ```python hl_lines="4 11"
+ --8<-- "examples/event_sources/src/kinesis_streams.py"
+ ```
- do_something_with(data)
+=== "Kinesis streams Example Event"
+ ```json
+ --8<-- "tests/events/kinesisStreamEvent.json"
```
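The `data_as_text` and `data_as_json` helpers roughly amount to base64-decoding the record payload and, for JSON, parsing it. A stdlib-only sketch (the function names mirror the helpers but are standalone re-implementations, shown here for illustration):

```python
import base64
import json


def data_as_text(record_data: str) -> str:
    # Kinesis delivers each record's payload base64-encoded
    return base64.b64decode(record_data).decode("utf-8")


def data_as_json(record_data: str):
    # Use when the original payload was JSON
    return json.loads(data_as_text(record_data))


encoded = base64.b64encode(b'{"message": "hello"}').decode()
```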
### Kinesis Firehose delivery stream
@@ -1050,7 +580,7 @@ To do that, you can use `KinesisFirehoseDataTransformationResponse` class along
=== "Transforming streaming records"
- ```python hl_lines="2-3 12 28"
+ ```python hl_lines="2-3 10 12"
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
```
@@ -1059,7 +589,7 @@ To do that, you can use `KinesisFirehoseDataTransformationResponse` class along
=== "Dropping invalid records"
- ```python hl_lines="5-6 16 34"
+ ```python hl_lines="5-6 14 16"
--8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py"
```
@@ -1067,68 +597,62 @@ To do that, you can use `KinesisFirehoseDataTransformationResponse` class along
=== "Indicating a processing failure"
- ```python hl_lines="2-3 33"
+ ```python hl_lines="2-3 11 33"
--8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py"
```
1. This record will now be sent to your [S3 bucket in the `processing-failed` folder](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-failure-handling){target="_blank"}.
+=== "kinesisFirehoseEvent.json"
+ ```json
+ --8<-- "tests/events/kinesisFirehoseKinesisEvent.json"
+ ```
+
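Each record returned to Firehose must carry a `recordId`, a `result` (`Ok`, `Dropped`, or `ProcessingFailed`), and base64-encoded `data` — which is the shape `KinesisFirehoseDataTransformationResponse` assembles for you. A stdlib-only sketch of that response shape (`build_transformed_record` is an illustrative helper):

```python
import base64
import json


def build_transformed_record(record_id: str, payload: dict, result: str = "Ok") -> dict:
    # Firehose expects the transformed data back as a base64-encoded string
    data = base64.b64encode(json.dumps(payload).encode()).decode()
    return {"recordId": record_id, "result": result, "data": data}


response = {"records": [build_transformed_record("rec-1", {"status": "processed"})]}
```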
### Lambda Function URL
+[Lambda Function URLs](https://docs.aws.amazon.com/lambda/latest/dg/urls-invocation.html) provide a direct HTTP endpoint for invoking Lambda functions. This feature allows functions to receive and process HTTP requests without the need for additional services like API Gateway.
+
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, LambdaFunctionUrlEvent
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/lambdaFunctionUrl.py"
+ ```
- @event_source(data_class=LambdaFunctionUrlEvent)
- def lambda_handler(event: LambdaFunctionUrlEvent, context):
- do_something_with(event.body)
+=== "Lambda Function URL Example Event"
+ ```json
+ --8<-- "tests/events/lambdaFunctionUrlEvent.json"
```
### Rabbit MQ
-It is used for [Rabbit MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see
+This data class is used for [Rabbit MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}. See
the [blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-for-rabbitmq-as-an-event-source-for-lambda/){target="_blank"}
for more details.
=== "app.py"
- ```python hl_lines="4-5 9-10"
- from typing import Dict
-
- from aws_lambda_powertools import Logger
- from aws_lambda_powertools.utilities.data_classes import event_source
- from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent
-
- logger = Logger()
+ ```python hl_lines="5 10"
+ --8<-- "examples/event_sources/src/rabbit_mq_example.py"
+ ```
- @event_source(data_class=RabbitMQEvent)
- def lambda_handler(event: RabbitMQEvent, context):
- for queue_name, messages in event.rmq_messages_by_queue.items():
- logger.debug(f"Messages for queue: {queue_name}")
- for message in messages:
- logger.debug(f"MessageID: {message.basic_properties.message_id}")
- data: Dict = message.json_data
- logger.debug("Process json in base64 encoded data str", data)
+=== "Rabbit MQ Example Event"
+ ```json
+ --8<-- "tests/events/rabbitMQEvent.json"
```
### S3
-=== "app.py"
-
- ```python
- from urllib.parse import unquote_plus
- from aws_lambda_powertools.utilities.data_classes import event_source, S3Event
+Integration with Amazon S3 enables automatic, serverless processing of object-level events in S3 buckets. When triggered by actions like object creation or deletion, Lambda functions receive detailed event information, allowing for real-time file processing, data transformations, and automated workflows.
- @event_source(data_class=S3Event)
- def lambda_handler(event: S3Event, context):
- bucket_name = event.bucket_name
+=== "app.py"
- # Multiple records can be delivered in a single event
- for record in event.records:
- object_key = unquote_plus(record.s3.get_object.key)
+ ```python hl_lines="3 6"
+ --8<-- "examples/event_sources/src/s3Event.py"
+ ```
- do_something_with(f"{bucket_name}/{object_key}")
+=== "S3 Example Event"
+ ```json
+ --8<-- "tests/events/s3Event.json"
```
### S3 Batch Operations
@@ -1141,54 +665,42 @@ This example is based on the AWS S3 Batch Operations documentation [Example Lamb
--8<-- "examples/event_sources/src/s3_batch_operation.py"
```
+=== "S3 Batch Operations Example Event"
+
+ ```json
+ --8<-- "tests/events/s3BatchOperationEventSchemaV2.json"
+ ```
+
### S3 Object Lambda
This example is based on the AWS Blog post [Introducing Amazon S3 Object Lambda – Use Your Code to Process Data as It Is Being Retrieved from S3](https://aws.amazon.com/blogs/aws/introducing-amazon-s3-object-lambda-use-your-code-to-process-data-as-it-is-being-retrieved-from-s3/){target="_blank"}.
=== "app.py"
- ```python hl_lines="5-6 12 14"
- import boto3
- import requests
-
- from aws_lambda_powertools import Logger
- from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA
- from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent
-
- logger = Logger()
- session = boto3.session.Session()
- s3 = session.client("s3")
-
- @logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True)
- def lambda_handler(event, context):
- event = S3ObjectLambdaEvent(event)
-
- # Get object from S3
- response = requests.get(event.input_s3_url)
- original_object = response.content.decode("utf-8")
-
- # Make changes to the object about to be returned
- transformed_object = original_object.upper()
+ ```python hl_lines="5 6 13 15"
+ --8<-- "examples/event_sources/src/s3_object_lambda.py"
+ ```
- # Write object back to S3 Object Lambda
- s3.write_get_object_response(
- Body=transformed_object, RequestRoute=event.request_route, RequestToken=event.request_token
- )
+=== "S3 Object Lambda Example Event"
- return {"status_code": 200}
+ ```json
+ --8<-- "examples/event_sources/events/s3ObjectEvent.json"
```
### S3 EventBridge Notification
+[S3 EventBridge notifications](https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventBridge.html) enhance Lambda's ability to process S3 events by routing them through Amazon EventBridge. This integration offers advanced filtering, multiple destination support, and standardized CloudEvents format.
+
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, S3EventBridgeNotificationEvent
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/s3_event_bridge.py"
+ ```
- @event_source(data_class=S3EventBridgeNotificationEvent)
- def lambda_handler(event: S3EventBridgeNotificationEvent, context):
- bucket_name = event.detail.bucket.name
- file_key = event.detail.object.key
+=== "S3 EventBridge Notification Example Event"
+
+ ```json
+ --8<-- "tests/events/s3EventBridgeNotificationObjectCreatedEvent.json"
```
### Secrets Manager
@@ -1209,50 +721,50 @@ AWS Secrets Manager rotation uses an AWS Lambda function to update the secret. [
### SES
+The integration with Simple Email Service (SES) enables serverless email processing. When configured, SES can trigger Lambda functions in response to incoming emails or delivery status notifications. The Lambda function receives an SES event containing details like sender, recipients, and email content.
+
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, SESEvent
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/ses_event.py"
+ ```
- @event_source(data_class=SESEvent)
- def lambda_handler(event: SESEvent, context):
- # Multiple records can be delivered in a single event
- for record in event.records:
- mail = record.ses.mail
- common_headers = mail.common_headers
+=== "SES Example Event"
- do_something_with(common_headers.to, common_headers.subject)
+ ```json
+ --8<-- "tests/events/sesEvent.json"
```
### SNS
+The integration with Simple Notification Service (SNS) enables serverless message processing. When configured, SNS can trigger Lambda functions in response to published messages or notifications. The Lambda function receives an SNS event containing details like the message body, subject, and metadata.
+
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, SNSEvent
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/sns_event.py"
+ ```
- @event_source(data_class=SNSEvent)
- def lambda_handler(event: SNSEvent, context):
- # Multiple records can be delivered in a single event
- for record in event.records:
- message = record.sns.message
- subject = record.sns.subject
+=== "SNS Example Event"
- do_something_with(subject, message)
+ ```json
+ --8<-- "tests/events/snsEvent.json"
```
### SQS
+The integration with Simple Queue Service (SQS) enables serverless queue processing. When configured, SQS can trigger Lambda functions in response to messages in the queue. The Lambda function receives an SQS event containing details like message body, attributes, and metadata.
+
=== "app.py"
- ```python
- from aws_lambda_powertools.utilities.data_classes import event_source, SQSEvent
+ ```python hl_lines="1 4"
+ --8<-- "examples/event_sources/src/sqs_event.py"
+ ```
+
+=== "SQS Example Event"
- @event_source(data_class=SQSEvent)
- def lambda_handler(event: SQSEvent, context):
- # Multiple records can be delivered in a single event
- for record in event.records:
- do_something_with(record.body)
+ ```json
+ --8<-- "tests/events/sqsEvent.json"
```
### VPC Lattice V2
@@ -1270,7 +782,7 @@ You can register your Lambda functions as targets within an Amazon VPC Lattice s
=== "Lattice Example Event"
```json
- --8<-- "examples/event_sources/src/vpc_lattice_v2_payload.json"
+ --8<-- "examples/event_sources/events/vpc_lattice_v2_payload.json"
```
### VPC Lattice V1
@@ -1288,7 +800,7 @@ You can register your Lambda functions as targets within an Amazon VPC Lattice s
=== "Lattice Example Event"
```json
- --8<-- "examples/event_sources/src/vpc_lattice_payload.json"
+ --8<-- "examples/event_sources/events/vpc_lattice_payload.json"
```
## Advanced
@@ -1308,10 +820,9 @@ However, certain events may contain sensitive fields such as `secret_access_key`
=== "debugging_event.json"
```json hl_lines="28 29"
- --8<-- "examples/event_sources/src/debugging_event.json"
+ --8<-- "examples/event_sources/events/debugging_event.json"
```
=== "debugging_output.json"
```json hl_lines="16 17 18"
- --8<-- "examples/event_sources/src/debugging_output.json"
- ```
+ --8<-- "examples/event_sources/events/debugging_output.json"
```
diff --git a/examples/event_sources/events/active_mq_event_example.json b/examples/event_sources/events/active_mq_event_example.json
new file mode 100644
index 00000000000..50da9596682
--- /dev/null
+++ b/examples/event_sources/events/active_mq_event_example.json
@@ -0,0 +1,27 @@
+{
+ "eventSource": "aws:mq",
+ "eventSourceArn": "arn:aws:mq:us-east-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8",
+ "messages": [
+ {
+ "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-east-2.amazonaws.com-37557-1234520418293-4:1:1:1:1",
+ "messageType": "jms/text-message",
+ "destination": {
+ "physicalName": "testQueue"
+ },
+ "data": "QUJDOkFBQUE=",
+ "timestamp": 1598827811958,
+ "properties": {
+ "index": "1"
+ }
+ },
+ {
+ "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-east-2.amazonaws.com-37557-1234520418293-4:1:1:1:2",
+ "messageType": "jms/bytes-message",
+ "destination": {
+ "physicalName": "testQueue2"
+ },
+ "data": "LQaGQ82S48k=",
+ "timestamp": 1598827811959
+ }
+ ]
+}
\ No newline at end of file
diff --git a/examples/event_sources/events/apigw_event.json b/examples/event_sources/events/apigw_event.json
new file mode 100644
index 00000000000..dc0efd36604
--- /dev/null
+++ b/examples/event_sources/events/apigw_event.json
@@ -0,0 +1,20 @@
+{
+ "resource": "/helloworld",
+ "path": "/hello",
+ "httpMethod": "GET",
+ "headers": {
+ "Accept": "*/*",
+ "Host": "api.example.com"
+ },
+ "queryStringParameters": {
+ "name": "John"
+ },
+ "pathParameters": null,
+ "stageVariables": null,
+ "requestContext": {
+ "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
+ "stage": "prod"
+ },
+ "body": null,
+ "isBase64Encoded": false
+}
\ No newline at end of file
diff --git a/examples/event_sources/src/aws_config_rule_scheduled.json b/examples/event_sources/events/aws_config_rule_scheduled.json
similarity index 100%
rename from examples/event_sources/src/aws_config_rule_scheduled.json
rename to examples/event_sources/events/aws_config_rule_scheduled.json
diff --git a/examples/event_sources/src/debugging_event.json b/examples/event_sources/events/debugging_event.json
similarity index 100%
rename from examples/event_sources/src/debugging_event.json
rename to examples/event_sources/events/debugging_event.json
diff --git a/examples/event_sources/src/debugging_output.json b/examples/event_sources/events/debugging_output.json
similarity index 100%
rename from examples/event_sources/src/debugging_output.json
rename to examples/event_sources/events/debugging_output.json
diff --git a/examples/event_sources/events/s3ObjectEvent.json b/examples/event_sources/events/s3ObjectEvent.json
new file mode 100644
index 00000000000..afec46fecca
--- /dev/null
+++ b/examples/event_sources/events/s3ObjectEvent.json
@@ -0,0 +1,29 @@
+{
+ "xAmzRequestId": "1a5ed718-5f53-471d-b6fe-5cf62d88d02a",
+ "getObjectContext": {
+ "inputS3Url": "https://myap-123412341234.s3-accesspoint.us-east-1.amazonaws.com/s3.txt?X-Amz-Security-Token=...",
+ "outputRoute": "io-iad-cell001",
+ "outputToken": "..."
+ },
+ "configuration": {
+ "accessPointArn": "arn:aws:s3-object-lambda:us-east-1:123412341234:accesspoint/myolap",
+ "supportingAccessPointArn": "arn:aws:s3:us-east-1:123412341234:accesspoint/myap",
+ "payload": "test"
+ },
+ "userRequest": {
+ "url": "/s3.txt",
+ "headers": {
+ "Host": "myolap-123412341234.s3-object-lambda.us-east-1.amazonaws.com",
+ "Accept-Encoding": "identity",
+ "X-Amz-Content-SHA256": "e3b0c44297fc1c149afbf4c8995fb92427ae41e4649b934ca495991b7852b855"
+ }
+ },
+ "userIdentity": {
+ "type": "IAMUser",
+ "principalId": "...",
+ "arn": "arn:aws:iam::123412341234:user/myuser",
+ "accountId": "123412341234",
+ "accessKeyId": "..."
+ },
+ "protocolVersion": "1.00"
+}
\ No newline at end of file
diff --git a/examples/event_sources/src/vpc_lattice_payload.json b/examples/event_sources/events/vpc_lattice_payload.json
similarity index 100%
rename from examples/event_sources/src/vpc_lattice_payload.json
rename to examples/event_sources/events/vpc_lattice_payload.json
diff --git a/examples/event_sources/src/vpc_lattice_v2_payload.json b/examples/event_sources/events/vpc_lattice_v2_payload.json
similarity index 100%
rename from examples/event_sources/src/vpc_lattice_v2_payload.json
rename to examples/event_sources/events/vpc_lattice_v2_payload.json
diff --git a/examples/event_sources/src/active_mq_example.py b/examples/event_sources/src/active_mq_example.py
new file mode 100644
index 00000000000..983233606ec
--- /dev/null
+++ b/examples/event_sources/src/active_mq_example.py
@@ -0,0 +1,18 @@
+import json
+
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent
+
+logger = Logger()
+
+
+@event_source(data_class=ActiveMQEvent)
+def lambda_handler(event: ActiveMQEvent, context):
+ for message in event.messages:
+ msg = message.message_id
+ msg_pn = message.destination_physicalname
+
+ logger.info(f"Message ID: {msg} and physical name: {msg_pn}")
+
+ return {"statusCode": 200, "body": json.dumps("Processing complete")}
diff --git a/examples/event_sources/src/albEvent.py b/examples/event_sources/src/albEvent.py
new file mode 100644
index 00000000000..fd2b6aef05b
--- /dev/null
+++ b/examples/event_sources/src/albEvent.py
@@ -0,0 +1,9 @@
+from aws_lambda_powertools.utilities.data_classes import ALBEvent, event_source
+
+
+@event_source(data_class=ALBEvent)
+def lambda_handler(event: ALBEvent, context):
+ if "lambda" in event.path and event.http_method == "GET":
+ return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
+ else:
+ return {"statusCode": 400, "body": "No Hello from path"}
diff --git a/examples/event_sources/src/apigw_auth_v2.py b/examples/event_sources/src/apigw_auth_v2.py
new file mode 100644
index 00000000000..128c7a57a6a
--- /dev/null
+++ b/examples/event_sources/src/apigw_auth_v2.py
@@ -0,0 +1,30 @@
+from secrets import compare_digest
+
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
+ APIGatewayAuthorizerEventV2,
+ APIGatewayAuthorizerResponseV2,
+)
+
+
+def get_user_by_token(token):
+ if compare_digest(token, "value"):
+ return {"name": "Foo"}
+ return None
+
+
+@event_source(data_class=APIGatewayAuthorizerEventV2)
+def lambda_handler(event: APIGatewayAuthorizerEventV2, context):
+ user = get_user_by_token(event.headers.get("Authorization"))
+
+ if user is None:
+ # No user was found, so we return not authorized
+ return APIGatewayAuthorizerResponseV2(authorize=False).asdict()
+
+    # Found the user, so we set the user details in the authorizer context
+ response = APIGatewayAuthorizerResponseV2(
+ authorize=True,
+ context=user,
+ )
+
+ return response.asdict()
diff --git a/examples/event_sources/src/apigw_authorizer_request.py b/examples/event_sources/src/apigw_authorizer_request.py
new file mode 100644
index 00000000000..e0d81196af2
--- /dev/null
+++ b/examples/event_sources/src/apigw_authorizer_request.py
@@ -0,0 +1,29 @@
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
+ APIGatewayAuthorizerRequestEvent,
+ APIGatewayAuthorizerResponse,
+)
+
+
+@event_source(data_class=APIGatewayAuthorizerRequestEvent)
+def lambda_handler(event: APIGatewayAuthorizerRequestEvent, context):
+ # Simple auth check (replace with your actual auth logic)
+ is_authorized = event.headers.get("HeaderAuth1") == "headerValue1"
+
+ if not is_authorized:
+ return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}
+
+ arn = event.parsed_arn
+
+ policy = APIGatewayAuthorizerResponse(
+ principal_id="user",
+ context={"user": "example"},
+ region=arn.region,
+ aws_account_id=arn.aws_account_id,
+ api_id=arn.api_id,
+ stage=arn.stage,
+ )
+
+ policy.allow_all_routes()
+
+ return policy.asdict()
diff --git a/examples/event_sources/src/apigw_authorizer_token.py b/examples/event_sources/src/apigw_authorizer_token.py
new file mode 100644
index 00000000000..e27eded5c7a
--- /dev/null
+++ b/examples/event_sources/src/apigw_authorizer_token.py
@@ -0,0 +1,29 @@
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
+ APIGatewayAuthorizerResponse,
+ APIGatewayAuthorizerTokenEvent,
+)
+
+
+@event_source(data_class=APIGatewayAuthorizerTokenEvent)
+def lambda_handler(event: APIGatewayAuthorizerTokenEvent, context):
+ # Simple token check (replace with your actual token validation logic)
+ is_valid_token = event.authorization_token == "allow"
+
+ if not is_valid_token:
+ return {"principalId": "", "policyDocument": {"Version": "2012-10-17", "Statement": []}}
+
+ arn = event.parsed_arn
+
+ policy = APIGatewayAuthorizerResponse(
+ principal_id="user",
+ context={"user": "example"},
+ region=arn.region,
+ aws_account_id=arn.aws_account_id,
+ api_id=arn.api_id,
+ stage=arn.stage,
+ )
+
+ policy.allow_all_routes()
+
+ return policy.asdict()
diff --git a/examples/event_sources/src/apigw_proxy_decorator.py b/examples/event_sources/src/apigw_proxy_decorator.py
new file mode 100644
index 00000000000..81db0b1a6aa
--- /dev/null
+++ b/examples/event_sources/src/apigw_proxy_decorator.py
@@ -0,0 +1,9 @@
+from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent, event_source
+
+
+@event_source(data_class=APIGatewayProxyEvent)
+def lambda_handler(event: APIGatewayProxyEvent, context):
+ if "hello" in event.path and event.http_method == "GET":
+ return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
+ else:
+ return {"statusCode": 400, "body": "No Hello from path"}
diff --git a/examples/event_sources/src/apigw_proxy_v2.py b/examples/event_sources/src/apigw_proxy_v2.py
new file mode 100644
index 00000000000..fb468973e15
--- /dev/null
+++ b/examples/event_sources/src/apigw_proxy_v2.py
@@ -0,0 +1,9 @@
+from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEventV2, event_source
+
+
+@event_source(data_class=APIGatewayProxyEventV2)
+def lambda_handler(event: APIGatewayProxyEventV2, context):
+ if "hello" in event.path and event.http_method == "POST":
+ return {"statusCode": 200, "body": f"Hello from path: {event.path}"}
+ else:
+ return {"statusCode": 400, "body": "No Hello from path"}
diff --git a/examples/event_sources/src/appSyncAuthorizer.py b/examples/event_sources/src/appSyncAuthorizer.py
new file mode 100644
index 00000000000..012f7beb016
--- /dev/null
+++ b/examples/event_sources/src/appSyncAuthorizer.py
@@ -0,0 +1,33 @@
+from typing import Dict
+
+from aws_lambda_powertools.logging import correlation_paths
+from aws_lambda_powertools.logging.logger import Logger
+from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
+ AppSyncAuthorizerEvent,
+ AppSyncAuthorizerResponse,
+)
+from aws_lambda_powertools.utilities.data_classes.event_source import event_source
+
+logger = Logger()
+
+
+def get_user_by_token(token: str):
+ """Look a user by token"""
+ ...
+
+
+@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
+@event_source(data_class=AppSyncAuthorizerEvent)
+def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
+ user = get_user_by_token(event.authorization_token)
+
+ if not user:
+ # No user found, return not authorized
+ return AppSyncAuthorizerResponse().asdict()
+
+ return AppSyncAuthorizerResponse(
+ authorize=True,
+ resolver_context={"id": user.id},
+ # Only allow admins to delete events
+ deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
+ ).asdict()
diff --git a/examples/event_sources/src/appSyncResolver.py b/examples/event_sources/src/appSyncResolver.py
new file mode 100644
index 00000000000..6884b0649fd
--- /dev/null
+++ b/examples/event_sources/src/appSyncResolver.py
@@ -0,0 +1,57 @@
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
+ AppSyncIdentityCognito,
+ AppSyncResolverEvent,
+)
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+@event_source(data_class=AppSyncResolverEvent)
+def lambda_handler(event: AppSyncResolverEvent, context: LambdaContext):
+ # Access the AppSync event details
+ type_name = event.type_name
+ field_name = event.field_name
+ arguments = event.arguments
+ source = event.source
+
+ print(f"Resolving field '{field_name}' for type '{type_name}'")
+ print(f"Arguments: {arguments}")
+ print(f"Source: {source}")
+
+ # Check if the identity is Cognito-based
+ if isinstance(event.identity, AppSyncIdentityCognito):
+ user_id = event.identity.sub
+ username = event.identity.username
+ print(f"Request from Cognito user: {username} (ID: {user_id})")
+ else:
+ print("Request is not from a Cognito-authenticated user")
+
+ if type_name == "Merchant" and field_name == "locations":
+ page = arguments.get("page", 1)
+ size = arguments.get("size", 10)
+ name_filter = arguments.get("name")
+
+ # Here you would typically fetch locations from a database
+ # This is a mock implementation
+ locations = [
+ {"id": "1", "name": "Location 1", "address": "123 Main St"},
+ {"id": "2", "name": "Location 2", "address": "456 Elm St"},
+ {"id": "3", "name": "Location 3", "address": "789 Oak St"},
+ ]
+
+ # Apply name filter if provided
+ if name_filter:
+ locations = [loc for loc in locations if name_filter.lower() in loc["name"].lower()]
+
+ # Apply pagination
+ start = (page - 1) * size
+ end = start + size
+ paginated_locations = locations[start:end]
+
+ return {
+ "items": paginated_locations,
+ "totalCount": len(locations),
+ "nextToken": str(page + 1) if end < len(locations) else None,
+ }
+ else:
+        raise ValueError(f"Unhandled field: {field_name} for type: {type_name}")
diff --git a/examples/event_sources/src/aws_config_rule.py b/examples/event_sources/src/aws_config_rule.py
index b81ae39bd25..07d87999982 100644
--- a/examples/event_sources/src/aws_config_rule.py
+++ b/examples/event_sources/src/aws_config_rule.py
@@ -3,13 +3,12 @@
AWSConfigRuleEvent,
event_source,
)
-from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
@event_source(data_class=AWSConfigRuleEvent)
-def lambda_handler(event: AWSConfigRuleEvent, context: LambdaContext):
+def lambda_handler(event: AWSConfigRuleEvent, context):
message_type = event.invoking_event.message_type
logger.info(f"Logging {message_type} event rule", invoke_event=event.raw_invoking_event)
diff --git a/examples/event_sources/src/aws_config_rule_item_changed.json b/examples/event_sources/src/aws_config_rule_item_changed.json
deleted file mode 100644
index cbf7abf67aa..00000000000
--- a/examples/event_sources/src/aws_config_rule_item_changed.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
- "version":"1.0",
- "invokingEvent":"{\"configurationItemDiff\":{\"changedProperties\":{\"Configuration.InstanceType\":{\"previousValue\":\"t2.micro\",\"updatedValue\":\"t2.medium\",\"changeType\":\"UPDATE\"},\"Configuration.State.Name\":{\"previousValue\":\"running\",\"updatedValue\":\"stopped\",\"changeType\":\"UPDATE\"},\"Configuration.StateTransitionReason\":{\"previousValue\":\"\",\"updatedValue\":\"User initiated (2023-04-27 15:01:07 GMT)\",\"changeType\":\"UPDATE\"},\"Configuration.StateReason\":{\"previousValue\":null,\"updatedValue\":{\"code\":\"Client.UserInitiatedShutdown\",\"message\":\"Client.UserInitiatedShutdown: User initiated shutdown\"},\"changeType\":\"CREATE\"},\"Configuration.CpuOptions.CoreCount\":{\"previousValue\":1,\"updatedValue\":2,\"changeType\":\"UPDATE\"}},\"changeType\":\"UPDATE\"},\"configurationItem\":{\"relatedEvents\":[],\"relationships\":[{\"resourceId\":\"eipalloc-0ebb4367662263cc1\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::EIP\",\"name\":\"Is attached to ElasticIp\"},{\"resourceId\":\"eni-034dd31c4b17ada8c\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::NetworkInterface\",\"name\":\"Contains NetworkInterface\"},{\"resourceId\":\"eni-09a604c0ec356b06f\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::NetworkInterface\",\"name\":\"Contains NetworkInterface\"},{\"resourceId\":\"sg-0fb295a327d9b4835\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::SecurityGroup\",\"name\":\"Is associated with SecurityGroup\"},{\"resourceId\":\"subnet-cad1f2f4\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::Subnet\",\"name\":\"Is contained in Subnet\"},{\"resourceId\":\"vol-0a288b5eb9fea4b30\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::Volume\",\"name\":\"Is attached to Volume\"},{\"resourceId\":\"vpc-2d96be57\",\"resourceName\":null,\"resourceType\":\"AWS::EC2::VPC\",\"name\":\"Is contained in 
Vpc\"}],\"configuration\":{\"amiLaunchIndex\":0,\"imageId\":\"ami-09d95fab7fff3776c\",\"instanceId\":\"i-042dd005362091826\",\"instanceType\":\"t2.medium\",\"kernelId\":null,\"keyName\":\"mihaec2\",\"launchTime\":\"2023-04-27T14:57:16.000Z\",\"monitoring\":{\"state\":\"disabled\"},\"placement\":{\"availabilityZone\":\"us-east-1e\",\"affinity\":null,\"groupName\":\"\",\"partitionNumber\":null,\"hostId\":null,\"tenancy\":\"default\",\"spreadDomain\":null,\"hostResourceGroupArn\":null},\"platform\":null,\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\",\"productCodes\":[],\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIpAddress\":\"3.232.229.57\",\"ramdiskId\":null,\"state\":{\"code\":80,\"name\":\"stopped\"},\"stateTransitionReason\":\"User initiated (2023-04-27 15:01:07 GMT)\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"architecture\":\"x86_64\",\"blockDeviceMappings\":[{\"deviceName\":\"/dev/xvda\",\"ebs\":{\"attachTime\":\"2020-05-30T15:21:58.000Z\",\"deleteOnTermination\":true,\"status\":\"attached\",\"volumeId\":\"vol-0a288b5eb9fea4b30\"}}],\"clientToken\":\"\",\"ebsOptimized\":false,\"enaSupport\":true,\"hypervisor\":\"xen\",\"iamInstanceProfile\":{\"arn\":\"arn:aws:iam::0123456789012:instance-profile/AmazonSSMRoleForInstancesQuickSetup\",\"id\":\"AIPAS5S4WFUBL72S3QXW5\"},\"instanceLifecycle\":null,\"elasticGpuAssociations\":[],\"elasticInferenceAcceleratorAssociations\":[],\"networkInterfaces\":[{\"association\":{\"carrierIp\":null,\"ipOwnerId\":\"0123456789012\",\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIp\":\"3.232.229.57\"},\"attachment\":{\"attachTime\":\"2020-05-30T15:21:57.000Z\",\"attachmentId\":\"eni-attach-0a7e75dc9c1c291a0\",\"deleteOnTermination\":true,\"deviceIndex\":0,\"status\":\"attached\",\"networkCardIndex\":0},\"description\":\"\",\"groups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"}],\"ipv6Addresses\":[],\"mac
Address\":\"06:cf:00:c2:17:db\",\"networkInterfaceId\":\"eni-034dd31c4b17ada8c\",\"ownerId\":\"0123456789012\",\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\",\"privateIpAddresses\":[{\"association\":{\"carrierIp\":null,\"ipOwnerId\":\"0123456789012\",\"publicDnsName\":\"ec2-3-232-229-57.compute-1.amazonaws.com\",\"publicIp\":\"3.232.229.57\"},\"primary\":true,\"privateDnsName\":\"ip-172-31-78-41.ec2.internal\",\"privateIpAddress\":\"172.31.78.41\"}],\"sourceDestCheck\":true,\"status\":\"in-use\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"interfaceType\":\"interface\"},{\"association\":null,\"attachment\":{\"attachTime\":\"2020-11-26T23:46:04.000Z\",\"attachmentId\":\"eni-attach-0e6d150ebbd19966e\",\"deleteOnTermination\":false,\"deviceIndex\":1,\"status\":\"attached\",\"networkCardIndex\":0},\"description\":\"MINHAEC2AAAAAA\",\"groups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"},{\"groupName\":\"default\",\"groupId\":\"sg-88105fa0\"}],\"ipv6Addresses\":[],\"macAddress\":\"06:0a:62:00:64:5f\",\"networkInterfaceId\":\"eni-09a604c0ec356b06f\",\"ownerId\":\"0123456789012\",\"privateDnsName\":\"ip-172-31-70-9.ec2.internal\",\"privateIpAddress\":\"172.31.70.9\",\"privateIpAddresses\":[{\"association\":null,\"primary\":true,\"privateDnsName\":\"ip-172-31-70-9.ec2.internal\",\"privateIpAddress\":\"172.31.70.9\"}],\"sourceDestCheck\":true,\"status\":\"in-use\",\"subnetId\":\"subnet-cad1f2f4\",\"vpcId\":\"vpc-2d96be57\",\"interfaceType\":\"interface\"}],\"outpostArn\":null,\"rootDeviceName\":\"/dev/xvda\",\"rootDeviceType\":\"ebs\",\"securityGroups\":[{\"groupName\":\"minhaec2\",\"groupId\":\"sg-0fb295a327d9b4835\"}],\"sourceDestCheck\":true,\"spotInstanceRequestId\":null,\"sriovNetSupport\":null,\"stateReason\":{\"code\":\"Client.UserInitiatedShutdown\",\"message\":\"Client.UserInitiatedShutdown: User initiated 
shutdown\"},\"tags\":[{\"key\":\"projeto\",\"value\":\"meetup\"},{\"key\":\"Name\",\"value\":\"Minha\"},{\"key\":\"CentroCusto\",\"value\":\"TI\"},{\"key\":\"Setor\",\"value\":\"Desenvolvimento\"}],\"virtualizationType\":\"hvm\",\"cpuOptions\":{\"coreCount\":2,\"threadsPerCore\":1},\"capacityReservationId\":null,\"capacityReservationSpecification\":{\"capacityReservationPreference\":\"open\",\"capacityReservationTarget\":null},\"hibernationOptions\":{\"configured\":false},\"licenses\":[],\"metadataOptions\":{\"state\":\"applied\",\"httpTokens\":\"optional\",\"httpPutResponseHopLimit\":1,\"httpEndpoint\":\"enabled\"},\"enclaveOptions\":{\"enabled\":false},\"bootMode\":null},\"supplementaryConfiguration\":{},\"tags\":{\"projeto\":\"meetup\",\"Setor\":\"Desenvolvimento\",\"CentroCusto\":\"TI\",\"Name\":\"Minha\"},\"configurationItemVersion\":\"1.3\",\"configurationItemCaptureTime\":\"2023-04-27T15:03:11.636Z\",\"configurationStateId\":1682607791636,\"awsAccountId\":\"0123456789012\",\"configurationItemStatus\":\"OK\",\"resourceType\":\"AWS::EC2::Instance\",\"resourceId\":\"i-042dd005362091826\",\"resourceName\":null,\"ARN\":\"arn:aws:ec2:us-east-1:0123456789012:instance/i-042dd005362091826\",\"awsRegion\":\"us-east-1\",\"availabilityZone\":\"us-east-1e\",\"configurationStateMd5Hash\":\"\",\"resourceCreationTime\":\"2023-04-27T14:57:16.000Z\"},\"notificationCreationTime\":\"2023-04-27T15:03:13.332Z\",\"messageType\":\"ConfigurationItemChangeNotification\",\"recordVersion\":\"1.3\"}",
- "ruleParameters":"{\"desiredInstanceType\": \"t2.micro\"}",
- "resultToken":"eyJlbmNyeXB0ZWREYXRhIjpbLTQxLDEsLTU3LC0zMCwtMTIxLDUzLDUyLDQ1LC01NywtOCw3MywtODEsLTExNiwtMTAyLC01MiwxMTIsLTQ3LDU4LDY1LC0xMjcsMTAyLDUsLTY5LDQ0LC0xNSwxMTQsNDEsLTksMTExLC0zMCw2NSwtNzUsLTM1LDU0LDEwNSwtODksODYsNDAsLTEwNSw5OCw2NSwtMTE5LC02OSwyNCw2NiwtMjAsODAsLTExMiwtNzgsLTgwLDQzLC01NywzMCwtMjUsODIsLTEwLDMsLTQsLTg1LC01MywtMzcsLTkwLC04OCwtOTgsLTk4LC00MSwxOSwxMTYsNjIsLTIzLC0xMjEsLTEwOCw1NywtNTgsLTUyLDI5LDEwMSwxMjIsLTU2LC03MSwtODEsLTQ3LDc3LC0yMiwtMTI0LC0zLC04NiwtMTIyLC00MCwtODksLTEwMSw1NywtMTI3LC0zNywtMzcsLTMxLC05OCwtMzEsMTEsLTEyNSwwLDEwOCwtMzIsNjQsNjIsLTIyLDAsNDcsLTEwNiwtMTAwLDEwNCwxNCw1OCwxMjIsLTEwLC01MCwtOTAsLTgwLC01MCwtNSw2NSwwLC0yNSw4NSw4Miw3LDkzLDEyMiwtODIsLTExNiwtNzksLTQ0LDcyLC03MywtNjksMTQsLTU2LDk0LDkwLDExNCwtMjksLTExOSwtNzEsODgsMTA3LDEwNywxMTAsLTcsMTI3LC0xMjUsLTU3LC0xMjYsLTEyMCw2OSwtMTI3LC03NiwtMTE5LDcxLDEsLTY4LDEwNywxMTMsLTU2LDg3LC0xMDIsLTE2LDEwOCwtMTA3LC00MywtOTQsLTEwNiwzLDkwLDE0LDcyLC0xMiwtMTE2LC03Myw4MCwtMTIyLDQ0LC0xMDQsMTIsNzQsNTcsLTEwLC0xMDUsLTExMiwtMzYsMjgsLTQ1LDk3LDExLC00OSwtMTEsNjEsMzYsLTE3LC03NCw1MCw0LC0yNiwxMDQsLTI4LC0xMjUsMjQsNzAsLTg1LC00Niw5MiwtMTAzLC00MSwtMTA2LDY5LDEyMiwyMSwtMjUsODAsOTksLTkzLC01NiwtMjUsLTQ3LC0xMjMsLTU5LC0xMjQsLTUyLC0xNiwxMjcsLTM4LC0xNiwxMDEsMTE5LDEwNywyNywxMCwtNDYsLTg3LC0xMiwtMzksMTQsNDUsMiw3MCwxMDcsMTA0LC00LC02OSwtMTIsNTksLTEyNiwtOTEsMTI3LDU0LDEwNiwtMTI2LC0xMTYsLTEwMiw3Miw4MSw1MCw3NSwtNTEsMTA4LDQxLC0zLC02LC00NSwxMDMsLTg2LDM3LC00NiwtMzIsLTExMSwxMjQsMTExLDg3LDU0LC03NiwxMjIsLTUsLTM2LC04OCw5LC0xMTMsMTE2LC01OSw4Myw3NywyOCwxMiwtNjUsLTExMywtNzksLTEyOCw4MiwtMTE4LC04MywtMTI0LDMxLDk5LC05MCwtOTksMTYsLTEyMywyMSwtMTE0LC05OCwtMTE2LC0xMTksMiwtNzMsNDYsODIsLTEzLDU0LDcxLC00MiwyNSw3NCw3MywtODYsOTQsNDYsOTksOTMsLTgyLDU1LDY1LC05OCw0OSwtNjAsMTEyLDEwMSwyMiw2OSwtMTYsNzcsLTk0LC01OSwtNDYsMTE1LDMwLC00Myw5Myw4OCwtMjgsMzgsNiw4NCwzMSwtMTAxLDMyLC0yMiwtNjMsLTk1LDExNCwtNzUsMTE0LDM2LC04NCw0MCwtNDQsLTEzLDU5LDcyLC0xLC0xMDMsMzEsMTA1LDY5LDY5LDc3LC02NCwtNTYsMTE4LDEzLC0xMTQsODAsOTksLTUzLDI1LDQyLDk0LDczLC04MCwyNSwzOCwyNCwtMTcsNjYsLTExOCwtMjMsMTE5LDkwLDEyMSwxMTg
sLTUxLDUxLC0xMiwtNzYsLTUxLDksLTIxLDExNCwtMzcsLTY0LC0yLC0xMjYsLTk1LDYzLDczLC00MSwtMzQsLTkwLC0yMiw1OSwtNzksMzAsLTQsLTEsLTUsMTIsMzksLTk5LC0xMDUsLTEwNCwtNjEsNjUsLTc0LDE5LC0xMywtNjAsLTI4LC04LDQsLTgsMTIxLC0xMTgsMTIyLC02NSwtMjEsMjMsMTcsLTg0LDQwLC05MiwxNCwtMTI2LC02MCwtNzksLTUzLDM3LC04Myw2NSwxMDQsLTM2LC02MCwtMTEwLC0zMywtMTE3LDYsMTA3LDEsLTMsOTMsNzgsLTk1LC0xMjIsNTMsMTA4LC00OSwtNDksMjQsLTY1LDgzLDEyNSwtNzcsLTE5LC04MSwzNCwtNjcsLTQzLC03MCwtMjYsMTgsMTA0LDY1LDQsLTEyNiw0NCwtMTE5LDUyLC00NiwyMiw2NywxMTMsMTE4LC0zMywzNCwtOTYsMTIxLDE5LC0yLC0zNSwwLC04MiwxNyw2NiwtMjcsNjksLTM2LC0xNCw1NiwtOTcsLTE2LDEyMywyOCwtOTUsLTMyLC02MywtNjksNzAsNjQsLTMzLC0xMDAsNDMsLTExMywxMDUsMTAwLDEwOCwtNjAsNDAsLTIsLTk2LC0xMjQsMzcsLTQ1LC0xMjQsLTY4LC02OSwtMTIzLDE3LC02LDg2LC01OSwtOTQsMTEwLDczLDU3LC0xMTYsMTA3LC00MSwtOTQsLTExOCwtMTI2LDEwLC04MCwtNzAsMTAyLDg4LC0xMjYsODcsLTI3LC0xMDEsLTk0LC0zNSwtMTA2LC02LC03MiwtODYsNTAsMTE2LC0yOCw5MCwxMywtMTIwLDYsMjcsOTIsNTYsLTkwLDM5LDQ5LC0xMywtODYsLTI1LC04NiwxMTMsLTEzLDQxLC0xMTksOTQsLTk0LC0xMDMsLTgzLC02MCwxMjcsLTE1LC0zOSwxMTksLTk1LDI3LDQ0LDExNiwxMDksNywtMTAyLC0xNyw0OCwtODIsLTMxLC04LC02OSwzNSw5NCw1NCwtNTUsMSwtMTE5LDU3LC0xMDgsLTMsLTkxLC0xMjIsLTUzLC04OCw0LC05NywtMzUsMTI2LDExOSw1OSwtMSw4NSw3MywtNTgsLTEyMCwtNjQsMTE5LC0xMTIsOTIsMTksOSwtNjYsLTkyLDEwOCwtMTEsLTQyLDExMSwtMTA0LC0xMjAsMjcsLTEwMywtNjksMTksMTExLDEyLDIzLDEwNyw1NCw0MSwtMjYsNjAsLTMxLC01XSwibWF0ZXJpYWxTZXRTZXJpYWxOdW1iZXIiOjEsIml2UGFyYW1ldGVyU3BlYyI6eyJpdiI6Wy05NSwzMiwxMDgsOTEsMzUsLTgyLC0zNywyNCwtNDQsLTExNSwtODIsLTEyOCwtMTIyLDMsNTMsLTI0XX19",
- "eventLeftScope":false,
- "executionRoleArn":"arn:aws:iam::0123456789012:role/aws-service-role/config.amazonaws.com/AWSServiceRoleForConfig",
- "configRuleArn":"arn:aws:config:us-east-1:0123456789012:config-rule/config-rule-i9y8j9",
- "configRuleName":"MyRule",
- "configRuleId":"config-rule-i9y8j9",
- "accountId":"0123456789012",
- "evaluationMode":"DETECTIVE"
- }
diff --git a/examples/event_sources/src/aws_config_rule_oversized.json b/examples/event_sources/src/aws_config_rule_oversized.json
deleted file mode 100644
index 5eaef4e0015..00000000000
--- a/examples/event_sources/src/aws_config_rule_oversized.json
+++ /dev/null
@@ -1,12 +0,0 @@
-{
- "invokingEvent": "{\"configurationItemSummary\": {\"changeType\": \"UPDATE\",\"configurationItemVersion\": \"1.2\",\"configurationItemCaptureTime\":\"2016-10-06T16:46:16.261Z\",\"configurationStateId\": 0,\"awsAccountId\":\"123456789012\",\"configurationItemStatus\": \"OK\",\"resourceType\": \"AWS::EC2::Instance\",\"resourceId\":\"i-00000000\",\"resourceName\":null,\"ARN\":\"arn:aws:ec2:us-west-2:123456789012:instance/i-00000000\",\"awsRegion\": \"us-west-2\",\"availabilityZone\":\"us-west-2a\",\"configurationStateMd5Hash\":\"8f1ee69b287895a0f8bc5753eca68e96\",\"resourceCreationTime\":\"2016-10-06T16:46:10.489Z\"},\"messageType\":\"OversizedConfigurationItemChangeNotification\", \"notificationCreationTime\": \"2016-10-06T16:46:16.261Z\", \"recordVersion\": \"1.0\"}",
- "ruleParameters": "{\"myParameterKey\":\"myParameterValue\"}",
- "resultToken": "myResultToken",
- "eventLeftScope": false,
- "executionRoleArn": "arn:aws:iam::123456789012:role/config-role",
- "configRuleArn": "arn:aws:config:us-east-2:123456789012:config-rule/config-rule-ec2-managed-instance-inventory",
- "configRuleName": "change-triggered-config-rule",
- "configRuleId": "config-rule-0123456",
- "accountId": "123456789012",
- "version": "1.0"
-}
diff --git a/examples/event_sources/src/bedrock_agent_event.py b/examples/event_sources/src/bedrock_agent.py
similarity index 83%
rename from examples/event_sources/src/bedrock_agent_event.py
rename to examples/event_sources/src/bedrock_agent.py
index b16d3c86bad..31d5684fa08 100644
--- a/examples/event_sources/src/bedrock_agent_event.py
+++ b/examples/event_sources/src/bedrock_agent.py
@@ -1,12 +1,11 @@
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import BedrockAgentEvent, event_source
-from aws_lambda_powertools.utilities.typing import LambdaContext
logger = Logger()
@event_source(data_class=BedrockAgentEvent)
-def lambda_handler(event: BedrockAgentEvent, context: LambdaContext) -> dict:
+def lambda_handler(event: BedrockAgentEvent, context) -> dict:
input_text = event.input_text
logger.info(f"Bedrock Agent {event.action_group} invoked with input", input_text=input_text)
diff --git a/examples/event_sources/src/cloudWatchDashboard.py b/examples/event_sources/src/cloudWatchDashboard.py
new file mode 100644
index 00000000000..ff8b896a806
--- /dev/null
+++ b/examples/event_sources/src/cloudWatchDashboard.py
@@ -0,0 +1,31 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.utilities.data_classes import CloudWatchDashboardCustomWidgetEvent, event_source
+
+logger = Logger()
+
+
+@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
+def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):
+ if event.widget_context is None:
+ logger.warning("No widget context provided")
+ return {"title": "Error", "markdown": "Widget context is missing"}
+
+ logger.info(f"Processing custom widget for dashboard: {event.widget_context.dashboard_name}")
+
+ # Access specific event properties
+ widget_id = event.widget_context.widget_id
+ time_range = event.widget_context.time_range
+
+ if time_range is None:
+ logger.warning("No time range provided")
+ return {"title": f"Custom Widget {widget_id}", "markdown": "Time range is missing"}
+
+ # Your custom widget logic here
+ return {
+ "title": f"Custom Widget {widget_id}",
+ "markdown": f"""
+ Dashboard: {event.widget_context.dashboard_name}
+ Time Range: {time_range.start} to {time_range.end}
+ Theme: {event.widget_context.theme or 'default'}
+ """,
+ }
diff --git a/examples/event_sources/src/cloudformation_custom_resource_handler.py b/examples/event_sources/src/cloudformation_custom_resource_handler.py
index fa5b85d54df..87fa2bd1ab9 100644
--- a/examples/event_sources/src/cloudformation_custom_resource_handler.py
+++ b/examples/event_sources/src/cloudformation_custom_resource_handler.py
@@ -13,31 +13,15 @@ def lambda_handler(event: CloudFormationCustomResourceEvent, context: LambdaCont
request_type = event.request_type
if request_type == "Create":
- return on_create(event)
- if request_type == "Update":
- return on_update(event)
- if request_type == "Delete":
- return on_delete(event)
+ return on_create(event, context)
+ else:
+ raise ValueError(f"Invalid request type: {request_type}")
-def on_create(event: CloudFormationCustomResourceEvent):
+def on_create(event: CloudFormationCustomResourceEvent, context: LambdaContext):
props = event.resource_properties
logger.info(f"Create new resource with props {props}.")
- # Add your create code here ...
- physical_id = ...
+ physical_id = f"MyResource-{context.aws_request_id}"
- return {"PhysicalResourceId": physical_id}
-
-
-def on_update(event: CloudFormationCustomResourceEvent):
- physical_id = event.physical_resource_id
- props = event.resource_properties
- logger.info(f"Update resource {physical_id} with props {props}.")
- # ...
-
-
-def on_delete(event: CloudFormationCustomResourceEvent):
- physical_id = event.physical_resource_id
- logger.info(f"Delete resource {physical_id}.")
- # ...
+ return {"PhysicalResourceId": physical_id, "Data": {"Message": "Resource created successfully"}}
diff --git a/examples/event_sources/src/cloudwatch_logs.py b/examples/event_sources/src/cloudwatch_logs.py
new file mode 100644
index 00000000000..95890275595
--- /dev/null
+++ b/examples/event_sources/src/cloudwatch_logs.py
@@ -0,0 +1,18 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.utilities.data_classes import CloudWatchLogsEvent, event_source
+from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
+
+logger = Logger()
+
+
+@event_source(data_class=CloudWatchLogsEvent)
+def lambda_handler(event: CloudWatchLogsEvent, context):
+ decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data()
+
+ logger.info(f"Log group: {decompressed_log.log_group}")
+ logger.info(f"Log stream: {decompressed_log.log_stream}")
+
+ for log_event in decompressed_log.log_events:
+ logger.info(f"Timestamp: {log_event.timestamp}, Message: {log_event.message}")
+
+ return {"statusCode": 200, "body": f"Processed {len(decompressed_log.log_events)} log events"}
diff --git a/examples/event_sources/src/code_pipeline_job.py b/examples/event_sources/src/code_pipeline_job.py
new file mode 100644
index 00000000000..39db6e60b9e
--- /dev/null
+++ b/examples/event_sources/src/code_pipeline_job.py
@@ -0,0 +1,10 @@
+from aws_lambda_powertools.utilities.data_classes import CodePipelineJobEvent, event_source
+
+
+@event_source(data_class=CodePipelineJobEvent)
+def lambda_handler(event: CodePipelineJobEvent, context):
+ job_id = event.get_id
+
+ input_bucket = event.input_bucket_name
+
+ return {"statusCode": 200, "body": f"Processed job {job_id} from bucket {input_bucket}"}
diff --git a/examples/event_sources/src/codedeploy_lifecycle_hook.py b/examples/event_sources/src/codedeploy_lifecycle_hook.py
new file mode 100644
index 00000000000..6da54d185fc
--- /dev/null
+++ b/examples/event_sources/src/codedeploy_lifecycle_hook.py
@@ -0,0 +1,9 @@
+from aws_lambda_powertools.utilities.data_classes import CodeDeployLifecycleHookEvent, event_source
+
+
+@event_source(data_class=CodeDeployLifecycleHookEvent)
+def lambda_handler(event: CodeDeployLifecycleHookEvent, context):
+ deployment_id = event.deployment_id
+ lifecycle_event_hook_execution_id = event.lifecycle_event_hook_execution_id
+
+ return {"deployment_id": deployment_id, "lifecycle_event_hook_execution_id": lifecycle_event_hook_execution_id}
diff --git a/examples/event_sources/src/cognito_create_auth.py b/examples/event_sources/src/cognito_create_auth.py
new file mode 100644
index 00000000000..9f57743f053
--- /dev/null
+++ b/examples/event_sources/src/cognito_create_auth.py
@@ -0,0 +1,11 @@
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent
+
+
+@event_source(data_class=CreateAuthChallengeTriggerEvent)
+def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
+ if event.request.challenge_name == "CUSTOM_CHALLENGE":
+ event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
+ event.response.private_challenge_parameters = {"answer": "5"}
+ event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
+ return event.raw_event
diff --git a/examples/event_sources/src/cognito_define_auth.py b/examples/event_sources/src/cognito_define_auth.py
new file mode 100644
index 00000000000..2f7d197bb26
--- /dev/null
+++ b/examples/event_sources/src/cognito_define_auth.py
@@ -0,0 +1,30 @@
+from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent
+
+
+def lambda_handler(event, context) -> dict:
+ event_obj: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)
+
+ if len(event_obj.request.session) == 1 and event_obj.request.session[0].challenge_name == "SRP_A":
+ event_obj.response.issue_tokens = False
+ event_obj.response.fail_authentication = False
+ event_obj.response.challenge_name = "PASSWORD_VERIFIER"
+ elif (
+ len(event_obj.request.session) == 2
+ and event_obj.request.session[1].challenge_name == "PASSWORD_VERIFIER"
+ and event_obj.request.session[1].challenge_result
+ ):
+ event_obj.response.issue_tokens = False
+ event_obj.response.fail_authentication = False
+ event_obj.response.challenge_name = "CUSTOM_CHALLENGE"
+ elif (
+ len(event_obj.request.session) == 3
+ and event_obj.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
+ and event_obj.request.session[2].challenge_result
+ ):
+ event_obj.response.issue_tokens = True
+ event_obj.response.fail_authentication = False
+ else:
+ event_obj.response.issue_tokens = False
+ event_obj.response.fail_authentication = True
+
+ return event_obj.raw_event
diff --git a/examples/event_sources/src/cognito_post_confirmation.py b/examples/event_sources/src/cognito_post_confirmation.py
new file mode 100644
index 00000000000..51ecc2de43f
--- /dev/null
+++ b/examples/event_sources/src/cognito_post_confirmation.py
@@ -0,0 +1,9 @@
+from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent
+
+
+def lambda_handler(event, context):
+ event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)
+
+ user_attributes = event.request.user_attributes
+
+ return {"statusCode": 200, "body": f"User attributes: {user_attributes}"}
diff --git a/examples/event_sources/src/cognito_verify_auth.py b/examples/event_sources/src/cognito_verify_auth.py
new file mode 100644
index 00000000000..ae15942246e
--- /dev/null
+++ b/examples/event_sources/src/cognito_verify_auth.py
@@ -0,0 +1,10 @@
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent
+
+
+@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
+def lambda_handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
+ event.response.answer_correct = (
+ event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
+ )
+ return event.raw_event
diff --git a/examples/event_sources/src/connect_contact_flow.py b/examples/event_sources/src/connect_contact_flow.py
new file mode 100644
index 00000000000..53d120a4c4b
--- /dev/null
+++ b/examples/event_sources/src/connect_contact_flow.py
@@ -0,0 +1,14 @@
+from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
+ ConnectContactFlowChannel,
+ ConnectContactFlowEndpointType,
+ ConnectContactFlowEvent,
+ ConnectContactFlowInitiationMethod,
+)
+
+
+def lambda_handler(event, context):
+ event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
+ assert event.contact_data.attributes == {"Language": "en-US"}
+ assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
+ assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
+ assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API
diff --git a/examples/event_sources/src/dynamodb_multiple_records.py b/examples/event_sources/src/dynamodb_multiple_records.py
new file mode 100644
index 00000000000..8436dcfc827
--- /dev/null
+++ b/examples/event_sources/src/dynamodb_multiple_records.py
@@ -0,0 +1,13 @@
+from aws_lambda_powertools.utilities.data_classes import DynamoDBStreamEvent, event_source
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+
+@event_source(data_class=DynamoDBStreamEvent)
+def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
+ processed_keys = []
+ for record in event.records:
+ if record.dynamodb and record.dynamodb.keys and "Id" in record.dynamodb.keys:
+ key = record.dynamodb.keys["Id"]
+ processed_keys.append(key)
+
+ return {"statusCode": 200, "body": f"Processed keys: {processed_keys}"}
diff --git a/examples/event_sources/src/dynamodb_stream.py b/examples/event_sources/src/dynamodb_stream.py
new file mode 100644
index 00000000000..e317ddac8d4
--- /dev/null
+++ b/examples/event_sources/src/dynamodb_stream.py
@@ -0,0 +1,16 @@
+from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
+ DynamoDBRecordEventName,
+ DynamoDBStreamEvent,
+)
+
+
+def lambda_handler(event, context):
+ event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)
+
+ # Multiple records can be delivered in a single event
+ for record in event.records:
+        if record.event_name == DynamoDBRecordEventName.MODIFY:
+            pass  # Handle updated items
+        elif record.event_name == DynamoDBRecordEventName.INSERT:
+            pass  # Handle newly inserted items
+ return "success"
diff --git a/examples/event_sources/src/eventBridgeEvent.py b/examples/event_sources/src/eventBridgeEvent.py
new file mode 100644
index 00000000000..5bd9c165824
--- /dev/null
+++ b/examples/event_sources/src/eventBridgeEvent.py
@@ -0,0 +1,11 @@
+from aws_lambda_powertools.utilities.data_classes import EventBridgeEvent, event_source
+
+
+@event_source(data_class=EventBridgeEvent)
+def lambda_handler(event: EventBridgeEvent, context):
+ detail_type = event.detail_type
+ state = event.detail.get("state")
+
+ # Do something
+
+ return {"detail_type": detail_type, "state": state}
diff --git a/examples/event_sources/src/getting_started_data_classes.py b/examples/event_sources/src/getting_started_data_classes.py
new file mode 100644
index 00000000000..64119fc4c0f
--- /dev/null
+++ b/examples/event_sources/src/getting_started_data_classes.py
@@ -0,0 +1,9 @@
+from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
+
+
+def lambda_handler(event: dict, context):
+ api_event = APIGatewayProxyEvent(event)
+ if "hello" in api_event.path and api_event.http_method == "GET":
+ return {"statusCode": 200, "body": f"Hello from path: {api_event.path}"}
+ else:
+ return {"statusCode": 400, "body": "No Hello from path"}
diff --git a/examples/event_sources/src/kafka_event.py b/examples/event_sources/src/kafka_event.py
new file mode 100644
index 00000000000..c6f62e243eb
--- /dev/null
+++ b/examples/event_sources/src/kafka_event.py
@@ -0,0 +1,12 @@
+from aws_lambda_powertools.utilities.data_classes import KafkaEvent, event_source
+
+
+def do_something_with(key: str, value: str):
+ print(f"key: {key}, value: {value}")
+
+
+@event_source(data_class=KafkaEvent)
+def lambda_handler(event: KafkaEvent, context):
+ for record in event.records:
+ do_something_with(record.topic, record.value)
+ return "success"
diff --git a/examples/event_sources/src/kinesisStreamCloudWatchLogs.py b/examples/event_sources/src/kinesisStreamCloudWatchLogs.py
new file mode 100644
index 00000000000..fa6fccf2b17
--- /dev/null
+++ b/examples/event_sources/src/kinesisStreamCloudWatchLogs.py
@@ -0,0 +1,17 @@
+from typing import List
+
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
+from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
+ KinesisStreamEvent,
+ extract_cloudwatch_logs_from_event,
+)
+
+
+@event_source(data_class=KinesisStreamEvent)
+def lambda_handler(event: KinesisStreamEvent, context):
+ logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
+ for log in logs:
+ if log.message_type == "DATA_MESSAGE":
+ return "success"
+ return "nothing to be processed"
diff --git a/examples/event_sources/src/kinesis_batch_example.py b/examples/event_sources/src/kinesis_batch_example.py
new file mode 100644
index 00000000000..0a7366fdd8b
--- /dev/null
+++ b/examples/event_sources/src/kinesis_batch_example.py
@@ -0,0 +1,29 @@
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.utilities.batch import (
+ BatchProcessor,
+ EventType,
+ process_partial_response,
+)
+from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
+ KinesisStreamRecord,
+ extract_cloudwatch_logs_from_record,
+)
+
+logger = Logger()
+
+processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
+
+
+def record_handler(record: KinesisStreamRecord):
+ log = extract_cloudwatch_logs_from_record(record)
+ logger.info(f"Message type: {log.message_type}")
+ return log.message_type == "DATA_MESSAGE"
+
+
+def lambda_handler(event, context):
+ return process_partial_response(
+ event=event,
+ record_handler=record_handler,
+ processor=processor,
+ context=context,
+ )
diff --git a/examples/event_sources/src/kinesis_streams.py b/examples/event_sources/src/kinesis_streams.py
new file mode 100644
index 00000000000..630190c5807
--- /dev/null
+++ b/examples/event_sources/src/kinesis_streams.py
@@ -0,0 +1,40 @@
+import json
+from typing import Any, Dict, Union
+
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.utilities.data_classes import KinesisStreamEvent, event_source
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+logger = Logger()
+
+
+@event_source(data_class=KinesisStreamEvent)
+def lambda_handler(event: KinesisStreamEvent, context: LambdaContext):
+ for record in event.records:
+ kinesis_record = record.kinesis
+
+ payload: Union[Dict[str, Any], str]
+
+ try:
+ # Try to parse as JSON first
+ payload = kinesis_record.data_as_json()
+ logger.info("Received JSON data from Kinesis")
+ except json.JSONDecodeError:
+ # If JSON parsing fails, get as text
+ payload = kinesis_record.data_as_text()
+ logger.info("Received text data from Kinesis")
+
+ process_data(payload)
+
+ return {"statusCode": 200, "body": "Processed all records successfully"}
+
+
+def process_data(data: Union[Dict[str, Any], str]) -> None:
+ if isinstance(data, dict):
+ # Handle JSON data
+ logger.info(f"Processing JSON data: {data}")
+ # Add your JSON processing logic here
+ else:
+ # Handle text data
+ logger.info(f"Processing text data: {data}")
+ # Add your text processing logic here
diff --git a/examples/event_sources/src/lambdaFunctionUrl.py b/examples/event_sources/src/lambdaFunctionUrl.py
new file mode 100644
index 00000000000..f518d825680
--- /dev/null
+++ b/examples/event_sources/src/lambdaFunctionUrl.py
@@ -0,0 +1,7 @@
+from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent, event_source
+
+
+@event_source(data_class=LambdaFunctionUrlEvent)
+def lambda_handler(event: LambdaFunctionUrlEvent, context):
+ if event.request_context.http.method == "GET":
+ return {"statusCode": 200, "body": "Hello World!"}
diff --git a/examples/event_sources/src/rabbit_mq_example.py b/examples/event_sources/src/rabbit_mq_example.py
new file mode 100644
index 00000000000..998f012fdba
--- /dev/null
+++ b/examples/event_sources/src/rabbit_mq_example.py
@@ -0,0 +1,21 @@
+from typing import Dict
+
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.utilities.data_classes import event_source
+from aws_lambda_powertools.utilities.data_classes.rabbit_mq_event import RabbitMQEvent
+
+logger = Logger()
+
+
+@event_source(data_class=RabbitMQEvent)
+def lambda_handler(event: RabbitMQEvent, context):
+ for queue_name, messages in event.rmq_messages_by_queue.items():
+ logger.debug(f"Messages for queue: {queue_name}")
+ for message in messages:
+ logger.debug(f"MessageID: {message.basic_properties.message_id}")
+ data: Dict = message.json_data
+            logger.debug(f"Processing JSON decoded from base64-encoded data: {data}")
+ return {
+ "queue_name": queue_name,
+ "message_id": message.basic_properties.message_id,
+ }
diff --git a/examples/event_sources/src/s3Event.py b/examples/event_sources/src/s3Event.py
new file mode 100644
index 00000000000..2307bdfc5e0
--- /dev/null
+++ b/examples/event_sources/src/s3Event.py
@@ -0,0 +1,18 @@
+from urllib.parse import unquote_plus
+
+from aws_lambda_powertools.utilities.data_classes import S3Event, event_source
+
+
+@event_source(data_class=S3Event)
+def lambda_handler(event: S3Event, context):
+ bucket_name = event.bucket_name
+
+ # Multiple records can be delivered in a single event
+ for record in event.records:
+ object_key = unquote_plus(record.s3.get_object.key)
+ object_etag = record.s3.get_object.etag
+ return {
+ "bucket": bucket_name,
+ "object_key": object_key,
+ "object_etag": object_etag,
+ }
diff --git a/examples/event_sources/src/s3_event_bridge.py b/examples/event_sources/src/s3_event_bridge.py
new file mode 100644
index 00000000000..425c144bfd8
--- /dev/null
+++ b/examples/event_sources/src/s3_event_bridge.py
@@ -0,0 +1,13 @@
+from aws_lambda_powertools.utilities.data_classes import S3EventBridgeNotificationEvent, event_source
+
+
+@event_source(data_class=S3EventBridgeNotificationEvent)
+def lambda_handler(event: S3EventBridgeNotificationEvent, context):
+ bucket_name = event.detail.bucket.name
+ file_key = event.detail.object.key
+ if event.detail_type == "Object Created":
+ print(f"Object {file_key} created in bucket {bucket_name}")
+ return {
+ "bucket": bucket_name,
+ "file_key": file_key,
+ }
diff --git a/examples/event_sources/src/s3_object_lambda.py b/examples/event_sources/src/s3_object_lambda.py
new file mode 100644
index 00000000000..11e20287191
--- /dev/null
+++ b/examples/event_sources/src/s3_object_lambda.py
@@ -0,0 +1,31 @@
+import boto3
+import requests
+
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.logging.correlation_paths import S3_OBJECT_LAMBDA
+from aws_lambda_powertools.utilities.data_classes.s3_object_event import S3ObjectLambdaEvent
+
+logger = Logger()
+session = boto3.session.Session()
+s3 = session.client("s3")
+
+
+@logger.inject_lambda_context(correlation_id_path=S3_OBJECT_LAMBDA, log_event=True)
+def lambda_handler(event, context):
+ event = S3ObjectLambdaEvent(event)
+
+ # Get object from S3
+ response = requests.get(event.input_s3_url)
+ original_object = response.content.decode("utf-8")
+
+ # Make changes to the object about to be returned
+ transformed_object = original_object.upper()
+
+ # Write object back to S3 Object Lambda
+ s3.write_get_object_response(
+ Body=transformed_object,
+ RequestRoute=event.request_route,
+ RequestToken=event.request_token,
+ )
+
+ return {"status_code": 200}
diff --git a/examples/event_sources/src/secrets_manager_event.json b/examples/event_sources/src/secrets_manager_event.json
deleted file mode 100644
index 18e7dcd935b..00000000000
--- a/examples/event_sources/src/secrets_manager_event.json
+++ /dev/null
@@ -1,5 +0,0 @@
-{
- "SecretId":"arn:aws:secretsmanager:us-west-2:123456789012:secret:MyTestDatabaseSecret-a1b2c3",
- "ClientRequestToken":"550e8400-e29b-41d4-a716-446655440000",
- "Step":"createSecret"
-}
diff --git a/examples/event_sources/src/ses_event.py b/examples/event_sources/src/ses_event.py
new file mode 100644
index 00000000000..690bfd2f7bc
--- /dev/null
+++ b/examples/event_sources/src/ses_event.py
@@ -0,0 +1,13 @@
+from aws_lambda_powertools.utilities.data_classes import SESEvent, event_source
+
+
+@event_source(data_class=SESEvent)
+def lambda_handler(event: SESEvent, context):
+ # Multiple records can be delivered in a single event
+ for record in event.records:
+ mail = record.ses.mail
+ common_headers = mail.common_headers
+    return {
+        "mail": mail.raw_event,  # return the underlying dict; the wrapper object is not JSON-serializable
+        "common_headers": common_headers.raw_event,
+    }
diff --git a/examples/event_sources/src/sns_event.py b/examples/event_sources/src/sns_event.py
new file mode 100644
index 00000000000..a45e02b1e24
--- /dev/null
+++ b/examples/event_sources/src/sns_event.py
@@ -0,0 +1,13 @@
+from aws_lambda_powertools.utilities.data_classes import SNSEvent, event_source
+
+
+@event_source(data_class=SNSEvent)
+def lambda_handler(event: SNSEvent, context):
+ # Multiple records can be delivered in a single event
+ for record in event.records:
+ message = record.sns.message
+ subject = record.sns.subject
+ return {
+ "message": message,
+ "subject": subject,
+ }
diff --git a/examples/event_sources/src/sqs_event.py b/examples/event_sources/src/sqs_event.py
new file mode 100644
index 00000000000..b76b1bfd360
--- /dev/null
+++ b/examples/event_sources/src/sqs_event.py
@@ -0,0 +1,13 @@
+from aws_lambda_powertools.utilities.data_classes import SQSEvent, event_source
+
+
+@event_source(data_class=SQSEvent)
+def lambda_handler(event: SQSEvent, context):
+ # Multiple records can be delivered in a single event
+ for record in event.records:
+ message = record.body
+ message_id = record.message_id
+ return {
+ "message": message,
+ "message_id": message_id,
+ }