From 2069fc3e25b48f40959ccf8a377c99da8327c66f Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Mon, 18 Nov 2024 11:48:22 +0100 Subject: [PATCH 1/6] migrated kinesis events for tests --- .../firehose-put.json} | 0 .../firehose-sqs.json} | 0 .../firehose.json} | 0 .../stream-cloudwatch-logs.json} | 0 .../stream-one-record.json} | 0 .../stream.json} | 0 .../unit/envelopes/kinesis-firehose.test.ts | 70 +++++++++++-------- .../tests/unit/envelopes/kinesis.test.ts | 24 +++++-- .../parser/tests/unit/schema/kinesis.test.ts | 59 +++++++++++----- 9 files changed, 102 insertions(+), 51 deletions(-) rename packages/parser/tests/events/{kinesisFirehosePutEvent.json => kinesis/firehose-put.json} (100%) rename packages/parser/tests/events/{kinesisFirehoseSQSEvent.json => kinesis/firehose-sqs.json} (100%) rename packages/parser/tests/events/{kinesisFirehoseKinesisEvent.json => kinesis/firehose.json} (100%) rename packages/parser/tests/events/{kinesisStreamCloudWatchLogsEvent.json => kinesis/stream-cloudwatch-logs.json} (100%) rename packages/parser/tests/events/{kinesisStreamEventOneRecord.json => kinesis/stream-one-record.json} (100%) rename packages/parser/tests/events/{kinesisStreamEvent.json => kinesis/stream.json} (100%) diff --git a/packages/parser/tests/events/kinesisFirehosePutEvent.json b/packages/parser/tests/events/kinesis/firehose-put.json similarity index 100% rename from packages/parser/tests/events/kinesisFirehosePutEvent.json rename to packages/parser/tests/events/kinesis/firehose-put.json diff --git a/packages/parser/tests/events/kinesisFirehoseSQSEvent.json b/packages/parser/tests/events/kinesis/firehose-sqs.json similarity index 100% rename from packages/parser/tests/events/kinesisFirehoseSQSEvent.json rename to packages/parser/tests/events/kinesis/firehose-sqs.json diff --git a/packages/parser/tests/events/kinesisFirehoseKinesisEvent.json b/packages/parser/tests/events/kinesis/firehose.json similarity index 100% rename from packages/parser/tests/events/kinesisFirehoseKinesisEvent.json rename to packages/parser/tests/events/kinesis/firehose.json diff --git a/packages/parser/tests/events/kinesisStreamCloudWatchLogsEvent.json b/packages/parser/tests/events/kinesis/stream-cloudwatch-logs.json similarity index 100% rename from packages/parser/tests/events/kinesisStreamCloudWatchLogsEvent.json rename to packages/parser/tests/events/kinesis/stream-cloudwatch-logs.json diff --git a/packages/parser/tests/events/kinesisStreamEventOneRecord.json b/packages/parser/tests/events/kinesis/stream-one-record.json similarity index 100% rename from packages/parser/tests/events/kinesisStreamEventOneRecord.json rename to packages/parser/tests/events/kinesis/stream-one-record.json diff --git a/packages/parser/tests/events/kinesisStreamEvent.json b/packages/parser/tests/events/kinesis/stream.json similarity index 100% rename from packages/parser/tests/events/kinesisStreamEvent.json rename to packages/parser/tests/events/kinesis/stream.json diff --git a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts index 1f9d7baa89..c0216cb387 100644 --- a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts @@ -4,15 +4,21 @@ import { ZodError, type z } from 'zod'; import { ParseError } from '../../../src'; import { KinesisFirehoseEnvelope } from '../../../src/envelopes/index.js'; import type { KinesisFirehoseSchema } from '../../../src/schemas/'; -import { TestEvents, TestSchema } from '../schema/utils.js'; +import type { + KinesisFireHoseEvent, + KinesisFireHoseSqsEvent, +} from '../../../src/types'; +import { TestEvents, TestSchema, getTestEvent } from '../schema/utils.js'; describe('Kinesis Firehose Envelope', () => { + const eventsPath = 'kinesis'; describe('parse', () => { it('should parse records for PutEvent', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -24,9 +30,10 @@ describe('Kinesis Firehose Envelope', () => { it('should parse a single record for SQS event', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-sqs', + }); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -38,9 +45,10 @@ describe('Kinesis Firehose Envelope', () => { it('should parse records for kinesis event', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose', + }); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -50,9 +58,10 @@ describe('Kinesis Firehose Envelope', () => { expect(resp).toEqual([mock, mock]); }); it('should throw if record is not base64 encoded', () => { - const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); testEvent.records.map((record) => { record.data = 'not base64 encoded'; @@ -68,9 +77,10 @@ describe('Kinesis Firehose Envelope', () => { }).toThrow(); }); it('should throw when schema does not match record', () => { - const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); testEvent.records.map((record) => { record.data = Buffer.from('not a valid json').toString('base64'); @@ -84,9 +94,10 @@ describe('Kinesis Firehose Envelope', () => { describe('safeParse', () => { it('should parse records for PutEvent', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -98,9 +109,10 @@ describe('Kinesis Firehose Envelope', () => { it('should parse a single record for SQS event', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisFirehoseSQSEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-sqs', + }); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -112,9 +124,10 @@ describe('Kinesis Firehose Envelope', () => { it('should parse records for kinesis event', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisFirehoseKinesisEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose', + }); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -139,9 +152,10 @@ describe('Kinesis Firehose Envelope', () => { } }); it('should return original event if record is not base64 encoded', () => { - const testEvent = TestEvents.kinesisFirehosePutEvent as z.infer< - typeof KinesisFirehoseSchema - >; + const testEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); testEvent.records.map((record) => { record.data = 'not base64 encoded'; diff --git a/packages/parser/tests/unit/envelopes/kinesis.test.ts b/packages/parser/tests/unit/envelopes/kinesis.test.ts index 28c98ee382..02d030ec28 100644 --- a/packages/parser/tests/unit/envelopes/kinesis.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis.test.ts @@ -3,13 +3,18 @@ import type { KinesisStreamEvent } from 'aws-lambda'; import { describe, expect, it } from 'vitest'; import { KinesisEnvelope } from '../../../src/envelopes/index.js'; import { ParseError } from '../../../src/errors.js'; -import { TestEvents, TestSchema } from '../schema/utils.js'; +import type { KinesisDataStreamEvent } from '../../../src/types/schema.js'; +import { TestEvents, TestSchema, getTestEvent } from '../schema/utils.js'; describe('KinesisEnvelope', () => { + const eventsPath = 'kinesis'; describe('parse', () => { it('should parse Kinesis Stream event', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent; + const testEvent = getTestEvent({ + eventsPath, + filename: 'stream', + }); testEvent.Records.map((record) => { record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString( @@ -24,7 +29,10 @@ describe('KinesisEnvelope', () => { expect(() => KinesisEnvelope.parse({ foo: 'bar' }, TestSchema)).toThrow(); }); it('should throw if record is invalid', () => { - const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent; + const testEvent = getTestEvent({ + eventsPath, + filename: 'stream', + }); testEvent.Records[0].kinesis.data = 'invalid'; expect(() => KinesisEnvelope.parse(testEvent, TestSchema)).toThrow(); }); @@ -33,7 +41,10 @@ describe('KinesisEnvelope', () => { describe('safeParse', () => { it('should parse Kinesis Stream event', () => { const mock = generateMock(TestSchema); - const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent; + const testEvent = getTestEvent({ + eventsPath, + filename: 'stream', + }); testEvent.Records.map((record) => { record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString( @@ -54,7 +65,10 @@ describe('KinesisEnvelope', () => { }); }); it('should return original event if record is invalid', () => { - const testEvent = TestEvents.kinesisStreamEvent as KinesisStreamEvent; + const testEvent = getTestEvent({ + eventsPath, + filename: 'stream', + }); testEvent.Records[0].kinesis.data = 'invalid'; const parseResult = KinesisEnvelope.safeParse(testEvent, TestSchema); expect(parseResult).toEqual({ diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index 5ff2453f49..503439c8aa 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -18,12 +18,15 @@ import type { KinesisFirehoseRecord, KinesisFirehoseSqsRecord, } from '../../../src/types/schema'; -import { TestEvents } from './utils.js'; +import { TestEvents, getTestEvent } from './utils.js'; describe('Kinesis ', () => { + const eventsPath = 'kinesis'; it('should parse kinesis event', () => { - const kinesisStreamEvent = - TestEvents.kinesisStreamEvent as KinesisDataStreamEvent; + const kinesisStreamEvent = getTestEvent({ + eventsPath, + filename: 'stream', + }); const parsed = KinesisDataStreamSchema.parse(kinesisStreamEvent); const transformedInput = { @@ -41,8 +44,10 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse single kinesis record', () => { - const kinesisStreamEventOneRecord = - TestEvents.kinesisStreamEventOneRecord as KinesisDataStreamEvent; + const kinesisStreamEventOneRecord = getTestEvent({ + eventsPath, + filename: 'stream-one-record', + }); const parsed = KinesisDataStreamSchema.parse(kinesisStreamEventOneRecord); const transformedInput = { @@ -62,8 +67,10 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse Firehose event', () => { - const kinesisFirehoseKinesisEvent = - TestEvents.kinesisFirehoseKinesisEvent as KinesisFireHoseEvent; + const kinesisFirehoseKinesisEvent = getTestEvent({ + eventsPath, + filename: 'firehose', + }); const parsed = KinesisFirehoseSchema.parse(kinesisFirehoseKinesisEvent); const transformedInput = { @@ -79,8 +86,11 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse Kinesis Firehose PutEvents event', () => { - const kinesisFirehosePutEvent = - TestEvents.kinesisFirehosePutEvent as KinesisFireHoseEvent; + const kinesisFirehosePutEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); + const parsed = KinesisFirehoseSchema.parse(kinesisFirehosePutEvent); const transformedInput = { @@ -96,8 +106,11 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse Firehose event with SQS event', () => { - const kinesisFirehoseSQSEvent = - TestEvents.kinesisFirehoseSQSEvent as KinesisFireHoseSqsEvent; + const kinesisFirehoseSQSEvent = getTestEvent({ + eventsPath, + filename: 'firehose-sqs', + }); + const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent); const transformedInput = { @@ -116,7 +129,10 @@ describe('Kinesis ', () => { }); it('should parse Kinesis event with CloudWatch event', () => { const kinesisStreamCloudWatchLogsEvent = - TestEvents.kinesisStreamCloudWatchLogsEvent as KinesisDataStreamEvent; + getTestEvent({ + eventsPath, + filename: 'stream-cloudwatch-logs', + }); const parsed = KinesisDataStreamSchema.parse( kinesisStreamCloudWatchLogsEvent ); @@ -140,9 +156,10 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should return original value if cannot parse KinesisFirehoseSqsRecord', () => { - const kinesisFirehoseSQSEvent = TestEvents.kinesisFirehoseSQSEvent as { - records: { data: string }[]; - }; + const kinesisFirehoseSQSEvent = getTestEvent({ + eventsPath, + filename: 'firehose-sqs', + }); kinesisFirehoseSQSEvent.records[0].data = 'not a valid json'; const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent); @@ -150,7 +167,10 @@ describe('Kinesis ', () => { }); it('should parse a kinesis record from a kinesis event', () => { const kinesisStreamEvent: KinesisDataStreamEvent = - TestEvents.kinesisStreamEvent as KinesisDataStreamEvent; + getTestEvent({ + eventsPath, + filename: 'stream-one-record', + }); const parsedRecord = KinesisDataStreamRecord.parse( kinesisStreamEvent.Records[0] ); @@ -161,7 +181,7 @@ describe('Kinesis ', () => { it('should parse a kinesis firehose record from a kinesis firehose event', () => { const kinesisFirehoseEvent: KinesisFireHoseEvent = - TestEvents.kinesisFirehoseKinesisEvent as KinesisFireHoseEvent; + getTestEvent({ eventsPath, filename: 'firehose' }); const parsedRecord: KinesisFirehoseRecord = KinesisFirehoseRecordSchema.parse(kinesisFirehoseEvent.records[0]); @@ -170,7 +190,10 @@ describe('Kinesis ', () => { it('should parse a sqs record from a kinesis firehose event', () => { const kinesisFireHoseSqsEvent: KinesisFireHoseSqsEvent = - TestEvents.kinesisFirehoseSQSEvent as KinesisFireHoseSqsEvent; + getTestEvent({ + eventsPath, + filename: 'firehose-sqs', + }); const parsed: KinesisFirehoseSqsRecord = KinesisFirehoseSqsRecordSchema.parse(kinesisFireHoseSqsEvent.records[0]); From a4a7abd9fd2379afd0dea590864a3961a0f3e527 Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Mon, 18 Nov 2024 20:24:43 +0100 Subject: [PATCH 2/6] feat(schemas): add KinesisDynamoDBStreamSchema and related transformations for DynamoDB stream events --- packages/parser/src/schemas/dynamodb.ts | 14 +++++ packages/parser/src/schemas/index.ts | 6 ++- packages/parser/src/schemas/kinesis.ts | 23 ++++++-- packages/parser/src/types/schema.ts | 4 ++ .../tests/events/kinesis/dynamodb-stream.json | 36 +++++++++++++ .../parser/tests/unit/schema/kinesis.test.ts | 54 +++++++++++++++++++ 6 files changed, 133 insertions(+), 4 deletions(-) create mode 100644 packages/parser/tests/events/kinesis/dynamodb-stream.json diff --git a/packages/parser/src/schemas/dynamodb.ts b/packages/parser/src/schemas/dynamodb.ts index e33b003535..f6bf57b5b6 100644 --- a/packages/parser/src/schemas/dynamodb.ts +++ b/packages/parser/src/schemas/dynamodb.ts @@ -31,6 +31,19 @@ const DynamoDBStreamRecord = z.object({ userIdentity: UserIdentity.optional(), }); +const DynamoDBStreamToKinesisRecord = DynamoDBStreamRecord.extend({ + recordFormat: z.literal('application/json'), + tableName: z.string(), + userIdentity: UserIdentity.nullish(), + dynamodb: DynamoDBStreamChangeRecord.omit({ + SequenceNumber: true, + StreamViewType: true, + }), +}).omit({ + eventVersion: true, + eventSourceARN: true, +}); + /** * Zod schema for Amazon DynamoDB Stream event. * @@ -111,6 +124,7 @@ const DynamoDBStreamSchema = z.object({ }); export { + DynamoDBStreamToKinesisRecord, DynamoDBStreamSchema, DynamoDBStreamRecord, DynamoDBStreamChangeRecord, diff --git a/packages/parser/src/schemas/index.ts b/packages/parser/src/schemas/index.ts index b50de2aac9..60282489a7 100644 --- a/packages/parser/src/schemas/index.ts +++ b/packages/parser/src/schemas/index.ts @@ -27,7 +27,11 @@ export { KafkaSelfManagedEventSchema, KafkaRecordSchema, } from './kafka.js'; -export { KinesisDataStreamSchema, KinesisDataStreamRecord } from './kinesis.js'; +export { + KinesisDataStreamSchema, + KinesisDynamoDBStreamSchema, + KinesisDataStreamRecord, +} from './kinesis.js'; export { KinesisFirehoseSchema, KinesisFirehoseSqsSchema, diff --git a/packages/parser/src/schemas/kinesis.ts b/packages/parser/src/schemas/kinesis.ts index 39f03a71ae..083f39ddce 100644 --- a/packages/parser/src/schemas/kinesis.ts +++ b/packages/parser/src/schemas/kinesis.ts @@ -1,5 +1,6 @@ import { gunzipSync } from 'node:zlib'; import { z } from 'zod'; +import { DynamoDBStreamToKinesisRecord } from './dynamodb.js'; const KinesisDataStreamRecordPayload = z.object({ kinesisSchemaVersion: z.string(), @@ -7,11 +8,11 @@ const KinesisDataStreamRecordPayload = z.object({ sequenceNumber: z.string(), approximateArrivalTimestamp: z.number(), data: z.string().transform((data) => { - const decompresed = decompress(data); + const decompressed = decompress(data); const decoded = Buffer.from(data, 'base64').toString('utf-8'); try { // If data was not compressed, try to parse it as JSON otherwise it must be string - return decompresed === data ? JSON.parse(decoded) : decompresed; + return decompressed === data ? JSON.parse(decoded) : decompressed; } catch (e) { return decoded; } @@ -37,6 +38,21 @@ const KinesisDataStreamRecord = z.object({ kinesis: KinesisDataStreamRecordPayload, }); +const KinesisDynamoDBStreamSchema = z.object({ + Records: z.array( + KinesisDataStreamRecord.extend({ + kinesis: KinesisDataStreamRecordPayload.extend({ + data: z + .string() + .transform((data) => { + return JSON.parse(Buffer.from(data, 'base64').toString('utf8')); + }) + .pipe(DynamoDBStreamToKinesisRecord), + }), + }) + ), +}); + /** * Zod schema for Kinesis Data Stream event * @@ -88,7 +104,8 @@ const KinesisDataStreamSchema = z.object({ }); export { - KinesisDataStreamSchema, KinesisDataStreamRecord, KinesisDataStreamRecordPayload, + KinesisDataStreamSchema, + KinesisDynamoDBStreamSchema, }; diff --git a/packages/parser/src/types/schema.ts b/packages/parser/src/types/schema.ts index 7c1cf3032b..091ce0e130 100644 --- a/packages/parser/src/types/schema.ts +++ b/packages/parser/src/types/schema.ts @@ -16,6 +16,7 @@ import type { KafkaRecordSchema, KafkaSelfManagedEventSchema, KinesisDataStreamSchema, + KinesisDynamoDBStreamSchema, KinesisFirehoseRecordSchema, KinesisFirehoseSchema, KinesisFirehoseSqsRecordSchema, @@ -79,6 +80,8 @@ type KafkaMskEvent = z.infer; type KinesisDataStreamEvent = z.infer; +type KinesisDynamoDBStreamEvent = z.infer; + type KinesisFireHoseEvent = z.infer; type KinesisFirehoseRecord = z.infer; @@ -136,6 +139,7 @@ export type { KafkaMskEvent, KafkaRecord, KinesisDataStreamEvent, + KinesisDynamoDBStreamEvent, KinesisFireHoseEvent, KinesisFirehoseRecord, KinesisFireHoseSqsEvent, diff --git a/packages/parser/tests/events/kinesis/dynamodb-stream.json b/packages/parser/tests/events/kinesis/dynamodb-stream.json new file mode 100644 index 0000000000..717e70e7e8 --- /dev/null +++ b/packages/parser/tests/events/kinesis/dynamodb-stream.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "859F7C064A4818874FA67ABEC9BF2AF1", + "sequenceNumber": "49657828409187701520019995242508390119953358325192589314", + "data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiZDk0MjgwMjktMGY2My00MDU2LTg2ZGEtY2UxMGQ1NDViMWI5IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC0xcWl0Mnk4MTlnaSJ9LCJkYXRhIjp7IlMiOiJkYXRhLXg2YXE3Y2tkcGdrIn19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==", + "approximateArrivalTimestamp": 1731924555.932 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000000:49657828409187701520019995242508390119953358325192589314", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6", + "awsRegion": "eu-west-1", + "eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0" + }, + { + "kinesis": { + "kinesisSchemaVersion": "1.0", + "partitionKey": "6037E47B707479B67E577C989D96E9F8", + "sequenceNumber": "49657828409187701520019995242509599045772972954367295490", + "data": "eyJhd3NSZWdpb24iOiJldS13ZXN0LTEiLCJldmVudElEIjoiYWE1NmNhZDQtMzExYS00NmM4LWFiNWYtYzdhMTNhN2E2Mjk4IiwiZXZlbnROYW1lIjoiSU5TRVJUIiwidXNlcklkZW50aXR5IjpudWxsLCJyZWNvcmRGb3JtYXQiOiJhcHBsaWNhdGlvbi9qc29uIiwidGFibGVOYW1lIjoiUG93ZXJ0b29sc0V2ZW50c1N0YWNrLUR5bmFtb0RCVGFibGU1OTc4NEZDMC04TktBTVRFUlRBWFkiLCJkeW5hbW9kYiI6eyJBcHByb3hpbWF0ZUNyZWF0aW9uRGF0ZVRpbWUiOjE3MzE5MjQ1NTUzNzAsIktleXMiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9fSwiTmV3SW1hZ2UiOnsiaWQiOnsiUyI6InJlY29yZC1mdnhuM3E0cTVqdyJ9LCJkYXRhIjp7IlMiOiJkYXRhLTRlb21wanM4OW41In19LCJTaXplQnl0ZXMiOjYwfSwiZXZlbnRTb3VyY2UiOiJhd3M6ZHluYW1vZGIifQ==", + "approximateArrivalTimestamp": 1731924555.935 + }, + "eventSource": "aws:kinesis", + "eventVersion": "1.0", + "eventID": "shardId-000000000000:49657828409187701520019995242509599045772972954367295490", + "eventName": "aws:kinesis:record", + "invokeIdentityArn": "arn:aws:iam::1234567789012:role/PowertoolsEventsStack-KinesisConsumerFunctionServic-JG17OEKZaDq6", + "awsRegion": "eu-west-1", + "eventSourceARN": "arn:aws:kinesis:eu-west-1:1234567789012:stream/PowertoolsEventsStack-KinesisStream46752A3E-u0C9B3ZKjgG0" + } + ] +} diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index 503439c8aa..3ddffb940c 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -9,12 +9,14 @@ import { KinesisFirehoseSqsSchema, SqsRecordSchema, } from '../../../src/schemas/'; +import { KinesisDynamoDBStreamSchema } from '../../../src/schemas/kinesis'; import type { KinesisDataStreamEvent, KinesisFireHoseEvent, KinesisFireHoseSqsEvent, } from '../../../src/types'; import type { + KinesisDynamoDBStreamEvent, KinesisFirehoseRecord, KinesisFirehoseSqsRecord, } from '../../../src/types/schema'; @@ -179,6 +181,58 @@ describe('Kinesis ', () => { expect(parsedRecord.eventName).toEqual('aws:kinesis:record'); }); + it('should parse a kinesis record from dynamodb stream event', () => { + const dynamodbStreamKinesisEvent = getTestEvent( + { eventsPath, filename: 'dynamodb-stream' } + ); + const expectedRecords = [ + { + awsRegion: 'eu-west-1', + eventID: 'd9428029-0f63-4056-86da-ce10d545b1b9', + eventName: 'INSERT', + userIdentity: null, + recordFormat: 'application/json', + tableName: 'PowertoolsEventsStack-DynamoDBTable59784FC0-8NKAMTERTAXY', + dynamodb: { + ApproximateCreationDateTime: 1731924555370, + Keys: { id: { S: 'record-1qit2y819gi' } }, + NewImage: { + id: { S: 'record-1qit2y819gi' }, + data: { S: 'data-x6aq7ckdpgk' }, + }, + SizeBytes: 60, + }, + eventSource: 'aws:dynamodb', + }, + { + awsRegion: 'eu-west-1', + eventID: 'aa56cad4-311a-46c8-ab5f-c7a13a7a6298', + eventName: 'INSERT', + userIdentity: null, + recordFormat: 'application/json', + tableName: 'PowertoolsEventsStack-DynamoDBTable59784FC0-8NKAMTERTAXY', + dynamodb: { + ApproximateCreationDateTime: 1731924555370, + Keys: { id: { S: 'record-fvxn3q4q5jw' } }, + NewImage: { + id: { S: 'record-fvxn3q4q5jw' }, + data: { S: 'data-4eompjs89n5' }, + }, + SizeBytes: 60, + }, + eventSource: 'aws:dynamodb', + }, + ]; + + const parsedRecord = KinesisDynamoDBStreamSchema.parse( + dynamodbStreamKinesisEvent + ); + + expect(parsedRecord.Records.map((record) => record.kinesis.data)).toEqual( + expectedRecords + ); + }); + it('should parse a kinesis firehose record from a kinesis firehose event', () => { const kinesisFirehoseEvent: KinesisFireHoseEvent = getTestEvent({ eventsPath, filename: 'firehose' }); From 3526fb422efcc1d9d636038a2d9f240c88cda95c Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Mon, 18 Nov 2024 20:26:33 +0100 Subject: [PATCH 3/6] docs(parser): add KinesisDynamoDBStreamSchema to documentation --- docs/utilities/parser.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/utilities/parser.md b/docs/utilities/parser.md index 10a0a9077b..348b1c23bf 100644 --- a/docs/utilities/parser.md +++ b/docs/utilities/parser.md @@ -78,6 +78,7 @@ Parser comes with the following built-in schemas: | **KafkaSelfManagedEventSchema** | Lambda Event Source payload for self managed Kafka payload | | **KinesisDataStreamSchema** | Lambda Event Source payload for Amazon Kinesis Data Streams | | **KinesisFirehoseSchema** | Lambda Event Source payload for Amazon Kinesis Firehose | +| **KinesisDynamoDBStreamSchema** | Lambda Event Source payload for DynamodbStream record wrapped in Kinesis Data stream | | **KinesisFirehoseSqsSchema** | Lambda Event Source payload for SQS messages wrapped in Kinesis Firehose records | | **LambdaFunctionUrlSchema** | Lambda Event Source payload for Lambda Function URL payload | | **S3EventNotificationEventBridgeSchema** | Lambda Event Source payload for Amazon S3 Event Notification to EventBridge. | From 91a55cad786ce92a4c11ba20eccf464923723d48 Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Tue, 19 Nov 2024 09:12:42 +0100 Subject: [PATCH 4/6] organised imports --- .../parser/tests/unit/envelopes/kinesis-firehose.test.ts | 5 ++--- packages/parser/tests/unit/envelopes/kinesis.test.ts | 3 +-- packages/parser/tests/unit/schema/kinesis.test.ts | 3 +-- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts index c0216cb387..41fa30e0af 100644 --- a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts @@ -1,14 +1,13 @@ import { generateMock } from '@anatine/zod-mock'; import { describe, expect, it } from 'vitest'; -import { ZodError, type z } from 'zod'; +import { ZodError } from 'zod'; import { ParseError } from '../../../src'; import { KinesisFirehoseEnvelope } from '../../../src/envelopes/index.js'; -import type { KinesisFirehoseSchema } from '../../../src/schemas/'; import type { KinesisFireHoseEvent, KinesisFireHoseSqsEvent, } from '../../../src/types'; -import { TestEvents, TestSchema, getTestEvent } from '../schema/utils.js'; +import { TestSchema, getTestEvent } from '../schema/utils.js'; describe('Kinesis Firehose Envelope', () => { const eventsPath = 'kinesis'; diff --git a/packages/parser/tests/unit/envelopes/kinesis.test.ts b/packages/parser/tests/unit/envelopes/kinesis.test.ts index 02d030ec28..6c4415ed2d 100644 --- a/packages/parser/tests/unit/envelopes/kinesis.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis.test.ts @@ -1,10 +1,9 @@ import { generateMock } from '@anatine/zod-mock'; -import type { KinesisStreamEvent } from 'aws-lambda'; import { describe, expect, it } from 'vitest'; import { KinesisEnvelope } from '../../../src/envelopes/index.js'; import { ParseError } from '../../../src/errors.js'; import type { KinesisDataStreamEvent } from '../../../src/types/schema.js'; -import { TestEvents, TestSchema, getTestEvent } from '../schema/utils.js'; +import { TestSchema, getTestEvent } from '../schema/utils.js'; describe('KinesisEnvelope', () => { const eventsPath = 'kinesis'; diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index 3ddffb940c..f81fc4f0a4 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -7,7 +7,6 @@ import { KinesisFirehoseSchema, KinesisFirehoseSqsRecordSchema, KinesisFirehoseSqsSchema, - SqsRecordSchema, } from '../../../src/schemas/'; import { KinesisDynamoDBStreamSchema } from '../../../src/schemas/kinesis'; import type { @@ -20,7 +19,7 @@ import type { KinesisFirehoseRecord, KinesisFirehoseSqsRecord, } from '../../../src/types/schema'; -import { TestEvents, getTestEvent } from './utils.js'; +import { getTestEvent } from './utils.js'; describe('Kinesis ', () => { const eventsPath = 'kinesis'; From 0be0f2601d3cd48ec5ad81711e64c1865ce6c26c Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Wed, 20 Nov 2024 13:07:10 +0100 Subject: [PATCH 5/6] feat(parser): add DynamoDBStreamToKinesisRecord and related types --- packages/parser/src/schemas/index.ts | 5 +- packages/parser/src/types/index.ts | 2 + packages/parser/src/types/schema.ts | 6 + .../parser/tests/unit/schema/kinesis.test.ts | 143 +++++++++--------- 4 files changed, 84 insertions(+), 72 deletions(-) diff --git a/packages/parser/src/schemas/index.ts b/packages/parser/src/schemas/index.ts index 60282489a7..95fa347237 100644 --- a/packages/parser/src/schemas/index.ts +++ b/packages/parser/src/schemas/index.ts @@ -20,7 +20,10 @@ export { CloudWatchLogsDecodeSchema, CloudWatchLogsSchema, } from './cloudwatch.js'; -export { DynamoDBStreamSchema } from './dynamodb.js'; +export { + DynamoDBStreamSchema, + DynamoDBStreamToKinesisRecord, +} from './dynamodb.js'; export { EventBridgeSchema } from './eventbridge.js'; export { KafkaMskEventSchema, diff --git a/packages/parser/src/types/index.ts b/packages/parser/src/types/index.ts index 89dc0b8029..a74259e7eb 100644 --- a/packages/parser/src/types/index.ts +++ b/packages/parser/src/types/index.ts @@ -19,6 +19,7 @@ export type { SnsEvent, SqsEvent, DynamoDBStreamEvent, + DynamoDBStreamToKinesisRecordEvent, CloudWatchLogsEvent, CloudFormationCustomResourceCreateEvent, CloudFormationCustomResourceDeleteEvent, @@ -27,6 +28,7 @@ export type { KafkaSelfManagedEvent, KafkaMskEvent, KinesisDataStreamEvent, + KinesisDynamoDBStreamEvent, KinesisFireHoseEvent, KinesisFireHoseSqsEvent, LambdaFunctionUrlEvent, diff --git a/packages/parser/src/types/schema.ts b/packages/parser/src/types/schema.ts index 091ce0e130..f9084d97ec 100644 --- a/packages/parser/src/types/schema.ts +++ b/packages/parser/src/types/schema.ts @@ -11,6 +11,7 @@ import type { CloudFormationCustomResourceUpdateSchema, CloudWatchLogsSchema, DynamoDBStreamSchema, + DynamoDBStreamToKinesisRecord, EventBridgeSchema, KafkaMskEventSchema, KafkaRecordSchema, @@ -70,6 +71,10 @@ type CloudWatchLogsEvent = z.infer; type DynamoDBStreamEvent = z.infer; +type DynamoDBStreamToKinesisRecordEvent = z.infer< + typeof DynamoDBStreamToKinesisRecord +>; + type EventBridgeEvent = z.infer; type KafkaSelfManagedEvent = z.infer; @@ -134,6 +139,7 @@ export type { CloudFormationCustomResourceUpdateEvent, CloudWatchLogsEvent, DynamoDBStreamEvent, + DynamoDBStreamToKinesisRecordEvent, EventBridgeEvent, KafkaSelfManagedEvent, KafkaMskEvent, diff --git a/packages/parser/tests/unit/schema/kinesis.test.ts b/packages/parser/tests/unit/schema/kinesis.test.ts index f81fc4f0a4..f2c638770a 100644 --- a/packages/parser/tests/unit/schema/kinesis.test.ts +++ b/packages/parser/tests/unit/schema/kinesis.test.ts @@ -23,15 +23,45 @@ import { getTestEvent } from './utils.js'; describe('Kinesis ', () => { const eventsPath = 'kinesis'; - it('should parse kinesis event', () => { - const kinesisStreamEvent = getTestEvent({ + + const kinesisStreamEvent = getTestEvent({ + eventsPath, + filename: 'stream', + }); + + const kinesisStreamEventOneRecord = getTestEvent({ + eventsPath, + filename: 'stream-one-record', + }); + + const kinesisFirehoseEvent = getTestEvent({ + eventsPath, + filename: 'firehose', + }); + + const kinesisFirehosePutEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); + + const kinesisFirehoseSQSEvent = getTestEvent({ + eventsPath, + filename: 'firehose-sqs', + }); + + const kinesisStreamCloudWatchLogsEvent = getTestEvent( + { eventsPath, - filename: 'stream', - }); - const parsed = KinesisDataStreamSchema.parse(kinesisStreamEvent); + filename: 'stream-cloudwatch-logs', + } + ); + + it('should parse kinesis event', () => { + const testEvent = structuredClone(kinesisStreamEvent); + const parsed = KinesisDataStreamSchema.parse(testEvent); const transformedInput = { - Records: kinesisStreamEvent.Records.map((record, index) => { + Records: testEvent.Records.map((record, index) => { return { ...record, kinesis: { @@ -45,14 +75,11 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse single kinesis record', () => { - const kinesisStreamEventOneRecord = getTestEvent({ - eventsPath, - filename: 'stream-one-record', - }); - const parsed = KinesisDataStreamSchema.parse(kinesisStreamEventOneRecord); + const testEvent = structuredClone(kinesisStreamEventOneRecord); + const parsed = KinesisDataStreamSchema.parse(testEvent); const transformedInput = { - Records: kinesisStreamEventOneRecord.Records.map((record, index) => { + Records: testEvent.Records.map((record, index) => { return { ...record, kinesis: { @@ -68,15 +95,12 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse Firehose event', () => { - const kinesisFirehoseKinesisEvent = getTestEvent({ - eventsPath, - filename: 'firehose', - }); - const parsed = KinesisFirehoseSchema.parse(kinesisFirehoseKinesisEvent); + const testEvent = structuredClone(kinesisFirehoseEvent); + const parsed = KinesisFirehoseSchema.parse(testEvent); const transformedInput = { - ...kinesisFirehoseKinesisEvent, - records: kinesisFirehoseKinesisEvent.records.map((record) => { + ...testEvent, + records: testEvent.records.map((record) => { return { ...record, data: Buffer.from(record.data, 'base64').toString(), @@ -87,16 +111,13 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse Kinesis Firehose PutEvents event', () => { - const kinesisFirehosePutEvent = getTestEvent({ - eventsPath, - filename: 'firehose-put', - }); + const testEvent = structuredClone(kinesisFirehosePutEvent); - const parsed = KinesisFirehoseSchema.parse(kinesisFirehosePutEvent); + const parsed = KinesisFirehoseSchema.parse(testEvent); const transformedInput = { - ...kinesisFirehosePutEvent, - records: kinesisFirehosePutEvent.records.map((record) => { + ...testEvent, + records: testEvent.records.map((record) => { return { ...record, data: Buffer.from(record.data, 'base64').toString(), @@ -107,16 +128,13 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse Firehose event with SQS event', () => { - const kinesisFirehoseSQSEvent = getTestEvent({ - eventsPath, - filename: 'firehose-sqs', - }); + const testEvent = structuredClone(kinesisFirehoseSQSEvent); - const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent); + const parsed = KinesisFirehoseSqsSchema.parse(testEvent); const transformedInput = { - ...kinesisFirehoseSQSEvent, - records: kinesisFirehoseSQSEvent.records.map((record) => { + ...testEvent, + records: testEvent.records.map((record) => { return { ...record, data: JSON.parse( @@ -129,17 +147,12 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should parse Kinesis event with CloudWatch event', () => { - const kinesisStreamCloudWatchLogsEvent = - getTestEvent({ - eventsPath, - filename: 'stream-cloudwatch-logs', - }); - const parsed = KinesisDataStreamSchema.parse( - kinesisStreamCloudWatchLogsEvent - ); + const testEvent = structuredClone(kinesisStreamCloudWatchLogsEvent); + + const parsed = KinesisDataStreamSchema.parse(testEvent); const transformedInput = { - Records: kinesisStreamCloudWatchLogsEvent.Records.map((record, index) => { + Records: testEvent.Records.map((record, index) => { return { ...record, kinesis: { @@ -157,33 +170,28 @@ describe('Kinesis ', () => { expect(parsed).toStrictEqual(transformedInput); }); it('should return original value if cannot parse KinesisFirehoseSqsRecord', () => { - const kinesisFirehoseSQSEvent = getTestEvent({ - eventsPath, - filename: 'firehose-sqs', - }); - kinesisFirehoseSQSEvent.records[0].data = 'not a valid json'; - const parsed = KinesisFirehoseSqsSchema.parse(kinesisFirehoseSQSEvent); + const testEvent = structuredClone(kinesisFirehoseSQSEvent); + testEvent.records[0].data = 'not a valid json'; + + const parsed = KinesisFirehoseSqsSchema.parse(testEvent); - expect(parsed).toStrictEqual(kinesisFirehoseSQSEvent); + expect(parsed).toStrictEqual(testEvent); }); it('should parse a kinesis record from a kinesis event', () => { - const kinesisStreamEvent: KinesisDataStreamEvent = - getTestEvent({ - eventsPath, - filename: 'stream-one-record', - }); - const parsedRecord = KinesisDataStreamRecord.parse( - kinesisStreamEvent.Records[0] - ); + const testEvent: KinesisDataStreamEvent = + structuredClone(kinesisStreamEvent); + + const parsedRecord = KinesisDataStreamRecord.parse(testEvent.Records[0]); expect(parsedRecord.eventSource).toEqual('aws:kinesis'); expect(parsedRecord.eventName).toEqual('aws:kinesis:record'); }); it('should parse a kinesis record from dynamodb stream event', () => { - const dynamodbStreamKinesisEvent = getTestEvent( - { eventsPath, filename: 'dynamodb-stream' } - ); + const testEvent = getTestEvent({ + eventsPath, + filename: 'dynamodb-stream', + }); const expectedRecords = [ { awsRegion: 'eu-west-1', @@ -223,9 +231,7 @@ describe('Kinesis ', () => { }, ]; - const parsedRecord = KinesisDynamoDBStreamSchema.parse( - dynamodbStreamKinesisEvent - ); + const parsedRecord = KinesisDynamoDBStreamSchema.parse(testEvent); expect(parsedRecord.Records.map((record) => record.kinesis.data)).toEqual( expectedRecords @@ -233,20 +239,15 @@ describe('Kinesis ', () => { }); it('should parse a kinesis firehose record from a kinesis firehose event', () => { - const kinesisFirehoseEvent: KinesisFireHoseEvent = - getTestEvent({ eventsPath, filename: 'firehose' }); + const testEvent = structuredClone(kinesisFirehoseEvent); const parsedRecord: KinesisFirehoseRecord = - KinesisFirehoseRecordSchema.parse(kinesisFirehoseEvent.records[0]); + KinesisFirehoseRecordSchema.parse(testEvent.records[0]); expect(parsedRecord.data).toEqual('Hello World'); }); it('should parse a sqs record from a kinesis firehose event', () => { - const kinesisFireHoseSqsEvent: KinesisFireHoseSqsEvent = - getTestEvent({ - eventsPath, - filename: 'firehose-sqs', - }); + const kinesisFireHoseSqsEvent = structuredClone(kinesisFirehoseSQSEvent); const parsed: KinesisFirehoseSqsRecord = KinesisFirehoseSqsRecordSchema.parse(kinesisFireHoseSqsEvent.records[0]); From d92f5ec3d1052401f0bf84d2ec6929798c74ae9b Mon Sep 17 00:00:00 2001 From: Alexander Schueren Date: Wed, 20 Nov 2024 13:15:33 +0100 Subject: [PATCH 6/6] test(parser): refactor Kinesis Firehose and Stream event tests to use structured clones --- .../unit/envelopes/kinesis-firehose.test.ts | 60 ++++++++----------- .../tests/unit/envelopes/kinesis.test.ts | 27 ++++----- 2 files changed, 35 insertions(+), 52 deletions(-) diff --git a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts index 41fa30e0af..32d7bb610f 100644 --- a/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis-firehose.test.ts @@ -11,13 +11,25 @@ import { TestSchema, getTestEvent } from '../schema/utils.js'; describe('Kinesis Firehose Envelope', () => { const eventsPath = 'kinesis'; + const kinesisFirehosePutEvent = getTestEvent({ + eventsPath, + filename: 'firehose-put', + }); + + const kinesisFirehoseSQSEvent = getTestEvent({ + eventsPath, + filename: 'firehose-sqs', + }); + + const kinesisFirehoseEvent = getTestEvent({ + eventsPath, + filename: 'firehose', + }); + describe('parse', () => { it('should parse records for PutEvent', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose-put', - }); + const testEvent = structuredClone(kinesisFirehosePutEvent); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -29,10 +41,7 @@ describe('Kinesis Firehose Envelope', () => { it('should parse a single record for SQS event', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose-sqs', - }); + const testEvent = structuredClone(kinesisFirehoseSQSEvent); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -44,10 +53,7 @@ describe('Kinesis Firehose Envelope', () => { it('should parse records for kinesis event', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose', - }); + const testEvent = structuredClone(kinesisFirehoseEvent); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -57,10 +63,7 @@ describe('Kinesis Firehose Envelope', () => { expect(resp).toEqual([mock, mock]); }); it('should throw if record is not base64 encoded', () => { - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose-put', - }); + const testEvent = structuredClone(kinesisFirehosePutEvent); testEvent.records.map((record) => { record.data = 'not base64 encoded'; @@ -76,10 +79,7 @@ describe('Kinesis Firehose Envelope', () => { }).toThrow(); }); it('should throw when schema does not match record', () => { - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose-put', - }); + const testEvent = structuredClone(kinesisFirehosePutEvent); testEvent.records.map((record) => { record.data = Buffer.from('not a valid json').toString('base64'); @@ -93,10 +93,7 @@ describe('Kinesis Firehose Envelope', () => { describe('safeParse', () => { it('should parse records for PutEvent', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose-put', - }); + const testEvent = structuredClone(kinesisFirehosePutEvent); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -108,10 +105,7 @@ describe('Kinesis Firehose Envelope', () => { it('should parse a single record for SQS event', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose-sqs', - }); + const testEvent = structuredClone(kinesisFirehoseSQSEvent); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -123,10 +117,7 @@ describe('Kinesis Firehose Envelope', () => { it('should parse records for kinesis event', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose', - }); + const testEvent = structuredClone(kinesisFirehoseEvent); testEvent.records.map((record) => { record.data = Buffer.from(JSON.stringify(mock)).toString('base64'); @@ -151,10 +142,7 @@ describe('Kinesis Firehose Envelope', () => { } }); it('should return original event if record is not base64 encoded', () => { - const testEvent = getTestEvent({ - eventsPath, - filename: 'firehose-put', - }); + const testEvent = structuredClone(kinesisFirehosePutEvent); testEvent.records.map((record) => { record.data = 'not base64 encoded'; diff --git a/packages/parser/tests/unit/envelopes/kinesis.test.ts b/packages/parser/tests/unit/envelopes/kinesis.test.ts index 6c4415ed2d..fc04c4e9d4 100644 --- a/packages/parser/tests/unit/envelopes/kinesis.test.ts +++ b/packages/parser/tests/unit/envelopes/kinesis.test.ts @@ -7,13 +7,16 @@ import { TestSchema, getTestEvent } from '../schema/utils.js'; describe('KinesisEnvelope', () => { const eventsPath = 'kinesis'; + + const kinesisStreamEvent = getTestEvent({ + eventsPath, + filename: 'stream', + }); + describe('parse', () => { it('should parse Kinesis Stream event', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'stream', - }); + const testEvent = structuredClone(kinesisStreamEvent); testEvent.Records.map((record) => { record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString( @@ -28,11 +31,9 @@ describe('KinesisEnvelope', () => { expect(() => KinesisEnvelope.parse({ foo: 'bar' }, TestSchema)).toThrow(); }); it('should throw if record is invalid', () => { - const testEvent = getTestEvent({ - eventsPath, - filename: 'stream', - }); + const testEvent = structuredClone(kinesisStreamEvent); testEvent.Records[0].kinesis.data = 'invalid'; + expect(() => KinesisEnvelope.parse(testEvent, TestSchema)).toThrow(); }); }); @@ -40,10 +41,7 @@ describe('KinesisEnvelope', () => { describe('safeParse', () => { it('should parse Kinesis Stream event', () => { const mock = generateMock(TestSchema); - const testEvent = getTestEvent({ - eventsPath, - filename: 'stream', - }); + const testEvent = structuredClone(kinesisStreamEvent); testEvent.Records.map((record) => { record.kinesis.data = Buffer.from(JSON.stringify(mock)).toString( @@ -64,10 +62,7 @@ describe('KinesisEnvelope', () => { }); }); it('should return original event if record is invalid', () => { - const testEvent = getTestEvent({ - eventsPath, - filename: 'stream', - }); + const testEvent = structuredClone(kinesisStreamEvent); testEvent.Records[0].kinesis.data = 'invalid'; const parseResult = KinesisEnvelope.safeParse(testEvent, TestSchema); expect(parseResult).toEqual({