-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathlistener.ts
249 lines (222 loc) · 8.49 KB
/
listener.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
import { StatsD } from "hot-shots";
import { promisify } from "util";
import { logDebug, logError, logWarning } from "../utils";
import { flushExtension, isExtensionRunning } from "./extension";
import { KMSService } from "./kms-service";
import { writeMetricToStdout } from "./metric-log";
import { Distribution } from "./model";
import { Context } from "aws-lambda";
import { getEnhancedMetricTags } from "./enhanced-metrics";
// Interval, in milliseconds (10 seconds), handed to the Processor when it is constructed.
const METRICS_BATCH_SEND_INTERVAL = 10 * 1000;
// Maximum accepted age of a metric timestamp: 4 hours.
// NOTE: despite the `_HOURS` suffix, the value is a duration in milliseconds.
const HISTORICAL_METRICS_THRESHOLD_HOURS = 4 * 3600 * 1000;
/**
* Configuration consumed by MetricsListener to decide how, and with which
* credentials, metrics are delivered.
*/
export interface MetricsConfig {
/**
* Whether to retry sending metrics when flushing at the end of the lambda.
* This can potentially delay the completion of your lambda.
* The flag is forwarded to the metrics Processor when it is created.
* @default false
*/
shouldRetryMetrics: boolean;
/**
* The API key used to talk to the Datadog API. If this is an empty string,
* `apiKeyKMS` is tried next, followed by `apiKeySecretARN`.
*/
apiKey: string;
/**
* A KMS-encrypted API key used to talk to the Datadog API. It is only consulted
* when `apiKey` is empty, and is decrypted automatically before any metrics are sent.
*/
apiKeyKMS: string;
/**
* ARN of a Secrets Manager secret holding the API key used to talk to the Datadog API.
* It is only consulted when `apiKey` is empty and `apiKeyKMS` is empty or could
* not be decrypted.
*/
apiKeySecretARN: string;
/**
* The Datadog site to send metrics to. This should either be 'datadoghq.com' (default),
* or 'datadoghq.eu' for customers in the EU. Metrics are sent to `https://api.<siteURL>`.
* @default "datadoghq.com"
*/
siteURL: string;
/**
* Whether to write metrics to stdout so they reach CloudWatch for log forwarding, rather
* than sending them directly to the Datadog API. This method requires more setup work,
* but when enabled won't have any effect on your lambda's performance.
* @default false
*/
logForwarding: boolean;
/**
* Whether to increment invocations and errors Lambda integration metrics from this layer.
* @default false
*/
enhancedMetrics: boolean;
/**
* Whether to call the extension's Flush endpoint in a local test.
* Only needed locally, as the extension knows about the end of the invocation
* from the runtime. The value is passed to `flushExtension` when an invocation completes.
*/
localTesting: boolean;
}
/**
 * Collects custom distribution metrics during a Lambda invocation and delivers
 * them through one of three channels:
 *
 * - a StatsD client talking to the Datadog extension on 127.0.0.1, when the extension is running;
 * - stdout (log forwarding), when `config.logForwarding` is set or the caller forces async delivery;
 * - a batching API processor, otherwise (and for timestamped metrics while the extension is running).
 *
 * Call `onStartInvocation` before sending metrics and `onCompleteInvocation`
 * afterwards so that everything is flushed before the invocation ends.
 */
export class MetricsListener {
  /** Processor for the current invocation; resolves to `undefined` when log forwarding is enabled. */
  private currentProcessor?: Promise<any>;
  /** Resolves to the API key, or to an empty string when none could be obtained. */
  private apiKey: Promise<string>;
  private statsDClient?: StatsD;
  /** Cached result of the extension probe; `undefined` until the first invocation starts. */
  private isExtensionRunning?: boolean = undefined;
  private globalTags?: string[] = [];

  /**
   * @param kmsClient Service used to decrypt `config.apiKeyKMS` when no plain API key is given.
   * @param config Delivery configuration; API key resolution starts immediately.
   */
  constructor(private kmsClient: KMSService, private config: MetricsConfig) {
    this.apiKey = this.getAPIKey(config);
  }

  /**
   * Prepares the delivery channel for a new invocation.
   *
   * @param _ Unused.
   * @param context Lambda context, used to derive the global tags when the extension is running.
   */
  public async onStartInvocation(_: any, context?: Context) {
    // Probe for the extension only once; the answer is cached on the instance.
    if (this.isExtensionRunning === undefined) {
      this.isExtensionRunning = await isExtensionRunning();
      logDebug(`Extension present: ${this.isExtensionRunning}`);
    }

    if (this.isExtensionRunning) {
      logDebug(`Using StatsD client`);
      this.globalTags = this.getGlobalTags(context);
      // About 200 chars per metric, so 8KB buffer size holds approx 40 metrics per request
      this.statsDClient = new StatsD({ host: "127.0.0.1", closingFlushInterval: 1, maxBufferSize: 8192 });
      return;
    }

    if (this.config.logForwarding) {
      logDebug(`logForwarding configured`);
      return;
    }

    this.currentProcessor = this.createProcessor(this.config, this.apiKey);
  }

  /**
   * Flushes every pending metric at the end of an invocation. Failures are
   * logged and never thrown for the processor and StatsD steps, and a failure
   * in one of them does not prevent the other from running.
   */
  public async onCompleteInvocation() {
    await this.flushProcessor();
    await this.closeStatsDClient();

    // Flush only when testing extension locally.
    // Passing config flag so we can lazy load the request module.
    if (this.isExtensionRunning) {
      await flushExtension(this.config.localTesting);
    }
    this.currentProcessor = undefined;
  }

  /**
   * Sends a distribution metric with an explicit timestamp.
   *
   * When the extension is running, a metric with a valid timestamp is sent through
   * the API processor (timestamps older than 4 hours are dropped with a warning),
   * and a metric with an invalid or epoch-zero timestamp is sent through StatsD
   * without a timestamp.
   *
   * @param name Metric name.
   * @param value Metric value.
   * @param metricTime Timestamp of the data point.
   * @param forceAsync When true, the metric is written to stdout instead of being sent to the API.
   * @param tags Tags attached to the metric.
   */
  public sendDistributionMetricWithDate(
    name: string,
    value: number,
    metricTime: Date, // TODO: Next breaking change to update to optional or 'Date | undefined'?
    forceAsync: boolean,
    ...tags: string[]
  ) {
    if (this.isExtensionRunning) {
      const isMetricTimeValid = Date.parse(metricTime.toString()) > 0;
      if (!isMetricTimeValid) {
        this.statsDClient?.distribution(name, value, undefined, tags);
        return;
      }

      const dateCeiling = new Date(Date.now() - HISTORICAL_METRICS_THRESHOLD_HOURS); // 4 hours ago
      if (dateCeiling > metricTime) {
        logWarning(`Timestamp ${metricTime.toISOString()} is older than 4 hours, not submitting metric ${name}`);
        return;
      }

      // Only create the processor to submit metrics to the API when a user provides a valid timestamp as
      // Dogstatsd does not support timestamps for distributions.
      // Reuse the processor if one already exists: replacing it on every metric would
      // orphan the earlier processors, and only the last one is flushed on completion.
      if (this.currentProcessor === undefined) {
        this.currentProcessor = this.createProcessor(this.config, this.apiKey);
      }

      // Add global tags to metrics sent to the API
      if (this.globalTags !== undefined && this.globalTags.length > 0) {
        tags = [...tags, ...this.globalTags];
      }
    }

    if (this.config.logForwarding || forceAsync) {
      writeMetricToStdout(name, value, metricTime, tags);
      return;
    }

    const dist = new Distribution(name, [{ timestamp: metricTime, value }], ...tags);

    // NOTE(review): `apiKey` is a Promise and therefore always truthy, so this guard
    // never fires; an empty key can only be detected after awaiting it.
    if (!this.apiKey) {
      const errorMessage = "api key not configured, see https://dtdg.co/sls-node-metrics";
      logError(errorMessage);
      return;
    }

    if (this.currentProcessor === undefined) {
      logError("can't send metrics, datadog lambda handler not set up.");
      return;
    }

    // tslint:disable-next-line: no-floating-promises
    this.currentProcessor
      .then((processor) => {
        processor.addMetric(dist);
      })
      .catch((error: unknown) => {
        // Keep a failed processor from surfacing as an unhandled rejection.
        logError("failed to add metric", error instanceof Error ? error : new Error(String(error)));
      });
  }

  /**
   * Sends a distribution metric without a caller-supplied timestamp.
   *
   * @param name Metric name.
   * @param value Metric value.
   * @param forceAsync When true, the metric is written to stdout instead of being sent to the API.
   * @param tags Tags attached to the metric.
   */
  public sendDistributionMetric(name: string, value: number, forceAsync: boolean, ...tags: string[]) {
    // The Extension doesn't support distribution metrics with timestamps. Use sendDistributionMetricWithDate instead.
    // An epoch-zero date is treated as "no timestamp" and routes the metric to StatsD.
    const metricTime = this.isExtensionRunning ? new Date(0) : new Date(Date.now());
    this.sendDistributionMetricWithDate(name, value, metricTime, forceAsync, ...tags);
  }

  /** Flushes the API processor, if any, logging (not throwing) on failure. */
  private async flushProcessor(): Promise<void> {
    if (this.currentProcessor === undefined) {
      return;
    }
    try {
      const processor = await this.currentProcessor;
      // After the processor becomes available, it's possible there are some pending
      // distribution metric promises. We make sure those promises run
      // first before we flush by yielding control of the event loop.
      await promisify(setImmediate)();
      // createProcessor resolves to undefined when log forwarding is enabled.
      if (processor !== undefined) {
        await processor.flush();
      }
    } catch (error) {
      // This can fail for a variety of reasons, from the API not being reachable,
      // to KMS key decryption failing.
      this.reportFlushFailure(error);
    }
  }

  /** Closes the StatsD client, if any, so buffered stats reach the extension; always drops the reference. */
  private async closeStatsDClient(): Promise<void> {
    const client = this.statsDClient;
    if (client === undefined) {
      return;
    }
    logDebug(`Flushing statsD`);
    try {
      // Make sure all stats are flushed to extension
      await new Promise<void>((resolve, reject) => {
        client.close((error) => {
          if (error) {
            reject(error);
            return;
          }
          resolve();
        });
      });
    } catch (error) {
      this.reportFlushFailure(error);
    } finally {
      // A new client is created on the next invocation start; never keep a closed or broken one.
      this.statsDClient = undefined;
    }
  }

  /** Logs a flush failure, wrapping non-Error values so nothing is swallowed silently. */
  private reportFlushFailure(error: unknown): void {
    logError("failed to flush metrics", error instanceof Error ? error : new Error(String(error)));
  }

  /**
   * Builds and starts the API processor.
   *
   * @param config Supplies the site URL and the retry flag.
   * @param apiKey Promise resolving to the API key.
   * @returns The started processor, or `undefined` when log forwarding is enabled.
   */
  private async createProcessor(config: MetricsConfig, apiKey: Promise<string>) {
    if (!this.config.logForwarding) {
      // Required lazily so the API modules are only loaded when metrics go to the API.
      const { APIClient } = require("./api");
      const { Processor } = require("./processor");

      const key = await apiKey;
      const url = `https://api.${config.siteURL}`;
      const apiClient = new APIClient(key, url);
      const processor = new Processor(apiClient, METRICS_BATCH_SEND_INTERVAL, config.shouldRetryMetrics);
      processor.startProcessing(this.globalTags);
      return processor;
    }
  }

  /**
   * Resolves the API key, trying in order: the plain key, the KMS-encrypted key,
   * then the Secrets Manager secret. Lookup failures are logged and the next
   * source is tried.
   *
   * @returns The API key, or an empty string when no source yields one.
   */
  private async getAPIKey(config: MetricsConfig) {
    if (config.apiKey !== "") {
      return config.apiKey;
    }

    if (config.apiKeyKMS !== "") {
      try {
        return await this.kmsClient.decrypt(config.apiKeyKMS);
      } catch (error) {
        logError("couldn't decrypt kms api key", error as Error);
      }
    }

    if (config.apiKeySecretARN !== "") {
      try {
        const { default: secretsClient } = await import("aws-sdk/clients/secretsmanager");
        const secretsManager = new secretsClient();
        const secret = await secretsManager.getSecretValue({ SecretId: config.apiKeySecretARN }).promise();
        return secret?.SecretString ?? "";
      } catch (error) {
        logError("couldn't get secrets manager api key", error as Error);
      }
    }
    return "";
  }

  /**
   * Builds the tags attached to every metric: the enhanced-metric tags plus,
   * when the context carries an ARN, a `function_arn` tag with any alias removed.
   */
  private getGlobalTags(context?: Context) {
    const tags = getEnhancedMetricTags(context);
    if (context?.invokedFunctionArn) {
      const splitArn = context.invokedFunctionArn.split(":");
      if (splitArn.length > 7) {
        // Get rid of the alias
        splitArn.pop();
      }
      const arn = splitArn.join(":");
      tags.push(`function_arn:${arn}`);
    }
    return tags;
  }
}