Skip to content

Commit 455836c

Browse files
roger-zhangg, leandrodamascena, heitorlessa
authored
feat(event_source): add Kinesis Firehose Data Transformation data class (#3029)
* support kinesis response * fix lint, address Leandro suggestions * remove deleted const * fix Literal import in 3.7 * change to use data-classes * fix mypy * fix typo, make asdict a function * address Troy/Leandro suggestions * remove 6MB comment * fix comments * address Heitor's suggestion * data class default optimization * remove slot for static check * fix doc, example * rename r->record * Addressing Heitor's feedback * Addressing Heitor's feedback * Addressing Heitor's feedback * add result warning, add asdict test, metadata test * refactor: initial refactoring * chore: branding Signed-off-by: heitorlessa <[email protected]> * refactor: use classvar and tuple for perf Signed-off-by: heitorlessa <[email protected]> * chore: fix rebase issue Signed-off-by: heitorlessa <[email protected]> * chore: fix mypy tuple exactness type Signed-off-by: heitorlessa <[email protected]> * remove Ok in example response,add failure example * chore: clean up docs example Signed-off-by: heitorlessa <[email protected]> * chore: lower cognitive overhead; add example docstring Signed-off-by: heitorlessa <[email protected]> * add drop example * docs: give info upfront, name examples * docs: improve transforming records example * docs: improve dropping records example * docs: improve exception example --------- Signed-off-by: heitorlessa <[email protected]> Co-authored-by: Leandro Damascena <[email protected]> Co-authored-by: heitorlessa <[email protected]>
1 parent f673368 commit 455836c

File tree

9 files changed

+500
-21
lines changed

9 files changed

+500
-21
lines changed
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Internal shared functions. Do not rely on it besides internal usage."""

aws_lambda_powertools/utilities/data_classes/__init__.py

+9-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,12 @@
1414
from .event_bridge_event import EventBridgeEvent
1515
from .event_source import event_source
1616
from .kafka_event import KafkaEvent
17-
from .kinesis_firehose_event import KinesisFirehoseEvent
17+
from .kinesis_firehose_event import (
18+
KinesisFirehoseDataTransformationRecord,
19+
KinesisFirehoseDataTransformationRecordMetadata,
20+
KinesisFirehoseDataTransformationResponse,
21+
KinesisFirehoseEvent,
22+
)
1823
from .kinesis_stream_event import KinesisStreamEvent
1924
from .lambda_function_url_event import LambdaFunctionUrlEvent
2025
from .s3_event import S3Event, S3EventBridgeNotificationEvent
@@ -39,6 +44,9 @@
3944
"KafkaEvent",
4045
"KinesisFirehoseEvent",
4146
"KinesisStreamEvent",
47+
"KinesisFirehoseDataTransformationResponse",
48+
"KinesisFirehoseDataTransformationRecord",
49+
"KinesisFirehoseDataTransformationRecordMetadata",
4250
"LambdaFunctionUrlEvent",
4351
"S3Event",
4452
"S3EventBridgeNotificationEvent",

aws_lambda_powertools/utilities/data_classes/kinesis_firehose_event.py

+197-1
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,179 @@
11
import base64
2-
from typing import Iterator, Optional
2+
import json
3+
import warnings
4+
from dataclasses import dataclass, field
5+
from typing import Any, Callable, ClassVar, Dict, Iterator, List, Optional, Tuple
6+
7+
from typing_extensions import Literal
38

49
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
510

611

12+
@dataclass(repr=False, order=False, frozen=True)
class KinesisFirehoseDataTransformationRecordMetadata:
    """
    Metadata in Firehose Data Transform Record.

    Parameters
    ----------
    partition_keys: Dict[str, str]
        A dict of partition keys/value in string format, e.g. `{"year":"2023","month":"09"}`

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    """

    # `dict` is the idiomatic default_factory (was `lambda: {}` — same behavior, less indirection)
    partition_keys: Dict[str, str] = field(default_factory=dict)

    def asdict(self) -> Dict:
        """Serialize metadata into the dict shape Firehose expects.

        Returns
        -------
        Dict
            `{"partitionKeys": ...}` when partition keys are present (including the
            default empty dict), or `{}` only if a caller explicitly passed `None`.
        """
        # partition_keys can only be None when explicitly passed by a caller;
        # the default factory always yields a (possibly empty) dict.
        if self.partition_keys is not None:
            return {"partitionKeys": self.partition_keys}
        return {}
33+
34+
35+
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationRecord:
    """A single record of a Kinesis Data Firehose data-transformation response.

    Parameters
    ----------
    record_id: str
        uniquely identifies this record within the current batch
    result: Literal["Ok", "Dropped", "ProcessingFailed"]
        record data transformation status: succeeded, should be dropped, or failed.
    data: str
        base64-encoded payload, by default empty string.
    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata]
        Metadata associated with this record; can contain partition keys.

        See: https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps
    json_deserializer: Callable
        function to deserialize `str`, `bytes`, `bytearray` containing a JSON document
        to a Python `obj`, by default json.loads

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html
    """

    # Accepted values for `result`; kept as a ClassVar so it is shared, not a field.
    _valid_result_types: ClassVar[Tuple[str, str, str]] = ("Ok", "Dropped", "ProcessingFailed")

    record_id: str
    result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok"
    data: str = ""
    metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None
    json_serializer: Callable = json.dumps
    json_deserializer: Callable = json.loads
    # Cache for the deserialized JSON payload; populated lazily by `data_as_json`.
    _json_data: Optional[Any] = None

    def asdict(self) -> Dict:
        """Serialize this record into the dict shape Firehose expects.

        Emits a warning (does not raise) when `result` is not a supported value.
        """
        if self.result not in self._valid_result_types:
            warnings.warn(
                message=f'The result "{self.result}" is not valid, Choose from "Ok", "Dropped", "ProcessingFailed"',
                stacklevel=1,
            )

        payload: Dict[str, Any] = {
            "recordId": self.record_id,
            "result": self.result,
            "data": self.data,
        }
        # Only include metadata when it is present (and truthy), matching Firehose's optional field.
        if self.metadata:
            payload["metadata"] = self.metadata.asdict()
        return payload

    @property
    def data_as_bytes(self) -> bytes:
        """Decoded base64-encoded data as bytes"""
        return base64.b64decode(self.data) if self.data else b""

    @property
    def data_as_text(self) -> str:
        """Decoded base64-encoded data as text"""
        return self.data_as_bytes.decode("utf-8") if self.data else ""

    @property
    def data_as_json(self) -> Dict:
        """Decoded base64-encoded data loaded to json; result is cached after first access."""
        if not self.data:
            return {}
        if self._json_data is None:
            self._json_data = self.json_deserializer(self.data_as_text)
        return self._json_data
113+
114+
115+
@dataclass(repr=False, order=False)
class KinesisFirehoseDataTransformationResponse:
    """Kinesis Data Firehose response object

    Documentation:
    --------------
    - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

    Parameters
    ----------
    records : List[KinesisFirehoseDataTransformationRecord]
        records of Kinesis Data Firehose response object,
        optional parameter at start. can be added later using `add_record` function.

    Examples
    --------

    **Transforming data records**

    ```python
    from aws_lambda_powertools.utilities.data_classes import (
        KinesisFirehoseDataTransformationRecord,
        KinesisFirehoseDataTransformationResponse,
        KinesisFirehoseEvent,
    )
    from aws_lambda_powertools.utilities.serialization import base64_from_json
    from aws_lambda_powertools.utilities.typing import LambdaContext


    def lambda_handler(event: dict, context: LambdaContext):
        firehose_event = KinesisFirehoseEvent(event)
        result = KinesisFirehoseDataTransformationResponse()

        for record in firehose_event.records:
            payload = record.data_as_text  # base64 decoded data as str

            ## generate data to return
            transformed_data = {"tool_used": "powertools_dataclass", "original_payload": payload}
            processed_record = KinesisFirehoseDataTransformationRecord(
                record_id=record.record_id,
                data=base64_from_json(transformed_data),
            )

            result.add_record(processed_record)

        # return transformed records
        return result.asdict()
    ```
    """

    records: List[KinesisFirehoseDataTransformationRecord] = field(default_factory=list)

    def add_record(self, record: KinesisFirehoseDataTransformationRecord):
        """Append a transformed record to this response."""
        self.records.append(record)

    def asdict(self) -> Dict:
        """Serialize the response into the dict shape Firehose expects.

        Raises
        ------
        ValueError
            When no records have been added — Firehose rejects empty responses.
        """
        if not self.records:
            raise ValueError("Amazon Kinesis Data Firehose doesn't accept empty response")

        return {"records": [record.asdict() for record in self.records]}
175+
176+
7177
class KinesisFirehoseRecordMetadata(DictWrapper):
8178
@property
9179
def _metadata(self) -> dict:
@@ -77,6 +247,32 @@ def data_as_json(self) -> dict:
77247
self._json_data = self._json_deserializer(self.data_as_text)
78248
return self._json_data
79249

250+
def build_data_transformation_response(
251+
self,
252+
result: Literal["Ok", "Dropped", "ProcessingFailed"] = "Ok",
253+
data: str = "",
254+
metadata: Optional[KinesisFirehoseDataTransformationRecordMetadata] = None,
255+
) -> KinesisFirehoseDataTransformationRecord:
256+
"""Create a KinesisFirehoseResponseRecord directly using the record_id and given values
257+
258+
Parameters
259+
----------
260+
result : Literal["Ok", "Dropped", "ProcessingFailed"]
261+
processing result, supported value: Ok, Dropped, ProcessingFailed
262+
data : str, optional
263+
data blob, base64-encoded, optional at init. Allows pass in base64-encoded data directly or
264+
use either function like `data_from_text`, `data_from_json` to populate data
265+
metadata: KinesisFirehoseResponseRecordMetadata, optional
266+
Metadata associated with this record; can contain partition keys
267+
- https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html
268+
"""
269+
return KinesisFirehoseDataTransformationRecord(
270+
record_id=self.record_id,
271+
result=result,
272+
data=data,
273+
metadata=metadata,
274+
)
275+
80276

81277
class KinesisFirehoseEvent(DictWrapper):
82278
"""Kinesis Data Firehose event
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
"""Standalone functions to serialize/deserialize common data structures"""
2+
import base64
3+
import json
4+
from typing import Any, Callable
5+
6+
7+
def base64_encode(data: str) -> str:
    """Encode a string and return the Base64-encoded value.

    Note: equivalent to `base64_from_str`; kept for backward compatibility.

    Parameters
    ----------
    data: str
        The string to encode.

    Returns
    -------
    str
        The Base64-encoded value.
    """
    return base64.b64encode(data.encode()).decode("utf-8")
21+
22+
23+
def base64_decode(data: str) -> str:
    """Decode a Base64-encoded string back into text.

    Parameters
    ----------
    data: str
        The Base64-encoded string to decode.

    Returns
    -------
    str
        The decoded string value.
    """
    decoded_bytes = base64.b64decode(data)
    return decoded_bytes.decode("utf-8")
37+
38+
39+
def base64_from_str(data: str) -> str:
    """Encode str as base64 string"""
    raw_bytes = data.encode()
    return base64.b64encode(raw_bytes).decode("utf-8")
42+
43+
44+
def base64_from_json(data: Any, json_serializer: Callable[..., str] = json.dumps) -> str:
    """Encode JSON serializable data as base64 string

    Parameters
    ----------
    data: Any
        JSON serializable (dict, list, boolean, etc.)
    json_serializer: Callable
        function to serialize `obj` to a JSON formatted `str`, by default json.dumps

    Returns
    -------
    str:
        JSON string as base64 string
    """
    # Serialize first, then base64-encode the UTF-8 bytes of the JSON text.
    serialized = json_serializer(data)
    return base64.b64encode(serialized.encode()).decode("utf-8")

docs/utilities/data_classes.md

+27-6
Original file line numberDiff line numberDiff line change
@@ -975,18 +975,39 @@ or plain text, depending on the original payload.
975975

976976
### Kinesis Firehose delivery stream
977977

978-
Kinesis Firehose Data Transformation can use a Lambda Function to modify the records
979-
inline, and re-emit them back to the Delivery Stream.
978+
When using Kinesis Firehose, you can use a Lambda function to [perform data transformation](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html){target="_blank"}. For each transformed record, you can choose to either:
980979

981-
Similar to Kinesis Data Streams, the events contain base64 encoded data. You can use the helper
982-
function to access the data either as json or plain text, depending on the original payload.
980+
* **A)** Put them back to the delivery stream (default)
981+
* **B)** Drop them so consumers don't receive them (e.g., data validation)
982+
* **C)** Indicate a record failed data transformation and should be retried
983983

984-
=== "app.py"
984+
To do that, you can use `KinesisFirehoseDataTransformationResponse` class along with helper functions to make it easier to decode and encode base64 data in the stream.
985985

986-
```python
986+
=== "Transforming streaming records"
987+
988+
```python hl_lines="2-3 12 28"
987989
--8<-- "examples/event_sources/src/kinesis_firehose_delivery_stream.py"
988990
```
989991

992+
1. **Ingesting JSON payloads?** <br><br> Use `record.data_as_json` to easily deserialize them.
993+
2. For your convenience, `base64_from_json` serializes a dict to JSON, then encode as base64 data.
994+
995+
=== "Dropping invalid records"
996+
997+
```python hl_lines="5-6 16 34"
998+
--8<-- "examples/event_sources/src/kinesis_firehose_response_drop.py"
999+
```
1000+
1001+
1. This exception would be generated from `record.data_as_json` if invalid payload.
1002+
1003+
=== "Indicating a processing failure"
1004+
1005+
```python hl_lines="2-3 33"
1006+
--8<-- "examples/event_sources/src/kinesis_firehose_response_exception.py"
1007+
```
1008+
1009+
1. This record will now be sent to your [S3 bucket in the `processing-failed` folder](https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-failure-handling){target="_blank"}.
1010+
9901011
### Lambda Function URL
9911012

9921013
=== "app.py"
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
1-
import base64
2-
import json
3-
41
from aws_lambda_powertools.utilities.data_classes import (
2+
KinesisFirehoseDataTransformationResponse,
53
KinesisFirehoseEvent,
64
event_source,
75
)
6+
from aws_lambda_powertools.utilities.serialization import base64_from_json
87
from aws_lambda_powertools.utilities.typing import LambdaContext
98

109

1110
@event_source(data_class=KinesisFirehoseEvent)
def lambda_handler(event: KinesisFirehoseEvent, context: LambdaContext):
    # Accumulates transformed records to hand back to the Firehose delivery stream
    result = KinesisFirehoseDataTransformationResponse()

    for record in event.records:
        # get original data using data_as_text property
        data = record.data_as_text  # (1)!

        ## generate data to return
        transformed_data = {"new_data": "transformed data using Powertools", "original_payload": data}

        processed_record = record.build_data_transformation_response(
            data=base64_from_json(transformed_data),  # (2)!
        )

        result.add_record(processed_record)

    # return transformed records
    return result.asdict()

0 commit comments

Comments
 (0)