Skip to content

Commit ca99aa0

Browse files
authored
Add DynamoDB and Kinesis Time Window events and response models (#441)
1 parent a0b8eda commit ca99aa0

8 files changed

+280
-1
lines changed

events/dynamodb.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,20 @@ type DynamoDBEvent struct {
88
Records []DynamoDBEventRecord `json:"Records"`
99
}
1010

11-
// DynamoDbEventRecord stores information about each record of a DynamoDb stream event
11+
// DynamoDBTimeWindowEvent represents an Amazon Dynamodb event when using time windows
12+
// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
13+
type DynamoDBTimeWindowEvent struct {
14+
DynamoDBEvent
15+
TimeWindowProperties
16+
}
17+
18+
// DynamoDBTimeWindowEventResponse is the outer structure to report batch item failures for DynamoDBTimeWindowEvent.
19+
type DynamoDBTimeWindowEventResponse struct {
20+
TimeWindowEventResponseProperties
21+
BatchItemFailures []DynamoDBBatchItemFailure `json:"batchItemFailures"`
22+
}
23+
24+
// DynamoDBEventRecord stores information about each record of a DynamoDB stream event
1225
type DynamoDBEventRecord struct {
1326
// The region in which the GetRecords request was received.
1427
AWSRegion string `json:"awsRegion"`

events/dynamodb_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,27 @@ func TestDynamoDBEventMarshaling(t *testing.T) {
3434
func TestDynamoDBEventMarshalingMalformedJson(t *testing.T) {
3535
test.TestMalformedJson(t, DynamoDBEvent{})
3636
}
37+
38+
func TestDynamoDBTimeWindowEventMarshaling(t *testing.T) {
39+
// 1. read JSON from file
40+
inputJSON := test.ReadJSONFromFile(t, "./testdata/dynamodb-time-window-event.json")
41+
42+
// 2. de-serialize into Go object
43+
var inputEvent DynamoDBTimeWindowEvent
44+
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
45+
t.Errorf("could not unmarshal event. details: %v", err)
46+
}
47+
48+
// 3. serialize to JSON
49+
outputJSON, err := json.Marshal(inputEvent)
50+
if err != nil {
51+
t.Errorf("could not marshal event. details: %v", err)
52+
}
53+
54+
// 4. check result
55+
assert.JSONEq(t, string(inputJSON), string(outputJSON))
56+
}
57+
58+
func TestDynamoDBTimeWindowEventMarshalingMalformedJson(t *testing.T) {
59+
test.TestMalformedJson(t, DynamoDBTimeWindowEvent{})
60+
}

events/epoch_time.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import (
77
"time"
88
)
99

10+
// RFC3339EpochTime serializes a time.Time in JSON as an ISO 8601 string.
11+
type RFC3339EpochTime struct {
12+
time.Time
13+
}
14+
1015
// SecondsEpochTime serializes a time.Time in JSON as a UNIX epoch time in seconds
1116
type SecondsEpochTime struct {
1217
time.Time
@@ -57,3 +62,24 @@ func (e *MilliSecondsEpochTime) UnmarshalJSON(b []byte) error {
5762
*e = MilliSecondsEpochTime{time.Unix(epoch/1000, (epoch%1000)*1000000)}
5863
return nil
5964
}
65+
66+
func (e RFC3339EpochTime) MarshalJSON() ([]byte, error) {
67+
isoTimestampStr := e.Format(time.RFC3339)
68+
return json.Marshal(isoTimestampStr)
69+
}
70+
71+
func (e *RFC3339EpochTime) UnmarshalJSON(b []byte) error {
72+
var isoTimestampStr string
73+
err := json.Unmarshal(b, &isoTimestampStr)
74+
if err != nil {
75+
return err
76+
}
77+
78+
parsed, err := time.Parse(time.RFC3339, isoTimestampStr)
79+
if err != nil {
80+
return err
81+
}
82+
83+
*e = RFC3339EpochTime{parsed}
84+
return nil
85+
}

events/kinesis.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,19 @@ type KinesisEvent struct {
66
Records []KinesisEventRecord `json:"Records"`
77
}
88

9+
// KinesisTimeWindowEvent represents an Amazon Dynamodb event when using time windows
10+
// ref. https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
11+
type KinesisTimeWindowEvent struct {
12+
KinesisEvent
13+
TimeWindowProperties
14+
}
15+
16+
// KinesisTimeWindowEventResponse is the outer structure to report batch item failures for KinesisTimeWindowEvent.
17+
type KinesisTimeWindowEventResponse struct {
18+
TimeWindowEventResponseProperties
19+
BatchItemFailures []KinesisBatchItemFailure `json:"batchItemFailures"`
20+
}
21+
922
type KinesisEventRecord struct {
1023
AwsRegion string `json:"awsRegion"` //nolint: stylecheck
1124
EventID string `json:"eventID"`

events/kinesis_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,27 @@ func TestKinesisEventMarshaling(t *testing.T) {
3333
func TestKinesisMarshalingMalformedJson(t *testing.T) {
3434
test.TestMalformedJson(t, KinesisEvent{})
3535
}
36+
37+
func TestKinesisTimeWindowEventMarshaling(t *testing.T) {
38+
// 1. read JSON from file
39+
inputJSON := test.ReadJSONFromFile(t, "./testdata/kinesis-time-window-event.json")
40+
41+
// 2. de-serialize into Go object
42+
var inputEvent KinesisTimeWindowEvent
43+
if err := json.Unmarshal(inputJSON, &inputEvent); err != nil {
44+
t.Errorf("could not unmarshal event. details: %v", err)
45+
}
46+
47+
// 3. serialize to JSON
48+
outputJSON, err := json.Marshal(inputEvent)
49+
if err != nil {
50+
t.Errorf("could not marshal event. details: %v", err)
51+
}
52+
53+
// 4. check result
54+
assert.JSONEq(t, string(inputJSON), string(outputJSON))
55+
}
56+
57+
func TestKinesisTimeWindowEventMarshalingMalformedJson(t *testing.T) {
58+
test.TestMalformedJson(t, KinesisTimeWindowEvent{})
59+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
{
2+
"Records":[
3+
{
4+
"eventID":"1",
5+
"eventName":"INSERT",
6+
"eventVersion":"1.0",
7+
"eventSource":"aws:dynamodb",
8+
"awsRegion":"us-east-1",
9+
"dynamodb":{
10+
"ApproximateCreationDateTime": 1480642020,
11+
"Keys":{
12+
"Id":{
13+
"N":"101"
14+
}
15+
},
16+
"NewImage":{
17+
"Message":{
18+
"S":"New item!"
19+
},
20+
"Id":{
21+
"N":"101"
22+
}
23+
},
24+
"SequenceNumber":"111",
25+
"SizeBytes":26,
26+
"StreamViewType":"NEW_AND_OLD_IMAGES"
27+
},
28+
"eventSourceARN":"stream-ARN"
29+
},
30+
{
31+
"eventID":"2",
32+
"eventName":"MODIFY",
33+
"eventVersion":"1.0",
34+
"eventSource":"aws:dynamodb",
35+
"awsRegion":"us-east-1",
36+
"dynamodb":{
37+
"ApproximateCreationDateTime": 1480642020,
38+
"Keys":{
39+
"Id":{
40+
"N":"101"
41+
}
42+
},
43+
"NewImage":{
44+
"Message":{
45+
"S":"This item has changed"
46+
},
47+
"Id":{
48+
"N":"101"
49+
}
50+
},
51+
"OldImage":{
52+
"Message":{
53+
"S":"New item!"
54+
},
55+
"Id":{
56+
"N":"101"
57+
}
58+
},
59+
"SequenceNumber":"222",
60+
"SizeBytes":59,
61+
"StreamViewType":"NEW_AND_OLD_IMAGES"
62+
},
63+
"eventSourceARN":"stream-ARN"
64+
},
65+
{
66+
"eventID":"3",
67+
"eventName":"REMOVE",
68+
"eventVersion":"1.0",
69+
"eventSource":"aws:dynamodb",
70+
"awsRegion":"us-east-1",
71+
"dynamodb":{
72+
"ApproximateCreationDateTime": 1480642020,
73+
"Keys":{
74+
"Id":{
75+
"N":"101"
76+
}
77+
},
78+
"OldImage":{
79+
"Message":{
80+
"S":"This item has changed"
81+
},
82+
"Id":{
83+
"N":"101"
84+
}
85+
},
86+
"SequenceNumber":"333",
87+
"SizeBytes":38,
88+
"StreamViewType":"NEW_AND_OLD_IMAGES"
89+
},
90+
"eventSourceARN":"stream-ARN"
91+
}
92+
],
93+
"window": {
94+
"start": "2020-07-30T17:00:00Z",
95+
"end": "2020-07-30T17:05:00Z"
96+
},
97+
"state": {
98+
"1": "state1"
99+
},
100+
"shardId": "shard123456789",
101+
"eventSourceARN": "stream-ARN",
102+
"isFinalInvokeForWindow": false,
103+
"isWindowTerminatedEarly": false
104+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
2+
{
3+
"Records": [
4+
{
5+
"kinesis": {
6+
"kinesisSchemaVersion": "1.0",
7+
"partitionKey": "1",
8+
"sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
9+
"data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
10+
"approximateArrivalTimestamp": 1607497475.000
11+
},
12+
"eventSource": "aws:kinesis",
13+
"eventVersion": "1.0",
14+
"eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
15+
"eventName": "aws:kinesis:record",
16+
"invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
17+
"awsRegion": "us-east-1",
18+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
19+
}
20+
],
21+
"window": {
22+
"start": "2020-12-09T07:04:00Z",
23+
"end": "2020-12-09T07:06:00Z"
24+
},
25+
"state": {
26+
"1": "state 1",
27+
"2": "state 2"
28+
},
29+
"shardId": "shardId-000000000006",
30+
"eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
31+
"isFinalInvokeForWindow": false,
32+
"isWindowTerminatedEarly": false
33+
}

events/time_window.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package events
2+
3+
// Window is the object that captures the time window for the records in the event when using the tumbling windows feature
4+
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
5+
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
6+
type Window struct {
7+
Start RFC3339EpochTime `json:"start"`
8+
End RFC3339EpochTime `json:"end"`
9+
}
10+
11+
// TimeWindowProperties is the object that captures properties that relate to the tumbling windows feature
12+
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
13+
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
14+
type TimeWindowProperties struct {
15+
// Time window for the records in the event.
16+
Window Window `json:"window"`
17+
18+
// State being built up to this invoke in the time window.
19+
State map[string]string `json:"state"`
20+
21+
// Shard id of the records
22+
ShardID string `json:"shardId"`
23+
24+
// The event source ARN of the service that generated the event (eg. DynamoDB or Kinesis)
25+
EventSourceARN string `json:"eventSourceARN"`
26+
27+
// Set to true for the last invoke of the time window.
28+
// Subsequent invoke will start a new time window along with a fresh state.
29+
IsFinalInvokeForWindow bool `json:"isFinalInvokeForWindow"`
30+
31+
// Set to true if window is terminated prematurely.
32+
// Subsequent invoke will continue the same window with a fresh state.
33+
IsWindowTerminatedEarly bool `json:"isWindowTerminatedEarly"`
34+
}
35+
36+
// TimeWindowEventResponseProperties is the object that captures response properties that relate to the tumbling windows feature
37+
// Kinesis: https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html#services-kinesis-windows
38+
// DDB: https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#services-ddb-windows
39+
type TimeWindowEventResponseProperties struct {
40+
// State being built up to this invoke in the time window.
41+
State map[string]string `json:"state"`
42+
}

0 commit comments

Comments
 (0)