Skip to content

feat(tracer): Add capability to continue traces comming from SQS triggers #364

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions docs/core/tracer.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Tracer is an opinionated thin wrapper for [AWS X-Ray SDK for Node.js](https://gi
* Auto-disable when not running in AWS Lambda environment
* Support tracing functions via decorators, middleware, and manual instrumentation
* Support tracing AWS SDK v2 and v3 via AWS X-Ray SDK for Node.js
* Support following traces comming from SQS (https://github.com/aws/aws-xray-sdk-node/issues/208)

![Tracer showcase](../media/tracer_utility_showcase.png)

Expand Down Expand Up @@ -254,6 +255,55 @@ You can trace other methods using the `captureMethod` decorator or manual instru

## Advanced

### Following SQS traces

If your lambda is triggered by SQS, it will not use the trace that went through SQS but will create a new one (see https://github.com/aws/aws-xray-sdk-node/issues/208 for more details). This lib will let you easily continue the SQS message's trace:

```typescript hl_lines="8 14-18 20 23 37 41"
import { Tracer } from '@aws-lambda-powertools/tracer';

const tracer = Tracer();

export const handler = async (event: { Records: SQSRecord[] }, context: Context) => {
const handlerExecStartTime = new Date().getTime() / 1000;

const toCloseSegments = Segment[];
// Iterate over received messages
for (const recordIndex in event.Records) {
const record = event.Records[recordIndex];

// Re build lambda segments for this record
const { lambdaSegment, lambdaFunctionSegment, invocationSubsegment } = tracer.continueSQSRecordTrace(
record,
context,
handlerExecStartTime
);

toCloseSegments.push([lambdaSegment, lambdaFunctionSegment, invocationSubsegment]);

// Retrieve current segment: the message processing segment
const messageProcessingSegment = tracer.getSegment();
console.log(`messageProcessingSegment: ${JSON.stringify(messageProcessingSegment)}`);

// use standard helper functions that will apply on current segment (messageProcessingSegment)
tracer.annotateColdStart();
tracer.addServiceNameAnnotation();

// Add custom annotation
messageProcessingSegment.addAnnotation('message_id', record.messageId);

// YOUR PROCESSING LOGIC HERE
// ...

// Close message processing segment to record end_time
messageProcessingSegment.close();
}

// Close lambda segments
toCloseSegments.forEach((segment) => segment.close());
};
```

### Patching AWS SDK clients

Tracer can patch [AWS SDK clients](https://docs.aws.amazon.com/xray/latest/devguide/xray-sdk-nodejs-awssdkclients.html) and create traces when your application makes calls to AWS services.
Expand Down
13,175 changes: 8,522 additions & 4,653 deletions packages/tracing/package-lock.json

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion packages/tracing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"@types/node": "^16.0.0",
"@typescript-eslint/eslint-plugin": "^5.4.0",
"@typescript-eslint/parser": "^5.4.0",
"aws-cdk": "1.136.0",
"eslint": "^8.3.0",
"eslint-import-resolver-node": "^0.3.6",
"eslint-import-resolver-typescript": "^2.5.0",
Expand All @@ -54,7 +55,10 @@
},
"dependencies": {
"@aws-lambda-powertools/commons": "^0.0.2",
"@aws-sdk/client-sqs": "^3.45.0",
"@middy/core": "^2.5.3",
"aws-xray-sdk-core": "^3.3.3"
"aws-sdk": "^2.1048.0",
"aws-xray-sdk-core": "^3.3.4",
"esbuild": "^0.14.8"
}
}
8 changes: 7 additions & 1 deletion packages/tracing/src/Tracer.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Handler } from 'aws-lambda';
import { Context, Handler, SQSRecord } from 'aws-lambda';
import { TracerInterface } from '.';
import { ConfigServiceInterface, EnvironmentVariablesService } from './config';
import { HandlerMethodDecorator, TracerOptions, MethodDecorator } from './types';
Expand Down Expand Up @@ -414,6 +414,12 @@ class Tracer implements TracerInterface {
};
}

public continueSQSRecordTrace(record: SQSRecord, context: Context, handlerExecStartTime?: number): {lambdaSegment?: Segment; lambdaFunctionSegment?: Segment; invocationSubsegment?: Subsegment} {
if (this.tracingEnabled === false) return {};

return this.provider.continueSQSRecordTrace(record, context, handlerExecStartTime);
}

/**
* Retrieve the current value of `ColdStart`.
*
Expand Down
7 changes: 7 additions & 0 deletions packages/tracing/src/middleware/middy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ const captureLambdaHandler = (target: Tracer): middy.MiddlewareObj => {

const close = (): void => {
const subsegment = target.getSegment();
if (subsegment.subsegments) {
for (const subsubSegment of subsegment.subsegments) {
console.log(`Closing Subsegment ${subsubSegment.name}`);
subsubSegment.close();
}
}

subsegment?.close();
target.setSegment(lambdaSegment as Segment);
};
Expand Down
42 changes: 42 additions & 0 deletions packages/tracing/src/provider/ProviderService.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import { ContextMissingStrategy } from 'aws-xray-sdk-core/dist/lib/context_utils';
import { processTraceData } from 'aws-xray-sdk-core/dist/lib/utils';
import { Namespace } from 'cls-hooked';
import { ProviderServiceInterface } from '.';
import { captureAWS, captureAWSClient, captureAWSv3Client, captureAsyncFunc, captureFunc, getNamespace, getSegment, setSegment, Segment, Subsegment, setContextMissingStrategy, setDaemonAddress, setLogger, Logger } from 'aws-xray-sdk-core';
import { Context, SQSRecord } from 'aws-lambda';

class ProviderService implements ProviderServiceInterface {

Expand All @@ -27,6 +29,46 @@ class ProviderService implements ProviderServiceInterface {
return captureFunc(name, fcn);
}

public continueSQSRecordTrace(record: SQSRecord, context: Context, handlerExecStartTime?: number): {lambdaSegment: Segment; lambdaFunctionSegment: Segment; invocationSubsegment: Subsegment} {
if (! record.attributes.AWSTraceHeader) {
throw new Error(`No trace header found in record ${record.messageId}. can't follow trace ... skipping`);
}

const traceHeaderStr = record.attributes.AWSTraceHeader;
const traceData = processTraceData(traceHeaderStr);

const functionName = process.env.AWS_LAMBDA_FUNCTION_NAME;
const lambdaSegment = new Segment(functionName!, traceData.root, traceData.parent);
lambdaSegment.origin = 'AWS::Lambda';
lambdaSegment.start_time = parseInt(record.attributes.ApproximateFirstReceiveTimestamp) / 1000;
lambdaSegment.addPluginData({
request_id: context.awsRequestId,
});

const lambdaFunctionSegment = new Segment(functionName!, lambdaSegment.trace_id, lambdaSegment.id);
lambdaFunctionSegment.origin = 'AWS::Lambda::Function';
lambdaFunctionSegment.start_time = parseInt(record.attributes.ApproximateFirstReceiveTimestamp) / 1000;
lambdaFunctionSegment.addPluginData({
function_arn: context.invokedFunctionArn,
resource_names: ['Consumer']
});

const invocationSubsegment = lambdaFunctionSegment.addNewSubsegment('Invocation');
invocationSubsegment.start_time = handlerExecStartTime ? handlerExecStartTime : lambdaFunctionSegment.start_time;
const previousProcessingSegment = invocationSubsegment.addNewSubsegment(`## previous processing`);
previousProcessingSegment.start_time = invocationSubsegment.start_time;
previousProcessingSegment.close();
const messageProcessingSegment = invocationSubsegment.addNewSubsegment(`## processing - ${record.messageId}`);

this.setSegment(messageProcessingSegment);

return {
invocationSubsegment,
lambdaSegment,
lambdaFunctionSegment,
};
}

public getNamespace(): Namespace {
return getNamespace();
}
Expand Down
2 changes: 2 additions & 0 deletions packages/tracing/src/provider/ProviderServiceInterface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ interface ProviderServiceInterface {
captureFunc(name: string, fcn: (subsegment?: Subsegment) => unknown, parent?: Segment | Subsegment): unknown

captureAsyncFunc(name: string, fcn: (subsegment?: Subsegment) => unknown, parent?: Segment | Subsegment): unknown

continueSQSRecordTrace(record: unknown, context: unknown, handlerExecStartTime?: number): {lambdaSegment: Segment; lambdaFunctionSegment: Segment; invocationSubsegment: Subsegment}
}

export {
Expand Down
82 changes: 82 additions & 0 deletions packages/tracing/tests/e2e/tracer.test.DecoratorDisabled.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import { Tracer } from '../../src';
import { Callback, Context } from 'aws-lambda';
import { STSClient, GetCallerIdentityCommand } from '@aws-sdk/client-sts';
// eslint-disable-next-line @typescript-eslint/no-var-requires
let AWS = require('aws-sdk');

const serviceName = process.env.EXPECTED_SERVICE_NAME ?? 'MyFunctionWithStandardHandler';
const customAnnotationKey = process.env.EXPECTED_CUSTOM_ANNOTATION_KEY ?? 'myAnnotation';
const customAnnotationValue = process.env.EXPECTED_CUSTOM_ANNOTATION_VALUE ?? 'myValue';
const customMetadataKey = process.env.EXPECTED_CUSTOM_METADATA_KEY ?? 'myMetadata';
const customMetadataValue = JSON.parse(process.env.EXPECTED_CUSTOM_METADATA_VALUE) ?? { bar: 'baz' };
const customResponseValue = JSON.parse(process.env.EXPECTED_CUSTOM_RESPONSE_VALUE) ?? { foo: 'bar' };
const customErrorMessage = process.env.EXPECTED_CUSTOM_ERROR_MESSAGE ?? 'An error has occurred';

interface CustomEvent {
throw: boolean
sdkV2: string
invocation: number
}

// Function that refreshes imports to ensure that we are instrumenting only one version of the AWS SDK v2 at a time.
const refreshAWSSDKImport = (): void => {
// Clean up the require cache to ensure we're using a newly imported version of the AWS SDK v2
for (const key in require.cache) {
if (key.indexOf('/aws-sdk/') !== -1) {
delete require.cache[key];
}
}
// eslint-disable-next-line @typescript-eslint/no-var-requires
AWS = require('aws-sdk');
};

const tracer = new Tracer({ serviceName: serviceName, enabled: false });
const stsv3 = tracer.captureAWSv3Client(new STSClient({}));

export class MyFunctionWithDecorator {
@tracer.captureLambdaHandler()
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
public handler(event: CustomEvent, _context: Context, _callback: Callback<unknown>): void | Promise<unknown> {
tracer.putAnnotation(customAnnotationKey, customAnnotationValue);
tracer.putMetadata(customMetadataKey, customMetadataValue);

let stsv2;
refreshAWSSDKImport();
if (event.sdkV2 === 'client') {
stsv2 = tracer.captureAWSClient(new AWS.STS());
} else if (event.sdkV2 === 'all') {
AWS = tracer.captureAWS(AWS);
stsv2 = new AWS.STS();
}

return Promise.all([
stsv2.getCallerIdentity().promise(),
stsv3.send(new GetCallerIdentityCommand({})),
new Promise((resolve, reject) => {
setTimeout(() => {
const res = this.myMethod();
if (event.throw) {
reject(new Error(customErrorMessage));
} else {
resolve(res);
}
}, 2000); // We need to wait for to make sure previous calls are finished
})
])
.then(([ _stsv2Res, _stsv3Res, promiseRes ]) => promiseRes)
.catch((err) => {
throw err;
});
}

@tracer.captureMethod()
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
public myMethod(): string {
return customResponseValue;
}
}

export const handlerClass = new MyFunctionWithDecorator();
export const handler = handlerClass.handler;
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import { Tracer } from '../../src';
import { Callback, Context } from 'aws-lambda';
import { STSClient, GetCallerIdentityCommand } from '@aws-sdk/client-sts';
// eslint-disable-next-line @typescript-eslint/no-var-requires
let AWS = require('aws-sdk');

const serviceName = process.env.EXPECTED_SERVICE_NAME ?? 'MyFunctionWithStandardHandler';
const customAnnotationKey = process.env.EXPECTED_CUSTOM_ANNOTATION_KEY ?? 'myAnnotation';
const customAnnotationValue = process.env.EXPECTED_CUSTOM_ANNOTATION_VALUE ?? 'myValue';
const customMetadataKey = process.env.EXPECTED_CUSTOM_METADATA_KEY ?? 'myMetadata';
const customMetadataValue = JSON.parse(process.env.EXPECTED_CUSTOM_METADATA_VALUE) ?? { bar: 'baz' };
const customResponseValue = JSON.parse(process.env.EXPECTED_CUSTOM_RESPONSE_VALUE) ?? { foo: 'bar' };
const customErrorMessage = process.env.EXPECTED_CUSTOM_ERROR_MESSAGE ?? 'An error has occurred';

interface CustomEvent {
throw: boolean
sdkV2: string
invocation: number
}

// Function that refreshes imports to ensure that we are instrumenting only one version of the AWS SDK v2 at a time.
const refreshAWSSDKImport = (): void => {
// Clean up the require cache to ensure we're using a newly imported version of the AWS SDK v2
for (const key in require.cache) {
if (key.indexOf('/aws-sdk/') !== -1) {
delete require.cache[key];
}
}
// eslint-disable-next-line @typescript-eslint/no-var-requires
AWS = require('aws-sdk');
};

// Disable capture errors & responses for this test
process.env.POWERTOOLS_TRACER_CAPTURE_RESPONSE = 'false';
process.env.POWERTOOLS_TRACER_CAPTURE_ERROR = 'false';
const tracer = new Tracer({ serviceName: serviceName });
const stsv3 = tracer.captureAWSv3Client(new STSClient({}));

export class MyFunctionWithDecorator {
@tracer.captureLambdaHandler()
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
public handler(event: CustomEvent, _context: Context, _callback: Callback<unknown>): void | Promise<unknown> {
tracer.putAnnotation(customAnnotationKey, customAnnotationValue);
tracer.putMetadata(customMetadataKey, customMetadataValue);

let stsv2;
refreshAWSSDKImport();
if (event.sdkV2 === 'client') {
stsv2 = tracer.captureAWSClient(new AWS.STS());
} else if (event.sdkV2 === 'all') {
AWS = tracer.captureAWS(AWS);
stsv2 = new AWS.STS();
}

return Promise.all([
stsv2.getCallerIdentity().promise(),
stsv3.send(new GetCallerIdentityCommand({})),
new Promise((resolve, reject) => {
setTimeout(() => {
const res = this.myMethod();
if (event.throw) {
reject(new Error(customErrorMessage));
} else {
resolve(res);
}
}, 2000); // We need to wait for to make sure previous calls are finished
})
])
.then(([ _stsv2Res, _stsv3Res, promiseRes ]) => promiseRes)
.catch((err) => {
throw err;
});
}

@tracer.captureMethod()
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
public myMethod(): string {
return customResponseValue;
}
}

export const handlerClass = new MyFunctionWithDecorator();
export const handler = handlerClass.handler;
Loading