Skip to content

feat(parser): add KinesisFirehoseModel #1556

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Oct 10, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from datetime import datetime
from typing import List, Optional, Type, Union

from pydantic import BaseModel, validator
Expand All @@ -9,15 +8,15 @@
class KinesisFirehoseRecordMetadata(BaseModel):
shardId: str
partitionKey: str
approximateArrivalTimestamp: str
approximateArrivalTimestamp: int
sequenceNumber: str
subsequenceNumber: str


class KinesisFirehoseRecord(BaseModel):
data: Union[bytes, Type[BaseModel]] # base64 encoded str is parsed into bytes
recordId: str
approximateArrivalTimestamp: datetime
approximateArrivalTimestamp: int
kinesisRecordMetadata: Optional[KinesisFirehoseRecordMetadata]

@validator("data", pre=True, allow_reuse=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,32 @@
{
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
"sourceKinesisStreamArn":"arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source",
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
"region": "us-east-2",
"records": [
{
"data": "SGVsbG8gV29ybGQ=",
"recordId": "record1",
"approximateArrivalTimestamp": 1510772160000,
"approximateArrivalTimestamp": 1664028820148,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000000",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
"approximateArrivalTimestamp": "2012-04-23T18:25:43.511Z",
"approximateArrivalTimestamp": 1664028820148,
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
"subsequenceNumber": ""
}
},
{
"data": "eyJIZWxsbyI6ICJXb3JsZCJ9",
"recordId": "record2",
"approximateArrivalTimestamp": 151077216000,
"approximateArrivalTimestamp": 1664028793294,
"kinesisRecordMetadata": {
"shardId": "shardId-000000000001",
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
"approximateArrivalTimestamp": "2012-04-23T19:25:43.511Z",
"approximateArrivalTimestamp": 1664028793294,
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
"subsequenceNumber": ""
}
}
]
}
}
17 changes: 17 additions & 0 deletions tests/events/kinesisFirehosePutEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"invocationId": "2b4d1ad9-2f48-94bd-a088-767c317e994a",
"deliveryStreamArn": "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name",
"region": "us-east-2",
"records":[
{
"recordId":"record1",
"approximateArrivalTimestamp":1664029185290,
"data":"SGVsbG8gV29ybGQ="
},
{
"recordId":"record2",
"approximateArrivalTimestamp":1664029186945,
"data":"eyJIZWxsbyI6ICJXb3JsZCJ9"
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,83 @@


@event_parser(model=MyKinesisFirehoseBusiness, envelope=envelopes.KinesisFirehoseEnvelope)
def handle_kinesis(event: List[MyKinesisFirehoseBusiness], _: LambdaContext):
def handle_firehose(event: List[MyKinesisFirehoseBusiness], _: LambdaContext):
assert len(event) == 1
assert event[0].Hello == "World"


@event_parser(model=KinesisFirehoseModel)
def handle_kinesis_no_envelope(event: KinesisFirehoseModel, _: LambdaContext):
def handle_firehose_no_envelope_kinesis(event: KinesisFirehoseModel, _: LambdaContext):
assert event.region == "us-east-2"
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"
assert event.sourceKinesisStreamArn is None
assert event.sourceKinesisStreamArn == "arn:aws:kinesis:us-east-1:123456789012:stream/kinesis-source"

records = list(event.records)
assert len(records) == 2
record_01: KinesisFirehoseRecord = records[0]

convert_time = int(round(record_01.approximateArrivalTimestamp.timestamp() * 1000))
assert convert_time == 1510772160000
assert record_01.approximateArrivalTimestamp == 1664028820148
assert record_01.recordId == "record1"
assert record_01.data == b"Hello World"

metadata_01: KinesisFirehoseRecordMetadata = record_01.kinesisRecordMetadata
assert metadata_01.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c317a"
assert metadata_01.subsequenceNumber == ""
assert metadata_01.shardId == "shardId-000000000000"
assert metadata_01.approximateArrivalTimestamp == "2012-04-23T18:25:43.511Z"
assert metadata_01.approximateArrivalTimestamp == 1664028820148
assert metadata_01.sequenceNumber == "49546986683135544286507457936321625675700192471156785154"

record_02: KinesisFirehoseRecord = records[1]
convert_time = int(round(record_02.approximateArrivalTimestamp.timestamp() * 1000))
assert convert_time == 151077216000
assert record_02.approximateArrivalTimestamp == 1664028793294
assert record_02.recordId == "record2"
assert record_02.data == b'{"Hello": "World"}'

metadata_02: KinesisFirehoseRecordMetadata = record_02.kinesisRecordMetadata
assert metadata_02.partitionKey == "4d1ad2b9-24f8-4b9d-a088-76e9947c318a"
assert metadata_02.subsequenceNumber == ""
assert metadata_02.shardId == "shardId-000000000001"
assert metadata_02.approximateArrivalTimestamp == "2012-04-23T19:25:43.511Z"
assert metadata_02.approximateArrivalTimestamp == 1664028793294
assert metadata_02.sequenceNumber == "49546986683135544286507457936321625675700192471156785155"


def test_kinesis_trigger_event():
event_dict = load_event("kinesisFirehoseEvent.json")
@event_parser(model=KinesisFirehoseModel)
def handle_firehose_no_envelope_put(event: KinesisFirehoseModel, _: LambdaContext):
assert event.region == "us-east-2"
assert event.invocationId == "2b4d1ad9-2f48-94bd-a088-767c317e994a"
assert event.deliveryStreamArn == "arn:aws:firehose:us-east-2:123456789012:deliverystream/delivery-stream-name"

records = list(event.records)
assert len(records) == 2

record_01: KinesisFirehoseRecord = records[0]
assert record_01.approximateArrivalTimestamp == 1664029185290
assert record_01.recordId == "record1"
assert record_01.data == b"Hello World"

record_02: KinesisFirehoseRecord = records[1]
assert record_02.approximateArrivalTimestamp == 1664029186945
assert record_02.recordId == "record2"
assert record_02.data == b'{"Hello": "World"}'


def test_firehose_trigger_event():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
event_dict["records"].pop(0) # remove first item since the payload is bytes and we want to test payload json class
handle_kinesis(event_dict, LambdaContext())
handle_firehose(event_dict, LambdaContext())


def test_firehose_trigger_event_kinesis_no_envelope():
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())


def test_kinesis_trigger_event_no_envelope():
event_dict = load_event("kinesisFirehoseEvent.json")
handle_kinesis_no_envelope(event_dict, LambdaContext())
def test_firehose_trigger_event_put_no_envelope():
event_dict = load_event("kinesisFirehosePutEvent.json")
handle_firehose_no_envelope_put(event_dict, LambdaContext())


def test_kinesis_trigger_bad_base64_event():
event_dict = load_event("kinesisFirehoseEvent.json")
event_dict = load_event("kinesisFirehoseKinesisEvent.json")
event_dict["records"][0]["data"] = {"bad base64"}
with pytest.raises(ValidationError):
handle_kinesis_no_envelope(event_dict, LambdaContext())
handle_firehose_no_envelope_kinesis(event_dict, LambdaContext())