feat(parser): add KinesisFirehoseModel #1556

Merged 7 commits on Oct 10, 2022
Changes from all commits
2 changes: 1 addition & 1 deletion aws_lambda_powertools/shared/functions.py
@@ -71,7 +71,7 @@ def resolve_env_var_choice(

 def base64_decode(value: str) -> bytes:
     try:
-        logger.debug("Decoding base64 Kafka record item before parsing")
+        logger.debug("Decoding base64 record item before parsing")
         return base64.b64decode(value)
     except (BinAsciiError, TypeError):
         raise ValueError("base64 decode failed")
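For context, a minimal sketch (not part of this diff) of the shared helper's contract as it is exercised by the parser models below: well-formed base64 decodes to bytes, and malformed input is normalised to `ValueError("base64 decode failed")`, which pydantic surfaces as a `ValidationError`.

```python
from aws_lambda_powertools.shared.functions import base64_decode

# Well-formed base64 decodes straight to bytes (payloads taken from the test fixtures below)
assert base64_decode("SGVsbG8gV29ybGQ=") == b"Hello World"

# Malformed input is normalised to a single error type
try:
    base64_decode("a")  # invalid length for a base64 payload
except ValueError as exc:
    assert str(exc) == "base64 decode failed"
```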
2 changes: 2 additions & 0 deletions aws_lambda_powertools/utilities/parser/envelopes/__init__.py
@@ -6,6 +6,7 @@
from .event_bridge import EventBridgeEnvelope
from .kafka import KafkaEnvelope
from .kinesis import KinesisDataStreamEnvelope
from .kinesis_firehose import KinesisFirehoseEnvelope
from .lambda_function_url import LambdaFunctionUrlEnvelope
from .sns import SnsEnvelope, SnsSqsEnvelope
from .sqs import SqsEnvelope
@@ -17,6 +18,7 @@
"DynamoDBStreamEnvelope",
"EventBridgeEnvelope",
"KinesisDataStreamEnvelope",
"KinesisFirehoseEnvelope",
"LambdaFunctionUrlEnvelope",
"SnsEnvelope",
"SnsSqsEnvelope",
2 changes: 1 addition & 1 deletion aws_lambda_powertools/utilities/parser/envelopes/kinesis.py
@@ -16,7 +16,7 @@ class KinesisDataStreamEnvelope(BaseEnvelope):
     Regardless of its type it'll be parsed into a BaseModel object.

     Note: Records will be parsed the same way so if model is str,
-    all items in the list will be parsed as str and npt as JSON (and vice versa)
+    all items in the list will be parsed as str and not as JSON (and vice versa)
     """

     def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
47 changes: 47 additions & 0 deletions aws_lambda_powertools/utilities/parser/envelopes/kinesis_firehose.py
@@ -0,0 +1,47 @@
import logging
from typing import Any, Dict, List, Optional, Type, Union, cast

from ..models import KinesisFirehoseModel
from ..types import Model
from .base import BaseEnvelope

logger = logging.getLogger(__name__)


class KinesisFirehoseEnvelope(BaseEnvelope):
"""Kinesis Firehose Envelope to extract array of Records

The record's data parameter is a base64 encoded string which is parsed into a bytes array,
though it can also be a JSON encoded string.
Regardless of its type it'll be parsed into a BaseModel object.

Note: Records will be parsed the same way so if model is str,
all items in the list will be parsed as str and not as JSON (and vice versa)

https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html
"""

def parse(self, data: Optional[Union[Dict[str, Any], Any]], model: Type[Model]) -> List[Optional[Model]]:
"""Parses records found with model provided

Parameters
----------
data : Dict
Lambda event to be parsed
model : Type[Model]
Data model provided to parse after extracting data using envelope

Returns
-------
List
List of records parsed with model provided
"""
logger.debug(f"Parsing incoming data with Kinesis Firehose model {KinesisFirehoseModel}")
parsed_envelope: KinesisFirehoseModel = KinesisFirehoseModel.parse_obj(data)
logger.debug(f"Parsing Kinesis Firehose records in `body` with {model}")
models = []
for record in parsed_envelope.records:
# We allow either AWS expected contract (bytes) or a custom Model, see #943
data = cast(bytes, record.data)
models.append(self._parse(data=data.decode("utf-8"), model=model))
return models
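A hedged usage sketch (not part of this diff) showing the envelope invoked directly; `MyBusinessModel` is a hypothetical model for the JSON payload inside each record, and the event is trimmed from `tests/events/kinesisFirehosePutEvent.json` added below:

```python
from typing import List, Optional

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes


class MyBusinessModel(BaseModel):  # hypothetical business model; records' data must be JSON for it to parse
    Hello: str


event = {
    "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
    "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
    "region": "us-east-2",
    "records": [
        {"recordId": "record2", "approximateArrivalTimestamp": 1664029186945, "data": "eyJIZWxsbyI6ICJXb3JsZCJ9"}
    ],
}

parsed: List[Optional[MyBusinessModel]] = envelopes.KinesisFirehoseEnvelope().parse(data=event, model=MyBusinessModel)
assert parsed[0] is not None and parsed[0].Hello == "World"
```

The decorator form shown in the docs and tests (`@event_parser(..., envelope=envelopes.KinesisFirehoseEnvelope)`) wires the same call into a Lambda handler.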
8 changes: 8 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/__init__.py
@@ -37,6 +37,11 @@
KinesisDataStreamRecord,
KinesisDataStreamRecordPayload,
)
from .kinesis_firehose import (
KinesisFirehoseModel,
KinesisFirehoseRecord,
KinesisFirehoseRecordMetadata,
)
from .lambda_function_url import LambdaFunctionUrlModel
from .s3 import S3Model, S3RecordModel
from .s3_object_event import (
@@ -86,6 +91,9 @@
"KinesisDataStreamModel",
"KinesisDataStreamRecord",
"KinesisDataStreamRecordPayload",
"KinesisFirehoseModel",
"KinesisFirehoseRecord",
"KinesisFirehoseRecordMetadata",
"LambdaFunctionUrlModel",
"S3Model",
"S3RecordModel",
12 changes: 2 additions & 10 deletions aws_lambda_powertools/utilities/parser/models/kinesis.py
@@ -1,14 +1,10 @@
-import base64
-import logging
-from binascii import Error as BinAsciiError
 from typing import List, Type, Union

 from pydantic import BaseModel, validator

+from aws_lambda_powertools.shared.functions import base64_decode
 from aws_lambda_powertools.utilities.parser.types import Literal

-logger = logging.getLogger(__name__)
-

 class KinesisDataStreamRecordPayload(BaseModel):
     kinesisSchemaVersion: str
@@ -19,11 +15,7 @@ class KinesisDataStreamRecordPayload(BaseModel):

     @validator("data", pre=True, allow_reuse=True)
     def data_base64_decode(cls, value):
-        try:
-            logger.debug("Decoding base64 Kinesis data record before parsing")
-            return base64.b64decode(value)
-        except (BinAsciiError, TypeError):
-            raise ValueError("base64 decode failed")
+        return base64_decode(value)


 class KinesisDataStreamRecord(BaseModel):
32 changes: 32 additions & 0 deletions aws_lambda_powertools/utilities/parser/models/kinesis_firehose.py
@@ -0,0 +1,32 @@
from typing import List, Optional, Type, Union

from pydantic import BaseModel, PositiveInt, validator

from aws_lambda_powertools.shared.functions import base64_decode


class KinesisFirehoseRecordMetadata(BaseModel):
shardId: str
partitionKey: str
approximateArrivalTimestamp: PositiveInt
sequenceNumber: str
subsequenceNumber: str


class KinesisFirehoseRecord(BaseModel):
data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes
recordId: str
approximateArrivalTimestamp: PositiveInt
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata]

@validator("data", pre=True, allow_reuse=True)
def data_base64_decode(cls, value):
return base64_decode(value)


class KinesisFirehoseModel(BaseModel):
invocationId: str
deliveryStreamArn: str
region: str
sourceKinesisStreamArn: Optional[str]
records: List[KinesisFirehoseRecord]
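For illustration (not part of the diff), a sketch of what the model yields without an envelope; the event is trimmed from `tests/events/kinesisFirehoseKinesisEvent.json` below, and the assertions mirror the functional tests:

```python
import json

from aws_lambda_powertools.utilities.parser.models import KinesisFirehoseModel

# Trimmed event based on tests/events/kinesisFirehoseKinesisEvent.json (Kinesis-sourced delivery stream)
event = {
    "invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
    "sourceKinesisStreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
    "deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
    "region": "us-east-2",
    "records": [
        {
            "data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
            "recordId": "record2",
            "approximateArrivalTimestamp": 1664028793294,
            "kinesisRecordMetadata": {
                "shardId": "shardId-000000000001",
                "partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
                "approximateArrivalTimestamp": 1664028793294,
                "sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
                "subsequenceNumber": ""
            }
        }
    ],
}

parsed = KinesisFirehoseModel.parse_obj(event)
record = parsed.records[0]
assert record.data == b'{"Hello": "World"}'  # the validator base64-decoded `data` into bytes
assert record.kinesisRecordMetadata.shardId == "shardId-000000000001"
assert json.loads(record.data) == {"Hello": "World"}
```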
2 changes: 2 additions & 0 deletions docs/utilities/parser.md
@@ -163,6 +163,7 @@ Parser comes with the following built-in models:
| **S3Model** | Lambda Event Source payload for Amazon S3 |
| **S3ObjectLambdaEvent** | Lambda Event Source payload for Amazon S3 Object Lambda |
| **KinesisDataStreamModel** | Lambda Event Source payload for Amazon Kinesis Data Streams |
| **KinesisFirehoseModel** | Lambda Event Source payload for Amazon Kinesis Firehose |
| **SesModel** | Lambda Event Source payload for Amazon Simple Email Service |
| **SnsModel** | Lambda Event Source payload for Amazon Simple Notification Service |
| **APIGatewayProxyEventModel** | Lambda Event Source payload for Amazon API Gateway |
@@ -319,6 +320,7 @@ Parser comes with the following built-in envelopes, where `Model` in the return
| **SqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
| **CloudWatchLogsEnvelope** | 1. Parses data using `CloudwatchLogsModel` which will base64 decode and decompress it. <br/> 2. Parses records in `message` key using your model and return them in a list. | `List[Model]` |
| **KinesisDataStreamEnvelope** | 1. Parses data using `KinesisDataStreamModel` which will base64 decode it. <br/> 2. Parses records in `Records` key using your model and returns them in a list. | `List[Model]` |
| **KinesisFirehoseEnvelope** | 1. Parses data using `KinesisFirehoseModel` which will base64 decode it. <br/> 2. Parses records in `Records` key using your model and returns them in a list. | `List[Model]` |
| **SnsEnvelope** | 1. Parses data using `SnsModel`. <br/> 2. Parses records in `body` key using your model and return them in a list. | `List[Model]` |
| **SnsSqsEnvelope** | 1. Parses data using `SqsModel`. <br/> 2. Parses SNS records in `body` key using `SnsNotificationModel`. <br/> 3. Parses data in `Message` key using your model and return them in a list. | `List[Model]` |
| **ApiGatewayEnvelope** | 1. Parses data using `APIGatewayProxyEventModel`. <br/> 2. Parses `body` key using your model and returns it. | `Model` |
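A handler-side sketch of the documented envelope, mirroring the functional test added in this PR; `MyKinesisFirehoseBusiness` is the test schema (a single `Hello: str` field) and the handler name is illustrative:

```python
from typing import List

from pydantic import BaseModel

from aws_lambda_powertools.utilities.parser import envelopes, event_parser
from aws_lambda_powertools.utilities.typing import LambdaContext


class MyKinesisFirehoseBusiness(BaseModel):
    Hello: str


@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope)
def lambda_handler(event: List[MyKinesisFirehoseBusiness], context: LambdaContext):
    # Each record's base64-encoded JSON payload arrives already parsed into the business model
    for record in event:
        print(record.Hello)
```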
32 changes: 32 additions & 0 deletions tests/events/kinesisFirehoseKinesisEvent.json
@@ -0,0 +1,32 @@
{
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
"sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
"region": "us-east-2",
"records": [
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record1",
"approximateArrivalTimestamp": 1664028820148,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000000",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
"approximateArrivalTimestamp": 1664028820148,
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
"subsequenceNumber": ""
}
},
{
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
"recordId": "record2",
"approximateArrivalTimestamp": 1664028793294,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000001",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
"approximateArrivalTimestamp": 1664028793294,
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
"subsequenceNumber": ""
}
}
]
}
17 changes: 17 additions & 0 deletions tests/events/kinesisFirehosePutEvent.json
@@ -0,0 +1,17 @@
{
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
"region": "us-east-2",
"records":[
{
"recordId":"record1",
"approximateArrivalTimestamp":1664029185290,
"data":"SGVsbG8gV29ybGQ="
},
{
"recordId":"record2",
"approximateArrivalTimestamp":1664029186945,
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
}
]
}
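For reference, the `data` values in both fixtures are plain base64 of the payloads asserted in the tests below; a quick standard-library sketch of how they decode and could be regenerated:

```python
import base64

assert base64.b64decode("SGVsbG8gV29ybGQ=") == b"Hello World"                 # record1 payload
assert base64.b64decode("eyJIZWxsbyI6ICJXb3JsZCJ9") == b'{"Hello": "World"}'  # record2 payload

# Regenerating the JSON payload when crafting new test events
assert base64.b64encode(b'{"Hello": "World"}').decode() == "eyJIZWxsbyI6ICJXb3JsZCJ9"
```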
4 changes: 4 additions & 0 deletions tests/functional/parser/schemas.py
@@ -95,3 +95,7 @@ class MyALambdaFuncUrlBusiness(BaseModel):

class MyLambdaKafkaBusiness(BaseModel):
key: str


class MyKinesisFirehoseBusiness(BaseModel):
Hello: str
114 changes: 114 additions & 0 deletions tests/functional/parser/test_kinesis_firehose.py
@@ -0,0 +1,114 @@
from typing import List

import pytest

from aws_lambda_powertools.utilities.parser import (
ValidationError,
envelopes,
event_parser,
)
from aws_lambda_powertools.utilities.parser.models import (
KinesisFirehoseModel,
KinesisFirehoseRecord,
KinesisFirehoseRecordMetadata,
)
from aws_lambda_powertools.utilities.typing import LambdaContext
from tests.functional.parser.schemas import MyKinesisFirehoseBusiness
from tests.functional.utils import load_event


@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope)
def handle_firehose(event: List[MyKinesisFirehoseBusiness], _: LambdaContext):
assert len(event) == 1
assert event[0].Hello == "World"


@event_parser(model=KinesisFirehoseModel)
def handle_firehose_no_envelope_kinesis(event: KinesisFirehoseModel, _: LambdaContext):
assert event.region == "us-east-2"
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
assert event.sourceKinesisStreamArn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source"

records = list(event.records)
assert len(records) == 2
record_01: KinesisFirehoseRecord = records[0]
assert record_01.approximateArrivalTimestamp == 1664028820148
assert record_01.recordId == "record1"
assert record_01.data == b"Hello World"

metadata_01: KinesisFirehoseRecordMetadata = record_01.kinesisRecordMetadata
assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a"
assert metadata_01.subsequenceNumber == ""
assert metadata_01.shardId == "shardId-000000000000"
assert metadata_01.approximateArrivalTimestamp == 1664028820148
assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154"

record_02: KinesisFirehoseRecord = records[1]
assert record_02.approximateArrivalTimestamp == 1664028793294
assert record_02.recordId == "record2"
assert record_02.data == b'{"Hello": "World"}'

metadata_02: KinesisFirehoseRecordMetadata = record_02.kinesisRecordMetadata
assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a"
assert metadata_02.subsequenceNumber == ""
assert metadata_02.shardId == "shardId-000000000001"
assert metadata_02.approximateArrivalTimestamp == 1664028793294
assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155"


@event_parser(model=KinesisFirehoseModel)
def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContext):
assert event.region == "us-east-2"
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"

records = list(event.records)
assert len(records) == 2

record_01: KinesisFirehoseRecord = records[0]
assert record_01.approximateArrivalTimestamp == 1664029185290
assert record_01.recordId == "record1"
assert record_01.data == b"Hello World"

record_02: KinesisFirehoseRecord = records[1]
assert record_02.approximateArrivalTimestamp == 1664029186945
assert record_02.recordId == "record2"
assert record_02.data == b'{"Hello": "World"}'


def test_firehose_trigger_event():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
event_dict["records"].pop(0)  # drop the first record: its payload is plain text, not JSON, so the business model cannot parse it
handle_firehose(event_dict, LambdaContext())


def test_firehose_trigger_event_kinesis_no_envelope():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())


def test_firehose_trigger_event_put_no_envelope():
event_dict = load_event("kinesisFirehosePutEvent.json")
handle_firehose_no_envelope_put(event_dict, LambdaContext())


def test_kinesis_trigger_bad_base64_event():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
event_dict["records"][0]["data"] = {"bad base64"}
with pytest.raises(ValidationError):
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())


def test_kinesis_trigger_bad_timestamp_event():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
event_dict["records"][0]["approximateArrivalTimestamp"] = -1
with pytest.raises(ValidationError):
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())


def test_kinesis_trigger_bad_metadata_timestamp_event():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
event_dict["records"][0]["kinesisRecordMetadata"]["approximateArrivalTimestamp"] = "-1"
with pytest.raises(ValidationError):
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())