-
Notifications
You must be signed in to change notification settings - Fork 420
/
Copy pathdata_classes.md
1287 lines (964 loc) · 48.5 KB
/
data_classes.md
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
---
title: Event Source Data Classes
description: Utility
---
<!-- markdownlint-disable MD043 -->
Event Source Data Classes utility provides classes self-describing Lambda event sources.
## Key features
* Type hinting and code completion for common event types
* Helper functions for decoding/deserializing nested fields
* Docstrings for fields contained in event schemas
**Background**
When authoring Lambda functions, you often need to understand the schema of the event dictionary which is passed to the
handler. There are several common event types which follow a specific schema, depending on the service triggering the
Lambda function.
## Getting started
### Utilizing the data classes
The classes are initialized by passing in the Lambda event object into the constructor of the appropriate data class or
by using the `event_source` decorator.
For example, if your Lambda function is being triggered by an API Gateway proxy integration, you can use the
`APIGatewayProxyEvent` class.
=== "app.py"
```python hl_lines="1 4"
from aws_lambda_powertools.utilities.data_classes import APIGatewayProxyEvent
def lambda_handler(event: dict, context):
event = APIGatewayProxyEvent(event)
if 'helloworld' in event.path and event.http_method == 'GET':
do_something_with(event.body, user)
```
Same example as above, but using the `event_source` decorator
=== "app.py"
```python hl_lines="1 3"
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
if 'helloworld' in event.path and event.http_method == 'GET':
do_something_with(event.body, user)
```
Log Data Event for Troubleshooting
=== "app.py"
```python hl_lines="4 8"
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
from aws_lambda_powertools.logging.logger import Logger
logger = Logger(service="hello_logs", level="DEBUG")
@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
logger.debug(event)
```
**Autocomplete with self-documented properties and methods**

## Supported event sources
| Event Source | Data_class |
|-------------------------------------------------------------------------------|----------------------------------------------------|
| [Active MQ](#active-mq) | `ActiveMQEvent` |
| [API Gateway Authorizer](#api-gateway-authorizer) | `APIGatewayAuthorizerRequestEvent` |
| [API Gateway Authorizer V2](#api-gateway-authorizer-v2) | `APIGatewayAuthorizerEventV2` |
| [API Gateway Proxy](#api-gateway-proxy) | `APIGatewayProxyEvent` |
| [API Gateway Proxy V2](#api-gateway-proxy-v2) | `APIGatewayProxyEventV2` |
| [Application Load Balancer](#application-load-balancer) | `ALBEvent` |
| [AppSync Authorizer](#appsync-authorizer) | `AppSyncAuthorizerEvent` |
| [AppSync Resolver](#appsync-resolver) | `AppSyncResolverEvent` |
| [AWS Config Rule](#aws-config-rule) | `AWSConfigRuleEvent` |
| [Bedrock Agent](#bedrock-agent) | `BedrockAgent` |
| [CloudFormation Custom Resource](#cloudformation-custom-resource) | `CloudFormationCustomResourceEvent` |
| [CloudWatch Alarm State Change Action](#cloudwatch-alarm-state-change-action) | `CloudWatchAlarmEvent` |
| [CloudWatch Dashboard Custom Widget](#cloudwatch-dashboard-custom-widget) | `CloudWatchDashboardCustomWidgetEvent` |
| [CloudWatch Logs](#cloudwatch-logs) | `CloudWatchLogsEvent` |
| [CodePipeline Job Event](#codepipeline-job) | `CodePipelineJobEvent` |
| [Cognito User Pool](#cognito-user-pool) | Multiple available under `cognito_user_pool_event` |
| [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent` |
| [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName` |
| [EventBridge](#eventbridge) | `EventBridgeEvent` |
| [Kafka](#kafka) | `KafkaEvent` |
| [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent` |
| [Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent` |
| [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent` |
| [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent` |
| [S3](#s3) | `S3Event` |
| [S3 Batch Operations](#s3-batch-operations) | `S3BatchOperationEvent` |
| [S3 Object Lambda](#s3-object-lambda) | `S3ObjectLambdaEvent` |
| [S3 EventBridge Notification](#s3-eventbridge-notification) | `S3EventBridgeNotificationEvent` |
| [SES](#ses) | `SESEvent` |
| [SNS](#sns) | `SNSEvent` |
| [SQS](#sqs) | `SQSEvent` |
| [VPC Lattice V2](#vpc-lattice-v2) | `VPCLatticeV2Event` |
| [VPC Lattice V1](#vpc-lattice-v1) | `VPCLatticeEvent` |
???+ info
The examples provided below are far from exhaustive - the data classes themselves are designed to provide a form of
documentation inherently (via autocompletion, types and docstrings).
### Active MQ
It is used for [Active MQ payloads](https://docs.aws.amazon.com/lambda/latest/dg/with-mq.html){target="_blank"}, also see
the [AWS blog post](https://aws.amazon.com/blogs/compute/using-amazon-mq-as-an-event-source-for-aws-lambda/){target="_blank"}
for more details.
=== "app.py"
```python hl_lines="4-5 9-10"
from typing import Dict
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.active_mq_event import ActiveMQEvent
logger = Logger()
@event_source(data_class=ActiveMQEvent)
def lambda_handler(event: ActiveMQEvent, context):
for message in event.messages:
logger.debug(f"MessageID: {message.message_id}")
data: Dict = message.json_data
logger.debug("Process json in base64 encoded data str", data)
```
### API Gateway Authorizer
> New in 1.20.0
It is used for [API Gateway Rest API Lambda Authorizer payload](https://docs.aws.amazon.com/apigateway/latest/developerguide/apigateway-use-lambda-authorizer.html){target="_blank"}.
Use **`APIGatewayAuthorizerRequestEvent`** for type `REQUEST` and **`APIGatewayAuthorizerTokenEvent`** for type `TOKEN`.
=== "app_type_request.py"
This example uses the `APIGatewayAuthorizerResponse` to decline a given request if the user is not found.
When the user is found, it includes the user details in the request context that will be available to the back-end, and returns a full access policy for admin users.
```python hl_lines="2-6 29 36-42 47 49"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
DENY_ALL_RESPONSE,
APIGatewayAuthorizerRequestEvent,
APIGatewayAuthorizerResponse,
HttpVerb,
)
from secrets import compare_digest
def get_user_by_token(token):
if compare_digest(token, "admin-foo"):
return {"id": 0, "name": "Admin", "isAdmin": True}
elif compare_digest(token, "regular-foo"):
return {"id": 1, "name": "Joe"}
else:
return None
@event_source(data_class=APIGatewayAuthorizerRequestEvent)
def handler(event: APIGatewayAuthorizerRequestEvent, context):
user = get_user_by_token(event.get_header_value("Authorization"))
if user is None:
# No user was found
# to return 401 - `{"message":"Unauthorized"}`, but pollutes lambda error count metrics
# raise Exception("Unauthorized")
# to return 403 - `{"message":"Forbidden"}`
return DENY_ALL_RESPONSE
# parse the `methodArn` as an `APIGatewayRouteArn`
arn = event.parsed_arn
# Create the response builder from parts of the `methodArn`
# and set the logged in user id and context
policy = APIGatewayAuthorizerResponse(
principal_id=user["id"],
context=user,
region=arn.region,
aws_account_id=arn.aws_account_id,
api_id=arn.api_id,
stage=arn.stage,
)
# Conditional IAM Policy
if user.get("isAdmin", False):
policy.allow_all_routes()
else:
policy.allow_route(HttpVerb.GET.value, "/user-profile")
return policy.asdict()
```
=== "app_type_token.py"
```python hl_lines="2-5 12-18 21 23-24"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
APIGatewayAuthorizerTokenEvent,
APIGatewayAuthorizerResponse,
)
@event_source(data_class=APIGatewayAuthorizerTokenEvent)
def handler(event: APIGatewayAuthorizerTokenEvent, context):
arn = event.parsed_arn
policy = APIGatewayAuthorizerResponse(
principal_id="user",
region=arn.region,
aws_account_id=arn.aws_account_id,
api_id=arn.api_id,
stage=arn.stage
)
if event.authorization_token == "42":
policy.allow_all_routes()
else:
policy.deny_all_routes()
return policy.asdict()
```
### API Gateway Authorizer V2
> New in 1.20.0
It is used for [API Gateway HTTP API Lambda Authorizer payload version 2](https://docs.aws.amazon.com/apigateway/latest/developerguide/http-api-lambda-authorizer.html){target="_blank"}.
See also [this blog post](https://aws.amazon.com/blogs/compute/introducing-iam-and-lambda-authorizers-for-amazon-api-gateway-http-apis/){target="_blank"} for more details.
=== "app.py"
This example looks up user details via `x-token` header. It uses `APIGatewayAuthorizerResponseV2` to return a deny policy when user is not found or authorized.
```python hl_lines="2-5 21 24"
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.api_gateway_authorizer_event import (
APIGatewayAuthorizerEventV2,
APIGatewayAuthorizerResponseV2,
)
from secrets import compare_digest
def get_user_by_token(token):
if compare_digest(token, "Foo"):
return {"name": "Foo"}
return None
@event_source(data_class=APIGatewayAuthorizerEventV2)
def handler(event: APIGatewayAuthorizerEventV2, context):
user = get_user_by_token(event.get_header_value("x-token"))
if user is None:
# No user was found, so we return not authorized
return APIGatewayAuthorizerResponseV2().asdict()
# Found the user and setting the details in the context
return APIGatewayAuthorizerResponseV2(authorize=True, context=user).asdict()
```
### API Gateway Proxy
It is used for either API Gateway REST API or HTTP API using v1 proxy event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEvent
@event_source(data_class=APIGatewayProxyEvent)
def lambda_handler(event: APIGatewayProxyEvent, context):
if "helloworld" in event.path and event.http_method == "GET":
request_context = event.request_context
identity = request_context.identity
user = identity.user
do_something_with(event.json_body, user)
```
### API Gateway Proxy V2
It is used for HTTP API using v2 proxy event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, APIGatewayProxyEventV2
@event_source(data_class=APIGatewayProxyEventV2)
def lambda_handler(event: APIGatewayProxyEventV2, context):
if "helloworld" in event.path and event.http_method == "POST":
do_something_with(event.json_body, event.query_string_parameters)
```
### Application Load Balancer
Is it used for Application load balancer event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, ALBEvent
@event_source(data_class=ALBEvent)
def lambda_handler(event: ALBEvent, context):
if "helloworld" in event.path and event.http_method == "POST":
do_something_with(event.json_body, event.query_string_parameters)
```
### AppSync Authorizer
> New in 1.20.0
Used when building an [AWS_LAMBDA Authorization](https://docs.aws.amazon.com/appsync/latest/devguide/security-authz.html#aws-lambda-authorization){target="_blank"} with AppSync.
See blog post [Introducing Lambda authorization for AWS AppSync GraphQL APIs](https://aws.amazon.com/blogs/mobile/appsync-lambda-auth/){target="_blank"}
or read the Amplify documentation on using [AWS Lambda for authorization](https://docs.amplify.aws/lib/graphqlapi/authz/q/platform/js#aws-lambda){target="_blank"} with AppSync.
In this example extract the `requestId` as the `correlation_id` for logging, used `@event_source` decorator and builds the AppSync authorizer using the `AppSyncAuthorizerResponse` helper.
=== "app.py"
```python
from typing import Dict
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.logging.logger import Logger
from aws_lambda_powertools.utilities.data_classes.appsync_authorizer_event import (
AppSyncAuthorizerEvent,
AppSyncAuthorizerResponse,
)
from aws_lambda_powertools.utilities.data_classes.event_source import event_source
logger = Logger()
def get_user_by_token(token: str):
"""Look a user by token"""
...
@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_AUTHORIZER)
@event_source(data_class=AppSyncAuthorizerEvent)
def lambda_handler(event: AppSyncAuthorizerEvent, context) -> Dict:
user = get_user_by_token(event.authorization_token)
if not user:
# No user found, return not authorized
return AppSyncAuthorizerResponse().asdict()
return AppSyncAuthorizerResponse(
authorize=True,
resolver_context={"id": user.id},
# Only allow admins to delete events
deny_fields=None if user.is_admin else ["Mutation.deleteEvent"],
).asdict()
```
### AppSync Resolver
> New in 1.12.0
Used when building Lambda GraphQL Resolvers with [Amplify GraphQL Transform Library](https://docs.amplify.aws/cli/graphql-transformer/function){target="_blank"} (`@function`),
and [AppSync Direct Lambda Resolvers](https://aws.amazon.com/blogs/mobile/appsync-direct-lambda/){target="_blank"}.
In this example, we also use the new Logger `correlation_id` and built-in `correlation_paths` to extract, if available, X-Ray Trace ID in AppSync request headers:
=== "app.py"
```python hl_lines="2-5 12 14 19 21 29-30"
from aws_lambda_powertools.logging import Logger, correlation_paths
from aws_lambda_powertools.utilities.data_classes.appsync_resolver_event import (
AppSyncResolverEvent,
AppSyncIdentityCognito
)
logger = Logger()
def get_locations(name: str = None, size: int = 0, page: int = 0):
"""Your resolver logic here"""
@logger.inject_lambda_context(correlation_id_path=correlation_paths.APPSYNC_RESOLVER)
def lambda_handler(event, context):
event: AppSyncResolverEvent = AppSyncResolverEvent(event)
# Case insensitive look up of request headers
x_forwarded_for = event.get_header_value("x-forwarded-for")
# Support for AppSyncIdentityCognito or AppSyncIdentityIAM identity types
assert isinstance(event.identity, AppSyncIdentityCognito)
identity: AppSyncIdentityCognito = event.identity
# Logging with correlation_id
logger.debug({
"x-forwarded-for": x_forwarded_for,
"username": identity.username
})
if event.type_name == "Merchant" and event.field_name == "locations":
return get_locations(**event.arguments)
raise ValueError(f"Unsupported field resolver: {event.field_name}")
```
=== "Example AppSync Event"
```json hl_lines="2-8 14 19 20"
{
"typeName": "Merchant",
"fieldName": "locations",
"arguments": {
"page": 2,
"size": 1,
"name": "value"
},
"identity": {
"claims": {
"iat": 1615366261
...
},
"username": "mike",
...
},
"request": {
"headers": {
"x-amzn-trace-id": "Root=1-60488877-0b0c4e6727ab2a1c545babd0",
"x-forwarded-for": "127.0.0.1"
...
}
},
...
}
```
=== "Example CloudWatch Log"
```json hl_lines="5 6 16"
{
"level":"DEBUG",
"location":"lambda_handler:22",
"message":{
"x-forwarded-for":"127.0.0.1",
"username":"mike"
},
"timestamp":"2021-03-10 12:38:40,062",
"service":"service_undefined",
"sampling_rate":0.0,
"cold_start":true,
"function_name":"func_name",
"function_memory_size":512,
"function_arn":"func_arn",
"function_request_id":"6735a29c-c000-4ae3-94e6-1f1c934f7f94",
"correlation_id":"Root=1-60488877-0b0c4e6727ab2a1c545babd0"
}
```
### AWS Config Rule
=== "aws_config_rule.py"
```python hl_lines="3 11"
--8<-- "examples/event_sources/src/aws_config_rule.py"
```
=== "Event - ItemChanged"
```json
--8<-- "examples/event_sources/src/aws_config_rule_item_changed.json"
```
=== "Event - Oversized"
```json
--8<-- "examples/event_sources/src/aws_config_rule_oversized.json"
```
=== "Event - ScheduledNotification"
```json
--8<-- "examples/event_sources/src/aws_config_rule_scheduled.json"
```
### Bedrock Agent
=== "app.py"
```python hl_lines="2 8 10"
--8<-- "examples/event_sources/src/bedrock_agent_event.py"
```
### CloudFormation Custom Resource
=== "app.py"
```python hl_lines="11 13 15 17 19"
--8<-- "examples/event_sources/src/cloudformation_custom_resource_handler.py"
```
### CloudWatch Dashboard Custom Widget
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchDashboardCustomWidgetEvent
const DOCS = `
## Echo
A simple echo script. Anything passed in \`\`\`echo\`\`\` parameter is returned as the content of custom widget.
### Widget parameters
| Param | Description |
| -------- | ------------------------ |
| **echo** | The content to echo back |
### Example parameters
\`\`\` yaml
echo: <h1>Hello world</h1>
\`\`\`
`
@event_source(data_class=CloudWatchDashboardCustomWidgetEvent)
def lambda_handler(event: CloudWatchDashboardCustomWidgetEvent, context):
if event.describe:
return DOCS
# You can directly return HTML or JSON content
# Alternatively, you can return markdown that will be rendered by CloudWatch
echo = event.widget_context.params["echo"]
return { "markdown": f"# {echo}" }
```
### CloudWatch Alarm State Change Action
[CloudWatch supports Lambda as an alarm state change action](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/AlarmThatSendsEmail.html#alarms-and-actions){target="_blank"}.
You can use the `CloudWathAlarmEvent` data class to access the fields containing such data as alarm information, current state, and previous state.
=== "app.py"
```python hl_lines="2 8"
--8<-- "examples/event_sources/src/cloudwatch_alarm_event.py"
```
### CloudWatch Logs
CloudWatch Logs events by default are compressed and base64 encoded. You can use the helper function provided to decode,
decompress and parse json data from the event.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, CloudWatchLogsEvent
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
@event_source(data_class=CloudWatchLogsEvent)
def lambda_handler(event: CloudWatchLogsEvent, context):
decompressed_log: CloudWatchLogsDecodedData = event.parse_logs_data()
log_events = decompressed_log.log_events
for event in log_events:
do_something_with(event.timestamp, event.message)
```
#### Kinesis integration
[When streaming CloudWatch Logs to a Kinesis Data Stream](https://aws.amazon.com/premiumsupport/knowledge-center/streaming-cloudwatch-logs/){target="_blank"} (cross-account or not), you can use `extract_cloudwatch_logs_from_event` to decode, decompress and extract logs as `CloudWatchLogsDecodedData` to ease log processing.
=== "app.py"
```python hl_lines="5-6 11"
from typing import List
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import CloudWatchLogsDecodedData
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
KinesisStreamEvent, extract_cloudwatch_logs_from_event)
@event_source(data_class=KinesisStreamEvent)
def simple_handler(event: KinesisStreamEvent, context):
logs: List[CloudWatchLogsDecodedData] = extract_cloudwatch_logs_from_event(event)
for log in logs:
if log.message_type == "DATA_MESSAGE":
return "success"
return "nothing to be processed"
```
Alternatively, you can use `extract_cloudwatch_logs_from_record` to seamless integrate with the [Batch utility](./batch.md){target="_blank"} for more robust log processing.
=== "app.py"
```python hl_lines="3-4 10"
from aws_lambda_powertools.utilities.batch import (BatchProcessor, EventType,
batch_processor)
from aws_lambda_powertools.utilities.data_classes.kinesis_stream_event import (
KinesisStreamRecord, extract_cloudwatch_logs_from_record)
processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
def record_handler(record: KinesisStreamRecord):
log = extract_cloudwatch_logs_from_record(record)
return log.message_type == "DATA_MESSAGE"
@batch_processor(record_handler=record_handler, processor=processor)
def lambda_handler(event, context):
return processor.response()
```
### CodePipeline Job
Data classes and utility functions to help create continuous delivery pipelines tasks with AWS Lambda
=== "app.py"
```python
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.data_classes import event_source, CodePipelineJobEvent
logger = Logger()
@event_source(data_class=CodePipelineJobEvent)
def lambda_handler(event, context):
"""The Lambda function handler
If a continuing job then checks the CloudFormation stack status
and updates the job accordingly.
If a new job then kick of an update or creation of the target
CloudFormation stack.
"""
# Extract the Job ID
job_id = event.get_id
# Extract the params
params: dict = event.decoded_user_parameters
stack = params["stack"]
artifact_name = params["artifact"]
template_file = params["file"]
try:
if event.data.continuation_token:
# If we're continuing then the create/update has already been triggered
# we just need to check if it has finished.
check_stack_update_status(job_id, stack)
else:
template = event.get_artifact(artifact_name, template_file)
# Kick off a stack update or create
start_update_or_create(job_id, stack, template)
except Exception as e:
# If any other exceptions which we didn't expect are raised
# then fail the job and log the exception message.
logger.exception("Function failed due to exception.")
put_job_failure(job_id, "Function exception: " + str(e))
logger.debug("Function complete.")
return "Complete."
```
### Cognito User Pool
Cognito User Pools have several [different Lambda trigger sources](https://docs.aws.amazon.com/cognito/latest/developerguide/cognito-user-identity-pools-working-with-aws-lambda-triggers.html#cognito-user-identity-pools-working-with-aws-lambda-trigger-sources){target="_blank"}, all of which map to a different data class, which
can be imported from `aws_lambda_powertools.data_classes.cognito_user_pool_event`:
| Trigger/Event Source | Data Class |
| --------------------- | ------------------------------------------------------------------------------ |
| Custom message event | `data_classes.cognito_user_pool_event.CustomMessageTriggerEvent` |
| Post authentication | `data_classes.cognito_user_pool_event.PostAuthenticationTriggerEvent` |
| Post confirmation | `data_classes.cognito_user_pool_event.PostConfirmationTriggerEvent` |
| Pre authentication | `data_classes.cognito_user_pool_event.PreAuthenticationTriggerEvent` |
| Pre sign-up | `data_classes.cognito_user_pool_event.PreSignUpTriggerEvent` |
| Pre token generation | `data_classes.cognito_user_pool_event.PreTokenGenerationTriggerEvent` |
| Pre token generation V2 | `data_classes.cognito_user_pool_event.PreTokenGenerationV2TriggerEvent` |
| User migration | `data_classes.cognito_user_pool_event.UserMigrationTriggerEvent` |
| Define Auth Challenge | `data_classes.cognito_user_pool_event.DefineAuthChallengeTriggerEvent` |
| Create Auth Challenge | `data_classes.cognito_user_pool_event.CreateAuthChallengeTriggerEvent` |
| Verify Auth Challenge | `data_classes.cognito_user_pool_event.VerifyAuthChallengeResponseTriggerEvent` |
| Custom Email Sender | `data_classes.cognito_user_pool_event.CustomEmailSenderTriggerEvent` |
| Custom SMS Sender | `data_classes.cognito_user_pool_event.CustomSMSSenderTriggerEvent` |
#### Post Confirmation Example
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import PostConfirmationTriggerEvent
def lambda_handler(event, context):
event: PostConfirmationTriggerEvent = PostConfirmationTriggerEvent(event)
user_attributes = event.request.user_attributes
do_something_with(user_attributes)
```
#### Define Auth Challenge Example
???+ note
In this example we are modifying the wrapped dict response fields, so we need to return the json serializable wrapped event in `event.raw_event`.
This example is based on the AWS Cognito docs for [Define Auth Challenge Lambda Trigger](https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-lambda-define-auth-challenge.html){target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import DefineAuthChallengeTriggerEvent
def handler(event: dict, context) -> dict:
event: DefineAuthChallengeTriggerEvent = DefineAuthChallengeTriggerEvent(event)
if (
len(event.request.session) == 1
and event.request.session[0].challenge_name == "SRP_A"
):
event.response.issue_tokens = False
event.response.fail_authentication = False
event.response.challenge_name = "PASSWORD_VERIFIER"
elif (
len(event.request.session) == 2
and event.request.session[1].challenge_name == "PASSWORD_VERIFIER"
and event.request.session[1].challenge_result
):
event.response.issue_tokens = False
event.response.fail_authentication = False
event.response.challenge_name = "CUSTOM_CHALLENGE"
elif (
len(event.request.session) == 3
and event.request.session[2].challenge_name == "CUSTOM_CHALLENGE"
and event.request.session[2].challenge_result
):
event.response.issue_tokens = True
event.response.fail_authentication = False
else:
event.response.issue_tokens = False
event.response.fail_authentication = True
return event.raw_event
```
=== "SPR_A response"
```json hl_lines="25-27"
{
"version": "1",
"region": "us-east-1",
"userPoolId": "us-east-1_example",
"userName": "UserName",
"callerContext": {
"awsSdkVersion": "awsSdkVersion",
"clientId": "clientId"
},
"triggerSource": "DefineAuthChallenge_Authentication",
"request": {
"userAttributes": {
"sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
"email_verified": "true",
"name": "First Last",
"email": "[email protected]"
},
"session": [
{
"challengeName": "SRP_A",
"challengeResult": true
}
]
},
"response": {
"issueTokens": false,
"failAuthentication": false,
"challengeName": "PASSWORD_VERIFIER"
}
}
```
=== "PASSWORD_VERIFIER success response"
```json hl_lines="30-32"
{
"version": "1",
"region": "us-east-1",
"userPoolId": "us-east-1_example",
"userName": "UserName",
"callerContext": {
"awsSdkVersion": "awsSdkVersion",
"clientId": "clientId"
},
"triggerSource": "DefineAuthChallenge_Authentication",
"request": {
"userAttributes": {
"sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
"email_verified": "true",
"name": "First Last",
"email": "[email protected]"
},
"session": [
{
"challengeName": "SRP_A",
"challengeResult": true
},
{
"challengeName": "PASSWORD_VERIFIER",
"challengeResult": true
}
]
},
"response": {
"issueTokens": false,
"failAuthentication": false,
"challengeName": "CUSTOM_CHALLENGE"
}
}
```
=== "CUSTOM_CHALLENGE success response"
```json hl_lines="34 35"
{
"version": "1",
"region": "us-east-1",
"userPoolId": "us-east-1_example",
"userName": "UserName",
"callerContext": {
"awsSdkVersion": "awsSdkVersion",
"clientId": "clientId"
},
"triggerSource": "DefineAuthChallenge_Authentication",
"request": {
"userAttributes": {
"sub": "4A709A36-7D63-4785-829D-4198EF10EBDA",
"email_verified": "true",
"name": "First Last",
"email": "[email protected]"
},
"session": [
{
"challengeName": "SRP_A",
"challengeResult": true
},
{
"challengeName": "PASSWORD_VERIFIER",
"challengeResult": true
},
{
"challengeName": "CUSTOM_CHALLENGE",
"challengeResult": true
}
]
},
"response": {
"issueTokens": true,
"failAuthentication": false
}
}
```
#### Create Auth Challenge Example
This example is based on the AWS Cognito docs for [Create Auth Challenge Lambda Trigger](https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-lambda-create-auth-challenge.html){target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import CreateAuthChallengeTriggerEvent
@event_source(data_class=CreateAuthChallengeTriggerEvent)
def handler(event: CreateAuthChallengeTriggerEvent, context) -> dict:
if event.request.challenge_name == "CUSTOM_CHALLENGE":
event.response.public_challenge_parameters = {"captchaUrl": "url/123.jpg"}
event.response.private_challenge_parameters = {"answer": "5"}
event.response.challenge_metadata = "CAPTCHA_CHALLENGE"
return event.raw_event
```
#### Verify Auth Challenge Response Example
This example is based on the AWS Cognito docs for [Verify Auth Challenge Response Lambda Trigger](https://docs.aws.amazon.com/cognito/latest/developerguide/user-pool-lambda-verify-auth-challenge-response.html){target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source
from aws_lambda_powertools.utilities.data_classes.cognito_user_pool_event import VerifyAuthChallengeResponseTriggerEvent
@event_source(data_class=VerifyAuthChallengeResponseTriggerEvent)
def handler(event: VerifyAuthChallengeResponseTriggerEvent, context) -> dict:
event.response.answer_correct = (
event.request.private_challenge_parameters.get("answer") == event.request.challenge_answer
)
return event.raw_event
```
### Connect Contact Flow
> New in 1.11.0
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.connect_contact_flow_event import (
ConnectContactFlowChannel,
ConnectContactFlowEndpointType,
ConnectContactFlowEvent,
ConnectContactFlowInitiationMethod,
)
def lambda_handler(event, context):
event: ConnectContactFlowEvent = ConnectContactFlowEvent(event)
assert event.contact_data.attributes == {"Language": "en-US"}
assert event.contact_data.channel == ConnectContactFlowChannel.VOICE
assert event.contact_data.customer_endpoint.endpoint_type == ConnectContactFlowEndpointType.TELEPHONE_NUMBER
assert event.contact_data.initiation_method == ConnectContactFlowInitiationMethod.API
```
### DynamoDB Streams
The DynamoDB data class utility provides the base class for `DynamoDBStreamEvent`, as well as enums for stream view type (`StreamViewType`) and event type.
(`DynamoDBRecordEventName`).
The class automatically deserializes DynamoDB types into their equivalent Python types.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import (
DynamoDBStreamEvent,
DynamoDBRecordEventName
)
def lambda_handler(event, context):
event: DynamoDBStreamEvent = DynamoDBStreamEvent(event)
# Multiple records can be delivered in a single event
for record in event.records:
if record.event_name == DynamoDBRecordEventName.MODIFY:
do_something_with(record.dynamodb.new_image)
do_something_with(record.dynamodb.old_image)
```
=== "multiple_records_types.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, DynamoDBStreamEvent
from aws_lambda_powertools.utilities.typing import LambdaContext
@event_source(data_class=DynamoDBStreamEvent)
def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
for record in event.records:
# {"N": "123.45"} => Decimal("123.45")
key: str = record.dynamodb.keys["id"]
print(key)
```
### EventBridge
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, EventBridgeEvent
@event_source(data_class=EventBridgeEvent)
def lambda_handler(event: EventBridgeEvent, context):
do_something_with(event.detail)
```
### Kafka
This example is based on the AWS docs for [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} and [self-managed Apache Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html){target="_blank"}.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, KafkaEvent
@event_source(data_class=KafkaEvent)
def lambda_handler(event: KafkaEvent, context):
for record in event.records:
do_something_with(record.decoded_key, record.json_value)
```
### Kinesis streams
Kinesis events by default contain base64 encoded data. You can use the helper function to access the data either as json
or plain text, depending on the original payload.
=== "app.py"
```python
from aws_lambda_powertools.utilities.data_classes import event_source, KinesisStreamEvent
@event_source(data_class=KinesisStreamEvent)
def lambda_handler(event: KinesisStreamEvent, context):
kinesis_record = next(event.records).kinesis