Skip to content

Commit 81afadb

Browse files
feat(parser): add KinesisFirehoseModel (aws-powertools#1556)
Co-authored-by: Leandro Damascena <[email protected]>
1 parent a5755d4 commit 81afadb

File tree

12 files changed

+262
-12
lines changed

12 files changed

+262
-12
lines changed

aws_lambda_powertools/shared/functions.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def resolve_env_var_choice(
7171

7272
def base64_decode(value: str) -> bytes:
7373
try:
74-
logger.debug("Decoding base64 Kafka record item before parsing")
74+
logger.debug("Decoding base64 record item before parsing")
7575
return base64.b64decode(value)
7676
except (BinAsciiError, TypeError):
7777
raise ValueError("base64 decode failed")

aws_lambda_powertools/utilities/parser/envelopes/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from .event_bridge import EventBridgeEnvelope
77
from .kafka import KafkaEnvelope
88
from .kinesis import KinesisDataStreamEnvelope
9+
from .kinesis_firehose import KinesisFirehoseEnvelope
910
from .lambda_function_url import LambdaFunctionUrlEnvelope
1011
from .sns import SnsEnvelope, SnsSqsEnvelope
1112
from .sqs import SqsEnvelope
@@ -17,6 +18,7 @@
1718
"DynamoDBStreamEnvelope",
1819
"EventBridgeEnvelope",
1920
"KinesisDataStreamEnvelope",
21+
"KinesisFirehoseEnvelope",
2022
"LambdaFunctionUrlEnvelope",
2123
"SnsEnvelope",
2224
"SnsSqsEnvelope",

aws_lambda_powertools/utilities/parser/envelopes/kinesis.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class KinesisDataStreamEnvelope(BaseEnvelope):
1616
Regardless of its type it'll be parsed into a BaseModel object.
1717
1818
Note: Records will be parsed the same way so if model is str,
19-
all items in the list will be parsed as str and npt as JSON (and vice versa)
19+
all items in the list will be parsed as str and not as JSON (and vice versa)
2020
"""
2121

2222
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import logging
2+
from typing import Any, Dict, List, Optional, Type, Union, cast
3+
4+
from ..models import KinesisFirehoseModel
5+
from ..types import Model
6+
from .base import BaseEnvelope
7+
8+
logger = logging.getLogger(__name__)
9+
10+
11+
class KinesisFirehoseEnvelope(BaseEnvelope):
12+
"""Kinesis Firehose Envelope to extract array of Records
13+
14+
The record's data parameter is a base64 encoded string which is parsed into a bytes array,
15+
though it can also be a JSON encoded string.
16+
Regardless of its type it'll be parsed into a BaseModel object.
17+
18+
Note: Records will be parsed the same way so if model is str,
19+
all items in the list will be parsed as str and not as JSON (and vice versa)
20+
21+
https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
22+
"""
23+
24+
def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
25+
"""Parses records found with model provided
26+
27+
Parameters
28+
----------
29+
data : Dict
30+
Lambda event to be parsed
31+
model : Type[Model]
32+
Data model provided to parse after extracting data using envelope
33+
34+
Returns
35+
-------
36+
List
37+
List of records parsed with model provided
38+
"""
39+
logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}")
40+
parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data)
41+
logger.debug(f"Parsing Kinesis Firehose records in `body` with {model}")
42+
models = []
43+
for record in parsed_envelope.records:
44+
# We allow either AWS expected contract (bytes) or a custom Model, see #943
45+
data = cast(bytes, record.data)
46+
models.append(self._parse(data=data.decode("utf-8"), model=model))
47+
return models

aws_lambda_powertools/utilities/parser/models/__init__.py

+8
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,11 @@
3737
KinesisDataStreamRecord,
3838
KinesisDataStreamRecordPayload,
3939
)
40+
from .kinesis_firehose import (
41+
KinesisFirehoseModel,
42+
KinesisFirehoseRecord,
43+
KinesisFirehoseRecordMetadata,
44+
)
4045
from .lambda_function_url import LambdaFunctionUrlModel
4146
from .s3 import S3Model, S3RecordModel
4247
from .s3_object_event import (
@@ -86,6 +91,9 @@
8691
"KinesisDataStreamModel",
8792
"KinesisDataStreamRecord",
8893
"KinesisDataStreamRecordPayload",
94+
"KinesisFirehoseModel",
95+
"KinesisFirehoseRecord",
96+
"KinesisFirehoseRecordMetadata",
8997
"LambdaFunctionUrlModel",
9098
"S3Model",
9199
"S3RecordModel",

aws_lambda_powertools/utilities/parser/models/kinesis.py

+2-10
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
import base64
2-
import logging
3-
from binascii import Error as BinAsciiError
41
from typing import List, Type, Union
52

63
from pydantic import BaseModel, validator
74

5+
from aws_lambda_powertools.shared.functions import base64_decode
86
from aws_lambda_powertools.utilities.parser.types import Literal
97

10-
logger = logging.getLogger(__name__)
11-
128

139
class KinesisDataStreamRecordPayload(BaseModel):
1410
kinesisSchemaVersion: str
@@ -19,11 +15,7 @@ class KinesisDataStreamRecordPayload(BaseModel):
1915

2016
@validator("data", pre=True, allow_reuse=True)
2117
def data_base64_decode(cls, value):
22-
try:
23-
logger.debug("Decoding base64 Kinesis data record before parsing")
24-
return base64.b64decode(value)
25-
except (BinAsciiError, TypeError):
26-
raise ValueError("base64 decode failed")
18+
return base64_decode(value)
2719

2820

2921
class KinesisDataStreamRecord(BaseModel):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from typing import List, Optional, Type, Union
2+
3+
from pydantic import BaseModel, PositiveInt, validator
4+
5+
from aws_lambda_powertools.shared.functions import base64_decode
6+
7+
8+
class KinesisFirehoseRecordMetadata(BaseModel):
9+
shardId: str
10+
partitionKey: str
11+
approximateArrivalTimestamp: PositiveInt
12+
sequenceNumber: str
13+
subsequenceNumber: str
14+
15+
16+
class KinesisFirehoseRecord(BaseModel):
17+
data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes
18+
recordId: str
19+
approximateArrivalTimestamp: PositiveInt
20+
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata]
21+
22+
@validator("data", pre=True, allow_reuse=True)
23+
def data_base64_decode(cls, value):
24+
return base64_decode(value)
25+
26+
27+
class KinesisFirehoseModel(BaseModel):
28+
invocationId: str
29+
deliveryStreamArn: str
30+
region: str
31+
sourceKinesisStreamArn: Optional[str]
32+
records: List[KinesisFirehoseRecord]

docs/utilities/parser.md

+2
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ Parser comes with the following built-in models:
163163
| **S3Model** | Lambda Event Source payload for Amazon S3 |
164164
| **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda |
165165
| **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams |
166+
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
166167
| **SesModel** | Lambda Event Source payload for Amazon Simple Email Service |
167168
| **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service |
168169
| **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
@@ -319,6 +320,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
319320
| **SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
320321
| **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]` |
321322
| **KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]` |
323+
| **KinesisFirehoseEnvelope** | 1. Parses data using `KinesisFirehoseModel` which will base64 decode it. <br/> 2. Parses records in in `Records` key using your model and returns them in a list. | `List[Model]` |
322324
| **SnsEnvelope** | 1. Parses data using `SnsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
323325
| **SnsSqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses SNS records in `body` key using `SnsNotificationModel`. <br/> 3. Parses data in `Message` key using your model and return them in a list. | `List[Model]` |
324326
| **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
3+
"sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
4+
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
5+
"region": "us-east-2",
6+
"records": [
7+
{
8+
"data": "SGVsbG8gV29ybGQ=",
9+
"recordId": "record1",
10+
"approximateArrivalTimestamp": 1664028820148,
11+
"kinesisRecordMetadata": {
12+
"shardId": "shardId-000000000000",
13+
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
14+
"approximateArrivalTimestamp": 1664028820148,
15+
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
16+
"subsequenceNumber": ""
17+
}
18+
},
19+
{
20+
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
21+
"recordId": "record2",
22+
"approximateArrivalTimestamp": 1664028793294,
23+
"kinesisRecordMetadata": {
24+
"shardId": "shardId-000000000001",
25+
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
26+
"approximateArrivalTimestamp": 1664028793294,
27+
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
28+
"subsequenceNumber": ""
29+
}
30+
}
31+
]
32+
}
+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
3+
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
4+
"region": "us-east-2",
5+
"records":[
6+
{
7+
"recordId":"record1",
8+
"approximateArrivalTimestamp":1664029185290,
9+
"data":"SGVsbG8gV29ybGQ="
10+
},
11+
{
12+
"recordId":"record2",
13+
"approximateArrivalTimestamp":1664029186945,
14+
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
15+
}
16+
]
17+
}

tests/functional/parser/schemas.py

+4
Original file line numberDiff line numberDiff line change
@@ -95,3 +95,7 @@ class MyALambdaFuncUrlBusiness(BaseModel):
9595

9696
class MyLambdaKafkaBusiness(BaseModel):
9797
key: str
98+
99+
100+
class MyKinesisFirehoseBusiness(BaseModel):
101+
Hello: str
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
from typing import List
2+
3+
import pytest
4+
5+
from aws_lambda_powertools.utilities.parser import (
6+
ValidationError,
7+
envelopes,
8+
event_parser,
9+
)
10+
from aws_lambda_powertools.utilities.parser.models import (
11+
KinesisFirehoseModel,
12+
KinesisFirehoseRecord,
13+
KinesisFirehoseRecordMetadata,
14+
)
15+
from aws_lambda_powertools.utilities.typing import LambdaContext
16+
from tests.functional.parser.schemas import MyKinesisFirehoseBusiness
17+
from tests.functional.utils import load_event
18+
19+
20+
@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope)
21+
def handle_firehose(event: List[MyKinesisFirehoseBusiness], _: LambdaContext):
22+
assert len(event) == 1
23+
assert event[0].Hello == "World"
24+
25+
26+
@event_parser(model=KinesisFirehoseModel)
27+
def handle_firehose_no_envelope_kinesis(event: KinesisFirehoseModel, _: LambdaContext):
28+
assert event.region == "us-east-2"
29+
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
30+
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
31+
assert event.sourceKinesisStreamArn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source"
32+
33+
records = list(event.records)
34+
assert len(records) == 2
35+
record_01: KinesisFirehoseRecord = records[0]
36+
assert record_01.approximateArrivalTimestamp == 1664028820148
37+
assert record_01.recordId == "record1"
38+
assert record_01.data == b"Hello World"
39+
40+
metadata_01: KinesisFirehoseRecordMetadata = record_01.kinesisRecordMetadata
41+
assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a"
42+
assert metadata_01.subsequenceNumber == ""
43+
assert metadata_01.shardId == "shardId-000000000000"
44+
assert metadata_01.approximateArrivalTimestamp == 1664028820148
45+
assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154"
46+
47+
record_02: KinesisFirehoseRecord = records[1]
48+
assert record_02.approximateArrivalTimestamp == 1664028793294
49+
assert record_02.recordId == "record2"
50+
assert record_02.data == b'{"Hello": "World"}'
51+
52+
metadata_02: KinesisFirehoseRecordMetadata = record_02.kinesisRecordMetadata
53+
assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a"
54+
assert metadata_02.subsequenceNumber == ""
55+
assert metadata_02.shardId == "shardId-000000000001"
56+
assert metadata_02.approximateArrivalTimestamp == 1664028793294
57+
assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155"
58+
59+
60+
@event_parser(model=KinesisFirehoseModel)
61+
def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContext):
62+
assert event.region == "us-east-2"
63+
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
64+
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
65+
66+
records = list(event.records)
67+
assert len(records) == 2
68+
69+
record_01: KinesisFirehoseRecord = records[0]
70+
assert record_01.approximateArrivalTimestamp == 1664029185290
71+
assert record_01.recordId == "record1"
72+
assert record_01.data == b"Hello World"
73+
74+
record_02: KinesisFirehoseRecord = records[1]
75+
assert record_02.approximateArrivalTimestamp == 1664029186945
76+
assert record_02.recordId == "record2"
77+
assert record_02.data == b'{"Hello": "World"}'
78+
79+
80+
def test_firehose_trigger_event():
81+
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
82+
event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class
83+
handle_firehose(event_dict, LambdaContext())
84+
85+
86+
def test_firehose_trigger_event_kinesis_no_envelope():
87+
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
88+
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())
89+
90+
91+
def test_firehose_trigger_event_put_no_envelope():
92+
event_dict = load_event("kinesisFirehosePutEvent.json")
93+
handle_firehose_no_envelope_put(event_dict, LambdaContext())
94+
95+
96+
def test_kinesis_trigger_bad_base64_event():
97+
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
98+
event_dict["records"][0]["data"] = {"bad base64"}
99+
with pytest.raises(ValidationError):
100+
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())
101+
102+
103+
def test_kinesis_trigger_bad_timestamp_event():
104+
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
105+
event_dict["records"][0]["approximateArrivalTimestamp"] = -1
106+
with pytest.raises(ValidationError):
107+
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())
108+
109+
110+
def test_kinesis_trigger_bad_metadata_timestamp_event():
111+
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
112+
event_dict["records"][0]["kinesisRecordMetadata"]["approximateArrivalTimestamp"] = "-1"
113+
with pytest.raises(ValidationError):
114+
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())

0 commit comments

Comments
 (0)