Skip to content

fix(parser): CloudWatch Log Envelope handles non-JSON #3505

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 22, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 88 additions & 27 deletions packages/parser/src/envelopes/cloudwatch.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import type { ZodSchema, z } from 'zod';
import { ZodError, type ZodIssue, type ZodSchema, type z } from 'zod';
import { ParseError } from '../errors.js';
import { CloudWatchLogsSchema } from '../schemas/index.js';
import type { ParsedResult } from '../types/index.js';
import { Envelope, envelopeDiscriminator } from './envelope.js';
import { envelopeDiscriminator } from './envelope.js';

/**
* CloudWatch Envelope to extract a List of log records.
*
* The record's body parameter is a string (after being base64 decoded and gzipped),
* though it can also be a JSON encoded string.
* Regardless of its type it'll be parsed into a BaseModel object.
*
* Note: The record will be parsed the same way so if model is str
* CloudWatch Envelope to extract messages from the `awslogs.data.logEvents` key.
*/
export const CloudWatchEnvelope = {
/**
@@ -22,45 +16,112 @@ export const CloudWatchEnvelope = {
parse<T extends ZodSchema>(data: unknown, schema: T): z.infer<T>[] {
const parsedEnvelope = CloudWatchLogsSchema.parse(data);

return parsedEnvelope.awslogs.data.logEvents.map((record) => {
return Envelope.parse(record.message, schema);
return parsedEnvelope.awslogs.data.logEvents.map((record, index) => {
try {
return schema.parse(record.message);
} catch (error) {
throw new ParseError(
`Failed to parse CloudWatch log event at index ${index}`,
{
cause: new ZodError(
(error as ZodError).issues.map((issue) => ({
...issue,
path: [
'awslogs',
'data',
'logEvents',
index,
'message',
...issue.path,
],
}))
),
}
);
}
});
},

safeParse<T extends ZodSchema>(
data: unknown,
schema: T
): ParsedResult<unknown, z.infer<T>[]> {
const parsedEnvelope = CloudWatchLogsSchema.safeParse(data);
let parsedEnvelope: ParsedResult<unknown, z.infer<T>>;
try {
parsedEnvelope = CloudWatchLogsSchema.safeParse(data);
} catch (error) {
parsedEnvelope = {
success: false,
error: error as Error,
};
}

if (!parsedEnvelope.success) {
return {
success: false,
error: new ParseError('Failed to parse CloudWatch envelope', {
error: new ParseError('Failed to parse CloudWatch Log envelope', {
cause: parsedEnvelope.error,
}),
originalEvent: data,
};
}
const parsedLogEvents: z.infer<T>[] = [];

for (const record of parsedEnvelope.data.awslogs.data.logEvents) {
const parsedMessage = Envelope.safeParse(record.message, schema);
if (!parsedMessage.success) {
return {
success: false,
error: new ParseError('Failed to parse CloudWatch log event', {
cause: parsedMessage.error,
}),
originalEvent: data,
};
const result = parsedEnvelope.data.awslogs.data.logEvents.reduce(
(
acc: {
success: boolean;
messages: z.infer<T>;
errors: { [key: number]: { issues: ZodIssue[] } };
},
record: { message: string },
index: number
) => {
const result = schema.safeParse(record.message);
if (!result.success) {
const issues = result.error.issues.map((issue) => ({
...issue,
path: [
'awslogs',
'data',
'logEvents',
index,
'message',
...issue.path,
],
}));

acc.success = false;
acc.errors[index] = { issues };
return acc;
}

acc.messages.push(result.data);
return acc;
},
{
success: true,
messages: [],
errors: {},
}
parsedLogEvents.push(parsedMessage.data);
);

if (result.success) {
return { success: true, data: result.messages };
}

const errorMessage =
Object.keys(result.errors).length > 1
? `Failed to parse CloudWatch Log messages at indexes ${Object.keys(result.errors).join(', ')}`
: `Failed to parse CloudWatch Log message at index ${Object.keys(result.errors)[0]}`;
const errorCause = new ZodError(
// @ts-expect-error - issues are assigned because success is false
Object.values(result.errors).flatMap((error) => error.issues)
);

return {
success: true,
data: parsedLogEvents,
success: false,
error: new ParseError(errorMessage, { cause: errorCause }),
originalEvent: data,
};
},
};
12 changes: 11 additions & 1 deletion packages/parser/src/errors.ts
Original file line number Diff line number Diff line change
@@ -12,4 +12,14 @@ class ParseError extends Error {
}
}

export { ParseError };
/**
* Custom error thrown when decompression fails.
*/
class DecompressError extends ParseError {
constructor(message: string, options?: { cause?: Error }) {
super(message, options);
this.name = 'DecompressError';
}
}

export { ParseError, DecompressError };
13 changes: 10 additions & 3 deletions packages/parser/src/schemas/cloudwatch.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { gunzipSync } from 'node:zlib';
import { z } from 'zod';
import { DecompressError } from '../errors.js';

const CloudWatchLogEventSchema = z.object({
id: z.string(),
@@ -13,15 +14,21 @@ const CloudWatchLogsDecodeSchema = z.object({
logGroup: z.string(),
logStream: z.string(),
subscriptionFilters: z.array(z.string()),
logEvents: z.array(CloudWatchLogEventSchema),
logEvents: z.array(CloudWatchLogEventSchema).min(1),
});

const decompressRecordToJSON = (
data: string
): z.infer<typeof CloudWatchLogsDecodeSchema> => {
const uncompressed = gunzipSync(Buffer.from(data, 'base64')).toString('utf8');
try {
const uncompressed = gunzipSync(Buffer.from(data, 'base64')).toString(
'utf8'
);

return CloudWatchLogsDecodeSchema.parse(JSON.parse(uncompressed));
return CloudWatchLogsDecodeSchema.parse(JSON.parse(uncompressed));
} catch (error) {
throw new DecompressError('Failed to decompress CloudWatch log data');
}
};

/**
54 changes: 0 additions & 54 deletions packages/parser/tests/events/activeMQEvent.json

This file was deleted.

This file was deleted.

Loading