Skip to content

Commit 416ab1b

Browse files
ryandeivertrubenfonsecaleandrodamascena
authored
feat(data_classes): add KinesisFirehoseEvent (#1540)
Co-authored-by: Rúben Fonseca <[email protected]> Co-authored-by: Leandro Damascena <[email protected]>
1 parent ec709c3 commit 416ab1b

File tree

6 files changed

+229
-7
lines changed

6 files changed

+229
-7
lines changed

Diff for: aws_lambda_powertools/utilities/data_classes/__init__.py

+2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from .event_bridge_event import EventBridgeEvent
1414
from .event_source import event_source
1515
from .kafka_event import KafkaEvent
16+
from .kinesis_firehose_event import KinesisFirehoseEvent
1617
from .kinesis_stream_event import KinesisStreamEvent
1718
from .lambda_function_url_event import LambdaFunctionUrlEvent
1819
from .s3_event import S3Event
@@ -32,6 +33,7 @@
3233
"DynamoDBStreamEvent",
3334
"EventBridgeEvent",
3435
"KafkaEvent",
36+
"KinesisFirehoseEvent",
3537
"KinesisStreamEvent",
3638
"LambdaFunctionUrlEvent",
3739
"S3Event",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
import base64
2+
import json
3+
from typing import Iterator, Optional
4+
5+
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
6+
7+
8+
class KinesisFirehoseRecordMetadata(DictWrapper):
9+
@property
10+
def _metadata(self) -> dict:
11+
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
12+
return self["kinesisRecordMetadata"] # could raise KeyError
13+
14+
@property
15+
def shard_id(self) -> str:
16+
"""Kinesis stream shard ID; present only when Kinesis Stream is source"""
17+
return self._metadata["shardId"]
18+
19+
@property
20+
def partition_key(self) -> str:
21+
"""Kinesis stream partition key; present only when Kinesis Stream is source"""
22+
return self._metadata["partitionKey"]
23+
24+
@property
25+
def approximate_arrival_timestamp(self) -> int:
26+
"""Kinesis stream approximate arrival ISO timestamp; present only when Kinesis Stream is source"""
27+
return self._metadata["approximateArrivalTimestamp"]
28+
29+
@property
30+
def sequence_number(self) -> str:
31+
"""Kinesis stream sequence number; present only when Kinesis Stream is source"""
32+
return self._metadata["sequenceNumber"]
33+
34+
@property
35+
def subsequence_number(self) -> str:
36+
"""Kinesis stream sub-sequence number; present only when Kinesis Stream is source
37+
38+
Note: this will only be present for Kinesis streams using record aggregation
39+
"""
40+
return self._metadata["subsequenceNumber"]
41+
42+
43+
class KinesisFirehoseRecord(DictWrapper):
44+
@property
45+
def approximate_arrival_timestamp(self) -> int:
46+
"""The approximate time that the record was inserted into the delivery stream"""
47+
return self["approximateArrivalTimestamp"]
48+
49+
@property
50+
def record_id(self) -> str:
51+
"""Record ID; uniquely identifies this record within the current batch"""
52+
return self["recordId"]
53+
54+
@property
55+
def data(self) -> str:
56+
"""The data blob, base64-encoded"""
57+
return self["data"]
58+
59+
@property
60+
def metadata(self) -> Optional[KinesisFirehoseRecordMetadata]:
61+
"""Optional: metadata associated with this record; present only when Kinesis Stream is source"""
62+
return KinesisFirehoseRecordMetadata(self._data) if self.get("kinesisRecordMetadata") else None
63+
64+
@property
65+
def data_as_bytes(self) -> bytes:
66+
"""Decoded base64-encoded data as bytes"""
67+
return base64.b64decode(self.data)
68+
69+
@property
70+
def data_as_text(self) -> str:
71+
"""Decoded base64-encoded data as text"""
72+
return self.data_as_bytes.decode("utf-8")
73+
74+
@property
75+
def data_as_json(self) -> dict:
76+
"""Decoded base64-encoded data loaded to json"""
77+
if self._json_data is None:
78+
self._json_data = json.loads(self.data_as_text)
79+
return self._json_data
80+
81+
82+
class KinesisFirehoseEvent(DictWrapper):
83+
"""Kinesis Data Firehose event
84+
85+
Documentation:
86+
--------------
87+
- https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
88+
"""
89+
90+
@property
91+
def invocation_id(self) -> str:
92+
"""Unique ID for for Lambda invocation"""
93+
return self["invocationId"]
94+
95+
@property
96+
def delivery_stream_arn(self) -> str:
97+
"""ARN of the Firehose Data Firehose Delivery Stream"""
98+
return self["deliveryStreamArn"]
99+
100+
@property
101+
def source_kinesis_stream_arn(self) -> Optional[str]:
102+
"""ARN of the Kinesis Stream; present only when Kinesis Stream is source"""
103+
return self.get("sourceKinesisStreamArn")
104+
105+
@property
106+
def region(self) -> str:
107+
"""AWS region where the event originated eg: us-east-1"""
108+
return self["region"]
109+
110+
@property
111+
def records(self) -> Iterator[KinesisFirehoseRecord]:
112+
for record in self["records"]:
113+
yield KinesisFirehoseRecord(record)

Diff for: docs/utilities/data_classes.md

+15
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ Event Source | Data_class
7777
[EventBridge](#eventbridge) | `EventBridgeEvent`
7878
[Kafka](#kafka) | `KafkaEvent`
7979
[Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
80+
[Kinesis Firehose Delivery Stream](#kinesis-firehose-delivery-stream) | `KinesisFirehoseEvent`
8081
[Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent`
8182
[Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
8283
[S3](#s3) | `S3Event`
@@ -892,6 +893,20 @@ or plain text, depending on the original payload.
892893
do_something_with(data)
893894
```
894895

896+
### Kinesis Firehose delivery stream
897+
898+
Kinesis Firehose Data Transformation can use a Lambda Function to modify the records
899+
inline, and re-emit them back to the Delivery Stream.
900+
901+
Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
902+
function to access the data either as json or plain text, depending on the original payload.
903+
904+
=== "app.py"
905+
906+
```python
907+
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
908+
```
909+
895910
### Lambda Function URL
896911

897912
=== "app.py"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
import base64
2+
import json
3+
4+
from aws_lambda_powertools.utilities.data_classes import KinesisFirehoseEvent, event_source
5+
from aws_lambda_powertools.utilities.typing import LambdaContext
6+
7+
8+
@event_source(data_class=KinesisFirehoseEvent)
9+
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
10+
result = []
11+
12+
for record in event.records:
13+
# if data was delivered as json; caches loaded value
14+
data = record.data_as_json
15+
16+
processed_record = {
17+
"recordId": record.record_id,
18+
"data": base64.b64encode(json.dumps(data).encode("utf-8")),
19+
"result": "Ok",
20+
}
21+
22+
result.append(processed_record)
23+
24+
# return transformed records
25+
return {"records": result}

Diff for: tests/events/kinesisFirehosePutEvent.json

+7-7
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@
22
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
33
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
44
"region": "us-east-2",
5-
"records":[
5+
"records": [
66
{
7-
"recordId":"record1",
8-
"approximateArrivalTimestamp":1664029185290,
9-
"data":"SGVsbG8gV29ybGQ="
7+
"recordId": "record1",
8+
"approximateArrivalTimestamp": 1664029185290,
9+
"data": "SGVsbG8gV29ybGQ="
1010
},
1111
{
12-
"recordId":"record2",
13-
"approximateArrivalTimestamp":1664029186945,
14-
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
12+
"recordId": "record2",
13+
"approximateArrivalTimestamp": 1664029186945,
14+
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9"
1515
}
1616
]
1717
}

Diff for: tests/functional/test_data_classes.py

+67
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
CodePipelineJobEvent,
1919
EventBridgeEvent,
2020
KafkaEvent,
21+
KinesisFirehoseEvent,
2122
KinesisStreamEvent,
2223
S3Event,
2324
SESEvent,
@@ -1239,6 +1240,72 @@ def test_kafka_self_managed_event():
12391240
assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue"
12401241

12411242

1243+
def test_kinesis_firehose_kinesis_event():
1244+
event = KinesisFirehoseEvent(load_event("kinesisFirehoseKinesisEvent.json"))
1245+
1246+
assert event.region == "us-east-2"
1247+
assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
1248+
assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
1249+
assert event.source_kinesis_stream_arn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source"
1250+
1251+
records = list(event.records)
1252+
assert len(records) == 2
1253+
record_01, record_02 = records[:]
1254+
1255+
assert record_01.approximate_arrival_timestamp == 1664028820148
1256+
assert record_01.record_id == "record1"
1257+
assert record_01.data == "SGVsbG8gV29ybGQ="
1258+
assert record_01.data_as_bytes == b"Hello World"
1259+
assert record_01.data_as_text == "Hello World"
1260+
1261+
assert record_01.metadata.shard_id == "shardId-000000000000"
1262+
assert record_01.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a"
1263+
assert record_01.metadata.approximate_arrival_timestamp == 1664028820148
1264+
assert record_01.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785154"
1265+
assert record_01.metadata.subsequence_number == ""
1266+
1267+
assert record_02.approximate_arrival_timestamp == 1664028793294
1268+
assert record_02.record_id == "record2"
1269+
assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9"
1270+
assert record_02.data_as_bytes == b'{"Hello": "World"}'
1271+
assert record_02.data_as_text == '{"Hello": "World"}'
1272+
assert record_02.data_as_json == {"Hello": "World"}
1273+
1274+
assert record_02.metadata.shard_id == "shardId-000000000001"
1275+
assert record_02.metadata.partition_key == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a"
1276+
assert record_02.metadata.approximate_arrival_timestamp == 1664028793294
1277+
assert record_02.metadata.sequence_number == "49546986683135544286507457936321625675700192471156785155"
1278+
assert record_02.metadata.subsequence_number == ""
1279+
1280+
1281+
def test_kinesis_firehose_put_event():
1282+
event = KinesisFirehoseEvent(load_event("kinesisFirehosePutEvent.json"))
1283+
1284+
assert event.region == "us-east-2"
1285+
assert event.invocation_id == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
1286+
assert event.delivery_stream_arn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
1287+
assert event.source_kinesis_stream_arn is None
1288+
1289+
records = list(event.records)
1290+
assert len(records) == 2
1291+
record_01, record_02 = records[:]
1292+
1293+
assert record_01.approximate_arrival_timestamp == 1664029185290
1294+
assert record_01.record_id == "record1"
1295+
assert record_01.data == "SGVsbG8gV29ybGQ="
1296+
assert record_01.data_as_bytes == b"Hello World"
1297+
assert record_01.data_as_text == "Hello World"
1298+
assert record_01.metadata is None
1299+
1300+
assert record_02.approximate_arrival_timestamp == 1664029186945
1301+
assert record_02.record_id == "record2"
1302+
assert record_02.data == "eyJIZWxsbyI6ICJXb3JsZCJ9"
1303+
assert record_02.data_as_bytes == b'{"Hello": "World"}'
1304+
assert record_02.data_as_text == '{"Hello": "World"}'
1305+
assert record_02.data_as_json == {"Hello": "World"}
1306+
assert record_02.metadata is None
1307+
1308+
12421309
def test_kinesis_stream_event():
12431310
event = KinesisStreamEvent(load_event("kinesisStreamEvent.json"))
12441311

0 commit comments

Comments
 (0)