Skip to content

fix(parser): Kafka Envelope + tests #3489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jan 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 59 additions & 28 deletions packages/parser/src/envelopes/kafka.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,33 @@
import type { ZodSchema, z } from 'zod';
import { ZodError, type ZodIssue, type ZodSchema, z } from 'zod';
import { ParseError } from '../errors.js';
import {
KafkaMskEventSchema,
KafkaSelfManagedEventSchema,
} from '../schemas/kafka.js';
import type { KafkaMskEvent, ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
import { envelopeDiscriminator } from './envelope.js';

/**
* Get the event source from the data.
*
* Before we can access the event source, we need to parse the data with a minimal schema.
*
* @param data - The data to extract the event source from
*/
const extractEventSource = (
data: unknown
): 'aws:kafka' | 'SelfManagedKafka' => {
const verifiedData = z
.object({
eventSource: z.union([
z.literal('aws:kafka'),
z.literal('SelfManagedKafka'),
]),
})
.parse(data);

return verifiedData.eventSource;
};

/**
* Kafka event envelope to extract data within body key
Expand All @@ -15,29 +37,28 @@ import { Envelope, envelopeDiscriminator } from './envelope.js';
* Note: Records will be parsed the same way so if model is str,
* all items in the list will be parsed as str and not as JSON (and vice versa)
*/

export const KafkaEnvelope = {
/**
* This is a discriminator to differentiate whether an envelope returns an array or an object
* @hidden
*/
[envelopeDiscriminator]: 'array' as const,
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
// manually fetch event source to decide between Msk or SelfManaged
const eventSource = (data as KafkaMskEvent).eventSource;
const eventSource = extractEventSource(data);

const parsedEnvelope:
| z.infer<typeof KafkaMskEventSchema>
| z.infer<typeof KafkaSelfManagedEventSchema> =
const parsedEnvelope =
eventSource === 'aws:kafka'
? KafkaMskEventSchema.parse(data)
: KafkaSelfManagedEventSchema.parse(data);

return Object.values(parsedEnvelope.records).map((topicRecord) => {
return topicRecord.map((record) => {
return Envelope.parse(record.value, schema);
});
});
const values: z.infer<T>[] = [];
for (const topicRecord of Object.values(parsedEnvelope.records)) {
for (const record of topicRecord) {
values.push(schema.parse(record.value));
}
}

return values;
},

safeParse<T extends ZodSchema>(
Expand All @@ -61,27 +82,37 @@ export const KafkaEnvelope = {
originalEvent: data,
};
}
const parsedRecords: z.infer<T>[] = [];

for (const topicRecord of Object.values(parsedEnvelope.data.records)) {
const values: z.infer<T>[] = [];
const issues: ZodIssue[] = [];
for (const [topicKey, topicRecord] of Object.entries(
parsedEnvelope.data.records
)) {
for (const record of topicRecord) {
const parsedRecord = Envelope.safeParse(record.value, schema);
const parsedRecord = schema.safeParse(record.value);
if (!parsedRecord.success) {
return {
success: false,
error: new ParseError('Failed to parse Kafka record', {
cause: parsedRecord.error,
}),
originalEvent: data,
};
issues.push(
...(parsedRecord.error as ZodError).issues.map((issue) => ({
...issue,
path: ['records', topicKey, ...issue.path],
}))
);
}
parsedRecords.push(parsedRecord.data);
values.push(parsedRecord.data);
}
}

return {
success: true,
data: parsedRecords,
};
return issues.length > 0
? {
success: false,
error: new ParseError('Failed to parse Kafka envelope', {
cause: new ZodError(issues),
}),
originalEvent: data,
}
: {
success: true,
data: values,
};
},
};
6 changes: 2 additions & 4 deletions packages/parser/src/schemas/kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,9 @@ const KafkaRecordSchema = z.object({
const KafkaBaseEventSchema = z.object({
bootstrapServers: z
.string()
.transform((bootstrapServers) => {
return bootstrapServers ? bootstrapServers.split(',') : undefined;
})
.transform((bootstrapServers) => bootstrapServers.split(','))
.nullish(),
records: z.record(z.string(), z.array(KafkaRecordSchema)),
records: z.record(z.string(), z.array(KafkaRecordSchema).min(1)),
});

/** Zod schema for Kafka event from Self Managed Kafka
Expand Down
22 changes: 0 additions & 22 deletions packages/parser/tests/events/kafkaEventSelfManaged.json

This file was deleted.

179 changes: 107 additions & 72 deletions packages/parser/tests/unit/envelopes/kafka.test.ts
Original file line number Diff line number Diff line change
@@ -1,95 +1,130 @@
import { generateMock } from '@anatine/zod-mock';
import type { MSKEvent, SelfManagedKafkaEvent } from 'aws-lambda';
import { describe, expect, it } from 'vitest';
import { ParseError } from '../../../src';
import { ZodError, z } from 'zod';
import { KafkaEnvelope } from '../../../src/envelopes/index.js';
import { TestEvents, TestSchema } from '../schema/utils.js';

describe('Kafka', () => {
describe('parse', () => {
it('should parse MSK kafka envelope', () => {
const mock = generateMock(TestSchema);

const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
JSON.stringify(mock)
).toString('base64');
import { ParseError } from '../../../src/errors.js';
import { JSONStringified } from '../../../src/helpers.js';
import { getTestEvent } from '../schema/utils.js';

describe('Envelope: Kafka', () => {
const baseEvent = getTestEvent({
eventsPath: 'kafka',
filename: 'base',
});

const result = KafkaEnvelope.parse(kafkaEvent, TestSchema);
describe('Method: parse', () => {
it('throws if the payload of the value does not match the schema', () => {
// Prepare
const event = structuredClone(baseEvent);

expect(result).toEqual([[mock]]);
// Act & Assess
expect(() => KafkaEnvelope.parse(event, z.number())).toThrow();
});

it('should parse Self Managed kafka envelope', () => {
const mock = generateMock(TestSchema);

const kafkaEvent =
TestEvents.kafkaEventSelfManaged as SelfManagedKafkaEvent;
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
JSON.stringify(mock)
).toString('base64');
it('parses a Kafka event', () => {
// Prepare
const event = structuredClone(baseEvent);

const result = KafkaEnvelope.parse(kafkaEvent, TestSchema);
// Act
const result = KafkaEnvelope.parse(event, z.string());

expect(result).toEqual([[mock]]);
// Assess
expect(result).toEqual(['{"key":"value"}']);
});

describe('safeParse', () => {
it('should parse MSK kafka envelope', () => {
const mock = generateMock(TestSchema);
it('parses a Kafka event and applies the schema transformation', () => {
// Prepare
const event = structuredClone(baseEvent);

const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
JSON.stringify(mock)
).toString('base64');
// Act
const result = KafkaEnvelope.parse(
event,
JSONStringified(z.object({ key: z.string() }))
);

const result = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);

expect(result).toEqual({
success: true,
data: [mock],
});
});

it('should parse Self Managed kafka envelope', () => {
const mock = generateMock(TestSchema);
// Assess
expect(result).toEqual([{ key: 'value' }]);
});

const kafkaEvent =
TestEvents.kafkaEventSelfManaged as SelfManagedKafkaEvent;
kafkaEvent.records['mytopic-0'][0].value = Buffer.from(
JSON.stringify(mock)
).toString('base64');
it('parses a self managed Kafka event', () => {
// Prepare
const event = structuredClone(baseEvent);
event.eventSource = 'SelfManagedKafka';

const result = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
// Act
const result = KafkaEnvelope.parse(event, z.string());

expect(result).toEqual({
success: true,
data: [mock],
});
});
// Assess
expect(result).toEqual(['{"key":"value"}']);
});
});

it('should return original event on failure', () => {
const kafkaEvent = TestEvents.kafkaEventMsk as MSKEvent;
kafkaEvent.records['mytopic-0'][0].value = 'not a valid json';
describe('Method: safeParse', () => {
it('parses a Kafka event', () => {
// Prepare
const event = structuredClone(baseEvent);

const parseResult = KafkaEnvelope.safeParse(kafkaEvent, TestSchema);
// Act
const result = KafkaEnvelope.safeParse(event, z.string());

expect(parseResult).toEqual({
success: false,
error: expect.any(ParseError),
originalEvent: kafkaEvent,
});
// Assess
expect(result).toEqual({
success: true,
data: ['{"key":"value"}'],
});
});

if (!parseResult.success && parseResult.error) {
expect(parseResult.error.cause).toBeInstanceOf(SyntaxError);
}
it('returns an error if the event is not a valid Kafka event', () => {
// Prepare
const event = structuredClone(baseEvent);
event.eventSource = 'SelfManagedKafka';
// @ts-expect-error - Intentionally invalid event
event.records['mytopic-0'] = [];

// Act
const result = KafkaEnvelope.safeParse(event, z.string());

// Assess
expect(result).toEqual({
success: false,
error: new ParseError('Failed to parse Kafka envelope', {
cause: new ZodError([
{
code: 'too_small',
minimum: 1,
type: 'array',
inclusive: true,
exact: false,
message: 'Array must contain at least 1 element(s)',
path: ['records', 'mytopic-0'],
},
]),
}),
originalEvent: event,
});
it('should return original event and error if envelope is invalid', () => {
expect(KafkaEnvelope.safeParse({ foo: 'bar' }, TestSchema)).toEqual({
success: false,
error: expect.any(ParseError),
originalEvent: { foo: 'bar' },
});
});

it('returns the original event and the error if the payload of the value does not match the schema', () => {
// Prepare
const event = structuredClone(baseEvent);

// Act
const result = KafkaEnvelope.safeParse(event, z.number());

// Assess
expect(result).toEqual({
success: false,
error: new ParseError('Failed to parse Kafka envelope', {
cause: new ZodError([
{
code: 'invalid_type',
expected: 'number',
received: 'string',
path: ['records', 'mytopic-0'],
message: 'Expected number, received string',
},
]),
}),
originalEvent: event,
});
});
});
Expand Down
Loading
Loading