Skip to content

Commit 43eb3b9

Browse files
bhavikkumarbmoffatt
authored andcommitted
Add Kinesis Stream Metadata to firehose event (#188)
* Add source kinesis stream arn to firehose event * Add Kinesis Record Metadata struct to allow for reingest to a stream * fix approximateArrivalTimestamp in test data updated type to match sample in Lambda console
1 parent 18bf34b commit 43eb3b9

File tree

2 files changed

+20
-10
lines changed

2 files changed

+20
-10
lines changed

events/firehose.go

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,18 @@ package events
44

55
// KinesisFirehoseEvent represents the input event from Amazon Kinesis Firehose. It is used as the input parameter.
66
type KinesisFirehoseEvent struct {
7-
InvocationID string `json:"invocationId"`
8-
DeliveryStreamArn string `json:"deliveryStreamArn"`
9-
Region string `json:"region"`
10-
Records []KinesisFirehoseEventRecord `json:"records"`
7+
InvocationID string `json:"invocationId"`
8+
DeliveryStreamArn string `json:"deliveryStreamArn"`
9+
SourceKinesisStreamArn string `json:"sourceKinesisStreamArn"`
10+
Region string `json:"region"`
11+
Records []KinesisFirehoseEventRecord `json:"records"`
1112
}
1213

1314
type KinesisFirehoseEventRecord struct {
14-
RecordID string `json:"recordId"`
15-
ApproximateArrivalTimestamp MilliSecondsEpochTime `json:"approximateArrivalTimestamp"`
16-
Data []byte `json:"data"`
15+
RecordID string `json:"recordId"`
16+
ApproximateArrivalTimestamp MilliSecondsEpochTime `json:"approximateArrivalTimestamp"`
17+
Data []byte `json:"data"`
18+
KinesisFirehoseRecordMetadata KinesisFirehoseRecordMetadata `json:"kinesisRecordMetadata"`
1719
}
1820

1921
// Constants used for describing the transformation result
@@ -32,3 +34,10 @@ type KinesisFirehoseResponseRecord struct {
3234
Result string `json:"result"` // The status of the transformation. May be TransformedStateOk, TransformedStateDropped or TransformedStateProcessingFailed
3335
Data []byte `json:"data"`
3436
}
37+
38+
type KinesisFirehoseRecordMetadata struct {
39+
ShardID string `json:"shardId"`
40+
PartitionKey string `json:"partitionKey"`
41+
SequenceNumber string `json:"sequenceNumber"`
42+
ApproximateArrivalTimestamp MilliSecondsEpochTime `json:"approximateArrivalTimestamp"`
43+
}

events/testdata/kinesis-firehose-event.json

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
{
22
"invocationId": "invoked123",
33
"deliveryStreamArn": "aws:lambda:events",
4+
"sourceKinesisStreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/test",
45
"region": "us-west-2",
56
"records": [
67
{
@@ -10,7 +11,7 @@
1011
"kinesisRecordMetadata": {
1112
"shardId": "shardId-000000000000",
1213
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c317a",
13-
"approximateArrivalTimestamp": "1507217624302",
14+
"approximateArrivalTimestamp": 1507217624302,
1415
"sequenceNumber": "49546986683135544286507457936321625675700192471156785154",
1516
"subsequenceNumber": ""
1617
}
@@ -22,10 +23,10 @@
2223
"kinesisRecordMetadata": {
2324
"shardId": "shardId-000000000001",
2425
"partitionKey": "4d1ad2b9-24f8-4b9d-a088-76e9947c318a",
25-
"approximateArrivalTimestamp": "1507217624302",
26+
"approximateArrivalTimestamp": 1507217624302,
2627
"sequenceNumber": "49546986683135544286507457936321625675700192471156785155",
2728
"subsequenceNumber": ""
2829
}
2930
}
3031
]
31-
}
32+
}

0 commit comments

Comments
 (0)