-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathlistener.ts
316 lines (297 loc) · 12.2 KB
/
listener.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
import { Context } from "aws-lambda";
import { patchHttp, unpatchHttp } from "./patch-http";
import { extractTriggerTags, extractHTTPStatusCodeTag, parseEventSource } from "./trigger";
import { ColdStartTracerConfig, ColdStartTracer } from "./cold-start-tracer";
import { logDebug, tagObject } from "../utils";
import { didFunctionColdStart, isProactiveInitialization } from "../utils/cold-start";
import { datadogLambdaVersion } from "../constants";
import { ddtraceVersion, parentSpanFinishTimeHeader } from "./constants";
import { patchConsole } from "./patch-console";
import { SpanContext, TraceOptions, TracerWrapper } from "./tracer-wrapper";
import { SpanInferrer } from "./span-inferrer";
import { SpanWrapper } from "./span-wrapper";
import { getTraceTree, clearTraceTree } from "../runtime/index";
import { TraceContext, TraceContextService, TraceSource } from "./trace-context-service";
import { StepFunctionContext, StepFunctionContextService } from "./step-function-service";
import { XrayService } from "./xray-service";
import { AUTHORIZING_REQUEST_ID_HEADER } from "./context/extractors/http";
import { getSpanPointerAttributes } from "../utils/span-pointers";
/**
* Signature of a custom trace extractor: given the Lambda event and context it
* produces a TraceContext, either synchronously or through a promise.
*/
export type TraceExtractor = (event: any, context: Context) => Promise<TraceContext> | TraceContext;
/**
* Options that control how the trace listener instruments a Lambda invocation.
*/
export interface TraceConfig {
/**
* Whether to automatically patch all outgoing http requests with Datadog's hybrid tracing headers.
* @default true
*/
autoPatchHTTP: boolean;
/**
* Whether to capture the lambda payload and response in Datadog.
*/
captureLambdaPayload: boolean;
/**
* The captured AWS Lambda payloads become tags of the `aws.lambda` span. This sets how deep
* the JSON structure is traversed. When the max depth is reached, the tag's value will be the
* stringified value of the deeper nested items.
*/
captureLambdaPayloadMaxDepth: number;
/**
* Whether to create inferred spans for managed services.
*/
createInferredSpan: boolean;
/**
* Whether to encode trace context in authorizer metadata.
*/
encodeAuthorizerContext: boolean;
/**
* Whether to decode trace context in authorizer metadata.
*/
decodeAuthorizerContext: boolean;
/**
* Whether to automatically patch console.log with Datadog's tracing ids.
*/
injectLogContext: boolean;
/**
* Whether to merge traces produced from dd-trace with X-Ray.
* @default false
*/
mergeDatadogXrayTraces: boolean;
/**
* Custom trace extractor function.
*/
traceExtractor?: TraceExtractor;
/**
* Minimum duration for a dependency to be included in cold start traces.
* Forwarded to the cold start tracer as `minDuration`.
* NOTE(review): the unit is not visible in this file - confirm against the cold start tracer.
*/
minColdStartTraceDuration: number;
/**
* Libraries to ignore from cold start traces.
* Forwarded to the cold start tracer as `ignoreLibs`.
* NOTE(review): this is a single string, presumably a delimited list - confirm the expected format.
*/
coldStartTraceSkipLib: string;
}
/**
 * Hooks Datadog tracing into the stages of a Lambda invocation.
 *
 * The listener exposes one method per stage:
 *  - `onStartInvocation`: patches console/http, extracts the incoming trace
 *    context and prepares the parent context and tags for the `aws.lambda` span.
 *  - `onWrap`: wraps the handler so that it runs inside the `aws.lambda` span.
 *  - `onEndingInvocation`: tags the still-open `aws.lambda` span.
 *  - `onCompleteInvocation`: unpatches http, finishes the inferred span and,
 *    for authorizers, injects the trace context into the response.
 *
 * State computed by one stage is kept on the instance for the following stages.
 */
export class TraceListener {
  private contextService: TraceContextService;
  // Lambda context of the invocation most recently passed to onStartInvocation.
  private context?: Context;
  private stepFunctionContext?: StepFunctionContext;
  private tracerWrapper: TracerWrapper;
  private inferrer: SpanInferrer;
  // Span inferred from the triggering event (only when `createInferredSpan` is enabled).
  private inferredSpan?: SpanWrapper;
  // Wrapper around the tracer's current span, captured in onEndingInvocation.
  private wrappedCurrentSpan?: SpanWrapper;
  private triggerTags?: { [key: string]: string };
  // Parent handed to the `aws.lambda` span: the inferred span if any, else the extracted context.
  private lambdaSpanParentContext?: SpanContext;
  // Span pointer attributes derived from the current event; attached in onEndingInvocation.
  private spanPointerAttributesList: object[] = [];

  /** Trace headers of the current trace context, as exposed by the context service. */
  public get currentTraceHeaders() {
    return this.contextService.currentTraceHeaders;
  }

  /**
   * @param config Tracing options; also forwarded to the trace context service.
   */
  constructor(private config: TraceConfig) {
    this.tracerWrapper = new TracerWrapper();
    this.contextService = new TraceContextService(this.tracerWrapper, this.config);
    this.inferrer = new SpanInferrer(this.tracerWrapper);
  }

  /**
   * Prepares tracing state for an invocation: optionally patches console and
   * http, extracts the trace context from the event, optionally creates the
   * inferred span, and records trigger tags, step function context and span
   * pointer attributes for the later stages.
   *
   * @param event The raw Lambda event.
   * @param context The Lambda context object.
   */
  public async onStartInvocation(event: any, context: Context) {
    // Start from an empty list so that pointers collected for a previous
    // invocation handled by this same listener instance are never attached
    // to the span of the current one.
    this.spanPointerAttributesList = [];

    const tracerInitialized = this.tracerWrapper.isTracerAvailable;
    if (this.config.injectLogContext) {
      patchConsole(console, this.contextService);
      logDebug("Patched console output with trace context");
    } else {
      logDebug("Didn't patch console output with trace context");
    }

    // If the DD tracer is initialized then it is already patching http, so we don't patch again here.
    if (this.config.autoPatchHTTP && !tracerInitialized) {
      logDebug("Patching HTTP libraries");
      patchHttp(this.contextService);
    } else {
      logDebug("Not patching HTTP libraries", { autoPatchHTTP: this.config.autoPatchHTTP, tracerInitialized });
    }

    // The aws.lambda span needs to be parented to the Datadog trace context from the
    // incoming event if available, or to the X-Ray trace context if hybrid tracing is enabled.
    const spanContextWrapper = await this.contextService.extract(event, context);
    let parentSpanContext: SpanContext | undefined;
    if (this.contextService.traceSource === TraceSource.Event || this.config.mergeDatadogXrayTraces) {
      parentSpanContext = tracerInitialized ? spanContextWrapper?.spanContext : undefined;
      logDebug("Attempting to find parent for the aws.lambda span");
    } else {
      logDebug("Didn't attempt to find parent for aws.lambda span", {
        mergeDatadogXrayTraces: this.config.mergeDatadogXrayTraces,
        traceSource: this.contextService.traceSource,
      });
    }

    if (this.config.createInferredSpan) {
      this.inferredSpan = this.inferrer.createInferredSpan(
        event,
        context,
        parentSpanContext,
        this.config.encodeAuthorizerContext,
      );
    }
    this.lambdaSpanParentContext = this.inferredSpan?.span || parentSpanContext;
    this.context = context;

    const eventSource = parseEventSource(event);
    this.triggerTags = extractTriggerTags(event, context, eventSource);
    this.stepFunctionContext = StepFunctionContextService.instance().context;

    const result = getSpanPointerAttributes(eventSource, event);
    if (result) {
      this.spanPointerAttributesList.push(...result);
    }
  }

  /**
   * onEndingInvocation runs after the user function has returned
   * but before the wrapped function has returned.
   * This is needed to apply tags to the lambda span
   * before it is flushed to logs or extension.
   *
   * @param event The raw Lambda event; tagged as `function.request` when payload capture is enabled.
   * @param result The handler's result; tagged as `function.response` when payload capture is
   * enabled and used to derive the `http.status_code` tag.
   * @param isResponseStreamFunction Forwarded to the status code extraction.
   * @returns `true` when an inferred span exists and the status code is a 3-character
   * string starting with "5" (the lambda span is then tagged with `error`); otherwise `false`,
   * including when there is no current span to tag.
   */
  public onEndingInvocation(event: any, result: any, isResponseStreamFunction: boolean): boolean {
    // Guard clause if something has gone horribly wrong
    // so we won't crash user code.
    if (!this.tracerWrapper.currentSpan) return false;

    this.wrappedCurrentSpan = new SpanWrapper(this.tracerWrapper.currentSpan, {});
    if (this.config.captureLambdaPayload) {
      tagObject(this.tracerWrapper.currentSpan, "function.request", event, 0, this.config.captureLambdaPayloadMaxDepth);
      tagObject(
        this.tracerWrapper.currentSpan,
        "function.response",
        result,
        0,
        this.config.captureLambdaPayloadMaxDepth,
      );
    }

    const coldStartNodes = getTraceTree();
    if (coldStartNodes.length > 0) {
      const coldStartConfig: ColdStartTracerConfig = {
        tracerWrapper: this.tracerWrapper,
        // On a cold start (or proactive initialization) the cold start spans hang off the
        // inferred span when there is one; otherwise they hang off the lambda span.
        parentSpan:
          didFunctionColdStart() || isProactiveInitialization()
            ? this.inferredSpan || this.wrappedCurrentSpan
            : this.wrappedCurrentSpan,
        lambdaFunctionName: this.context?.functionName,
        currentSpanStartTime: this.wrappedCurrentSpan?.startTime(),
        minDuration: this.config.minColdStartTraceDuration,
        ignoreLibs: this.config.coldStartTraceSkipLib,
        isColdStart: didFunctionColdStart() || isProactiveInitialization(),
      };
      const coldStartTracer = new ColdStartTracer(coldStartConfig);
      coldStartTracer.trace(coldStartNodes);
      clearTraceTree();
    }

    // Remember the outcome instead of returning right away: the span pointers
    // below must be attached for failing (5xx) invocations too.
    let isServerError = false;
    if (this.triggerTags) {
      const statusCode = extractHTTPStatusCodeTag(this.triggerTags, result, isResponseStreamFunction);

      // Store the status tag in the listener to send to Xray on invocation completion
      this.triggerTags["http.status_code"] = statusCode!;
      if (this.tracerWrapper.currentSpan) {
        this.tracerWrapper.currentSpan.setTag("http.status_code", statusCode);
      }
      if (this.inferredSpan) {
        this.inferredSpan.setTag("http.status_code", statusCode);

        if (statusCode?.length === 3 && statusCode?.startsWith("5")) {
          this.wrappedCurrentSpan.setTag("error", 1);
          isServerError = true;
        }
      }
    }

    if (this.wrappedCurrentSpan) {
      for (const attributes of this.spanPointerAttributesList) {
        this.wrappedCurrentSpan.span.addSpanPointer(attributes);
      }
    }
    return isServerError;
  }

  /**
   * Encodes the trace context of the inferred span (or, failing that, the
   * lambda span) into `result.context._datadog` as base64-encoded JSON.
   * `result` is mutated in place; `result.context` is created when missing.
   * Nothing is returned.
   *
   * @param result The authorizer response to decorate.
   * @param requestId Request id of the authorizing request; only written when truthy.
   * @param finishTime Finish time of the parent span, written scaled by 1e6.
   */
  injectAuthorizerSpan(result: any, requestId: string, finishTime: number): any {
    if (!result.context) {
      result.context = {};
    }
    const injectedHeaders = {
      ...this.tracerWrapper.injectSpan(this.inferredSpan?.span || this.wrappedCurrentSpan?.span),
      // used as the start time in the authorizer span
      // padding 1e6 in case this nodejs authorizer is used for a python main lambda function
      [parentSpanFinishTimeHeader]: finishTime * 1e6,
    };
    if (requestId) {
      // undefined in token-type authorizer
      injectedHeaders[AUTHORIZING_REQUEST_ID_HEADER] = requestId;
    }
    result.context._datadog = Buffer.from(JSON.stringify(injectedHeaders)).toString("base64");
  }

  /**
   * Finalizes tracing for an invocation: forwards trigger tags to X-Ray,
   * unpatches http when this listener patched it, finishes the inferred span,
   * injects trace context into authorizer responses and resets the step
   * function context.
   *
   * @param error Error of the invocation, if any; tagged on a non-async inferred span.
   * @param result The handler's result; treated as an authorizer response when it
   * has both `principalId` and `policyDocument`.
   * @param event The raw Lambda event; `requestContext.requestId` is read from it for authorizers.
   */
  public async onCompleteInvocation(error?: any, result?: any, event?: any) {
    // Create a new dummy Datadog subsegment for function trigger tags so we
    // can attach them to X-Ray spans when hybrid tracing is used
    if (this.triggerTags) {
      const xray = new XrayService();
      xray.addLambdaTriggerTags(this.triggerTags);
    }

    // If the DD tracer is initialized it manages patching of the http lib on its own
    const tracerInitialized = this.tracerWrapper.isTracerAvailable;
    if (this.config.autoPatchHTTP && !tracerInitialized) {
      logDebug("Unpatching HTTP libraries");
      unpatchHttp();
    }

    let finishTime = this.wrappedCurrentSpan?.endTime();
    if (this.inferredSpan) {
      logDebug("Finishing inferred span");

      if (error && !this.inferredSpan.isAsync()) {
        logDebug("Setting error tag to inferred span");
        this.inferredSpan.setTag("error", error);
      }
      // An async inferred span finishes when the lambda span starts (falling back to
      // now when that start time is unavailable); any other inferred span finishes now.
      if (this.inferredSpan.isAsync()) {
        finishTime = this.wrappedCurrentSpan?.startTime() || Date.now();
      } else {
        finishTime = Date.now();
      }
      this.inferredSpan.finish(finishTime);
    }

    if (this.config.encodeAuthorizerContext && result?.principalId && result?.policyDocument) {
      // We're in an authorizer, pass on the trace context, requestId and finishTime to make the authorizer span
      this.injectAuthorizerSpan(result, event?.requestContext?.requestId, finishTime || Date.now());
    }

    // Reset singleton
    this.stepFunctionContext = undefined;
    StepFunctionContextService.reset();
  }

  /**
   * Wraps `func` so that it runs inside an `aws.lambda` span carrying the tags
   * and parent context prepared by `onStartInvocation`.
   *
   * @param func The function to wrap.
   * @returns The wrapped function produced by the tracer wrapper.
   */
  public onWrap<T = (...args: any[]) => any>(func: T): T {
    const options: TraceOptions = {};
    if (this.context) {
      logDebug("Creating the aws.lambda span");
      const functionArn = (this.context.invokedFunctionArn ?? "").toLowerCase();
      const tk = functionArn.split(":");
      options.tags = {
        cold_start: didFunctionColdStart(),
        // With more than 7 colon-separated parts, the 8th part is reported as the
        // function version and stripped from the ARN tag.
        function_arn: tk.length > 7 ? tk.slice(0, 7).join(":") : functionArn,
        function_version: tk.length > 7 ? tk[7] : "$LATEST",
        request_id: this.context.awsRequestId,
        resource_names: this.context.functionName,
        functionname: this.context?.functionName?.toLowerCase(),
        datadog_lambda: datadogLambdaVersion,
        dd_trace: ddtraceVersion,
      };
      if (isProactiveInitialization()) {
        options.tags.proactive_initialization = true;
      }
      if (
        (this.contextService.traceSource === TraceSource.Xray && this.config.mergeDatadogXrayTraces) ||
        this.contextService.traceSource === TraceSource.Event
      ) {
        options.tags["_dd.parent_source"] = this.contextService.traceSource;
      }
      // Trigger tags are spread last, so they win over the base tags on key collisions.
      if (this.triggerTags) {
        options.tags = { ...options.tags, ...this.triggerTags };
      }
    }
    if (this.stepFunctionContext) {
      logDebug("Applying step function context to the aws.lambda span");
      options.tags = {
        ...options.tags,
        ...this.stepFunctionContext,
      };
    }
    if (this.lambdaSpanParentContext) {
      options.childOf = this.lambdaSpanParentContext;
    }
    options.type = "serverless";
    options.service = "aws.lambda";
    if (this.context) {
      options.resource = this.context.functionName;
    }
    return this.tracerWrapper.wrap("aws.lambda", options, func);
  }
}