/*
* Copyright Elasticsearch B.V. and other contributors where applicable.
* Licensed under the BSD 2-Clause License; you may not use this file except in
* compliance with the BSD 2-Clause License.
*/
'use strict'
const assert = require('assert')
const crypto = require('crypto')
const fs = require('fs')
const http = require('http')
const https = require('https')
const util = require('util')
const { performance } = require('perf_hooks')
const { URL } = require('url')
const zlib = require('zlib')
const HttpAgentKeepAlive = require('agentkeepalive')
const HttpsAgentKeepAlive = HttpAgentKeepAlive.HttpsAgent
const Filters = require('object-filter-sequence')
const querystring = require('querystring')
const Writable = require('readable-stream').Writable
const getContainerInfo = require('./lib/container-info')
const eos = require('end-of-stream')
const semver = require('semver')
const streamToBuffer = require('fast-stream-to-buffer')
const StreamChopper = require('stream-chopper')
const { detectHostname } = require('./lib/detect-hostname')
const ndjson = require('./lib/ndjson')
const { NoopLogger } = require('./lib/logging')
const truncate = require('./lib/truncate')
const { getCentralConfigIntervalS } = require('./lib/central-config')
module.exports = Client
// These symbols are used as markers in the client stream to indicate special
// flush handling.
const kFlush = Symbol('flush')
const kLambdaEndFlush = Symbol('lambdaEndFlush')
function isFlushMarker (obj) {
return obj === kFlush || obj === kLambdaEndFlush
}
const requiredOpts = [
'agentName',
'agentVersion',
'serviceName',
'userAgent'
]
// Get handles on uninstrumented functions for making HTTP(S) requests before
// the APM agent has a chance to wrap them. This allows the Client to make
// requests to APM server without interfering with the APM agent's tracing
// of the user application.
const httpGet = http.get
const httpRequest = http.request
const httpsGet = https.get
const httpsRequest = https.request
const containerInfo = getContainerInfo.sync()
const isLambdaExecutionEnvironment = !!process.env.AWS_LAMBDA_FUNCTION_NAME
// All sockets on the agent are unreffed when they are created. This means that
// when the user process's event loop is done, and these are the only handles
// left, the process 'beforeExit' event will be emitted. By listening for this
// we can make sure to end the requests properly before process exit. This way
// we don't keep the process running until the `time` timeout happens.
//
// An exception to this is AWS Lambda which, in some cases (sync function
// handlers that use a callback), will wait for 'beforeExit' to freeze the
// Lambda instance VM *for later re-use*. This means we never want to shut
// down the `Client` on 'beforeExit'.
const clientsToAutoEnd = []
if (!isLambdaExecutionEnvironment) {
process.once('beforeExit', function () {
clientsToAutoEnd.forEach(function (client) {
if (!client) {
// Clients remove themselves from the array when they end.
return
}
client._gracefulExit()
})
})
}
util.inherits(Client, Writable)
Client.encoding = Object.freeze({
METADATA: Symbol('metadata'),
TRANSACTION: Symbol('transaction'),
SPAN: Symbol('span'),
ERROR: Symbol('error'),
METRICSET: Symbol('metricset')
})
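// A minimal usage sketch (illustrative only: the require path and option
// values here are assumptions; the four required options match
// `requiredOpts` above):
//
//     const Client = require('./index.js')
//     const client = new Client({
//       agentName: 'nodejs',
//       agentVersion: '1.2.3',
//       serviceName: 'my-service',
//       userAgent: 'my-agent/1.2.3'
//     })
//     client.sendTransaction({ name: 'GET /', type: 'request' })
//     client.flush(() => { /* queued events sent, or the attempt failed */ })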
function Client (opts) {
if (!(this instanceof Client)) return new Client(opts)
Writable.call(this, { objectMode: true })
this._corkTimer = null
this._agent = null
this._activeIntakeReq = false
this._onIntakeReqConcluded = null
this._transport = null
this._configTimer = null
this._backoffReconnectCount = 0
this._intakeRequestGracefulExitFn = null // set in makeIntakeRequest
this._encodedMetadata = null
this._cloudMetadata = null
this._extraMetadata = null
this._metadataFilters = new Filters()
// _lambdaActive indicates if a Lambda function invocation is active. It is
// only meaningful if `isLambdaExecutionEnvironment`.
this._lambdaActive = false
// Whether to forward `.lambdaRegisterTransaction()` calls to the Lambda
// extension. This will be set false if a previous attempt failed.
this._lambdaShouldRegisterTransactions = true
// Internal runtime stats for developer debugging/tuning.
this._numEvents = 0 // number of events given to the client
this._numEventsDropped = 0 // number of events dropped because overloaded
this._numEventsEnqueued = 0 // number of events written through to chopper
this.sent = 0 // number of events sent to APM server (not necessarily accepted)
this._slowWriteBatch = { // stats on slow `_writeBatch` calls and the slowest one seen
numOver10Ms: 0,
// Data for the slowest _writeBatch:
encodeTimeMs: 0,
fullTimeMs: 0,
numEvents: 0,
numBytes: 0
}
this.config(opts)
this._log = this._conf.logger || new NoopLogger()
// `_apmServerVersion` is one of:
// - `undefined`: the version has not yet been fetched
// - `null`: the APM server version is unknown, could not be determined
// - a semver.SemVer instance
this._apmServerVersion = this._conf.apmServerVersion ? new semver.SemVer(this._conf.apmServerVersion) : undefined
if (!this._apmServerVersion) {
this._fetchApmServerVersion()
}
const numExtraMdOpts = [
this._conf.cloudMetadataFetcher,
this._conf.expectExtraMetadata,
this._conf.extraMetadata
].reduce((accum, curr) => curr ? accum + 1 : accum, 0)
if (numExtraMdOpts > 1) {
throw new Error('it is an error to configure a Client with more than one of "cloudMetadataFetcher", "expectExtraMetadata", or "extraMetadata"')
} else if (this._conf.cloudMetadataFetcher) {
// Start stream in corked mode, uncork when cloud metadata is fetched and
// assigned. Also, the _maybeUncork will not uncork until _encodedMetadata
// is set.
this._log.trace('corking (cloudMetadataFetcher)')
this.cork()
this._fetchAndEncodeMetadata(() => {
// _fetchAndEncodeMetadata will have set/memoized the encoded
// metadata to the _encodedMetadata property.
// This reverses the cork() call in the constructor above. "Maybe" uncork,
// in case the client has been destroyed before this callback is called.
this._maybeUncork()
this._log.trace('uncorked (cloudMetadataFetcher)')
// the `cloud-metadata` event allows listeners to know when the
// agent has finished fetching and encoding its metadata for the
// first time
this.emit('cloud-metadata', this._encodedMetadata)
})
} else if (this._conf.expectExtraMetadata) {
// Uncorking will happen in the expected `.setExtraMetadata()` call.
this._log.trace('corking (expectExtraMetadata)')
this.cork()
} else if (this._conf.extraMetadata) {
this.setExtraMetadata(this._conf.extraMetadata)
} else {
this._resetEncodedMetadata()
}
this._chopper = new StreamChopper({
size: this._conf.size,
time: this._conf.time,
type: StreamChopper.overflow,
transform () {
return zlib.createGzip({
level: zlib.constants.Z_BEST_SPEED
})
}
})
const onIntakeError = (err) => {
if (this.destroyed === false) {
this.emit('request-error', err)
}
}
this._chopper.on('stream', getChoppedStreamHandler(this, onIntakeError))
// We don't expect the chopper stream to end until the client is ending.
// Make sure to clean up if this does happen unexpectedly.
const fail = () => {
if (this._writableState.ending === false) this.destroy()
}
eos(this._chopper, fail)
this._index = clientsToAutoEnd.length
clientsToAutoEnd.push(this)
// The 'beforeExit' event is significant in Lambda invocation completion
// handling, so we log it for debugging.
if (isLambdaExecutionEnvironment && this._log.isLevelEnabled('trace')) {
process.prependListener('beforeExit', () => {
this._log.trace('process "beforeExit"')
})
}
if (this._conf.centralConfig) {
this._pollConfig()
}
}
// Return current internal stats.
Client.prototype._getStats = function () {
return {
numEvents: this._numEvents,
numEventsDropped: this._numEventsDropped,
numEventsEnqueued: this._numEventsEnqueued,
numEventsSent: this.sent,
slowWriteBatch: this._slowWriteBatch,
backoffReconnectCount: this._backoffReconnectCount
}
}
Client.prototype.config = function (opts) {
this._conf = Object.assign(this._conf || {}, opts)
this._conf.globalLabels = normalizeGlobalLabels(this._conf.globalLabels)
const missing = requiredOpts.filter(name => !this._conf[name])
if (missing.length > 0) throw new Error('Missing required option(s): ' + missing.join(', '))
// default values
if (!this._conf.size && this._conf.size !== 0) this._conf.size = 750 * 1024
if (!this._conf.time && this._conf.time !== 0) this._conf.time = 10000
if (!this._conf.serverTimeout && this._conf.serverTimeout !== 0) this._conf.serverTimeout = 15000
if (!this._conf.serverUrl) this._conf.serverUrl = 'http://127.0.0.1:8200'
if (!this._conf.environment) this._conf.environment = process.env.NODE_ENV || 'development'
if (!this._conf.truncateKeywordsAt) this._conf.truncateKeywordsAt = 1024
if (!this._conf.truncateStringsAt) this._conf.truncateStringsAt = 1024
if (!this._conf.truncateCustomKeysAt) this._conf.truncateCustomKeysAt = 1024
if (!this._conf.truncateLongFieldsAt) this._conf.truncateLongFieldsAt = 10000
// The deprecated `truncateErrorMessagesAt` will be honored if specified.
if (!this._conf.bufferWindowTime) this._conf.bufferWindowTime = 20
if (!this._conf.bufferWindowSize) this._conf.bufferWindowSize = 50
if (!this._conf.maxQueueSize) this._conf.maxQueueSize = 1024
if (!this._conf.intakeResTimeout) this._conf.intakeResTimeout = 10000
if (!this._conf.intakeResTimeoutOnEnd) this._conf.intakeResTimeoutOnEnd = 1000
this._conf.keepAlive = this._conf.keepAlive !== false
this._conf.centralConfig = this._conf.centralConfig || false
if (!('keepAliveMsecs' in this._conf)) this._conf.keepAliveMsecs = 1000
if (!('maxSockets' in this._conf)) this._conf.maxSockets = Infinity
if (!('maxFreeSockets' in this._conf)) this._conf.maxFreeSockets = 256
if (!('freeSocketTimeout' in this._conf)) this._conf.freeSocketTimeout = 4000
// processed values
this._conf.serverUrl = new URL(this._conf.serverUrl)
this._conf.detectedHostname = detectHostname()
if (containerInfo) {
if (!this._conf.containerId && containerInfo.containerId) {
this._conf.containerId = containerInfo.containerId
}
if (!this._conf.kubernetesPodUID && containerInfo.podId) {
this._conf.kubernetesPodUID = containerInfo.podId
}
if (!this._conf.kubernetesPodName && containerInfo.podId) {
// https://kubernetes.io/docs/concepts/workloads/pods/#working-with-pods
// suggests a pod name should just be the shorter "DNS label", and my
// guess is k8s defaults a pod name to just the *short* hostname, not
// the FQDN.
this._conf.kubernetesPodName = this._conf.detectedHostname.split('.', 1)[0]
}
}
let AgentKeepAlive
switch (this._conf.serverUrl.protocol) {
case 'http:':
this._transport = http
this._transportRequest = httpRequest
this._transportGet = httpGet
AgentKeepAlive = HttpAgentKeepAlive
break
case 'https:':
this._transport = https
this._transportRequest = httpsRequest
this._transportGet = httpsGet
AgentKeepAlive = HttpsAgentKeepAlive
break
default:
throw new Error('Unknown protocol ' + this._conf.serverUrl.protocol)
}
// Only reset `this._agent` if the serverUrl has changed to avoid
// unnecessarily abandoning keep-alive connections.
if (!this._agent || (opts && 'serverUrl' in opts)) {
if (this._agent) {
this._agent.destroy()
}
this._agent = new AgentKeepAlive({
keepAlive: this._conf.keepAlive,
keepAliveMsecs: this._conf.keepAliveMsecs,
freeSocketTimeout: this._conf.freeSocketTimeout,
timeout: this._conf.serverTimeout,
maxSockets: this._conf.maxSockets,
maxFreeSockets: this._conf.maxFreeSockets
})
}
// http request options
this._conf.requestIntake = getIntakeRequestOptions(this._conf, this._agent)
this._conf.requestConfig = getConfigRequestOptions(this._conf, this._agent)
this._conf.requestSignalLambdaEnd = getSignalLambdaEndRequestOptions(this._conf, this._agent)
this._conf.requestRegisterTransaction = getRegisterTransactionRequestOptions(this._conf, this._agent)
// Fixes a bug where the cached/memoized `_encodedMetadata` wouldn't be
// updated when the client was reconfigured.
if (this._encodedMetadata) {
this._resetEncodedMetadata()
}
}
/**
 * Set additional metadata to be sent to APM Server in intake requests.
 *
 * If the Client was configured with `expectExtraMetadata: true`, then this
 * will uncork the client to allow intake requests to begin.
*
* If this is called multiple times, it is additive.
*/
Client.prototype.setExtraMetadata = function (extraMetadata) {
if (!this._extraMetadata) {
this._extraMetadata = extraMetadata
} else {
metadataMergeDeep(this._extraMetadata, extraMetadata)
}
this._resetEncodedMetadata()
if (this._conf.expectExtraMetadata) {
this._log.trace('maybe uncork (expectExtraMetadata)')
this._maybeUncork()
}
}
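// For example, an agent that cannot know some metadata at construction time
// can defer it (`determineExtraMetadata` is a hypothetical helper):
//
//     const client = new Client({ /* ...required opts... */ expectExtraMetadata: true })
//     determineExtraMetadata().then((md) => client.setExtraMetadata(md))
//
// The client stays corked (buffering events) until `setExtraMetadata` is called.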
/**
* Add a filter function used to filter the "metadata" object sent to APM
* server. See the APM Agent `addMetadataFilter` documentation for details.
* https://www.elastic.co/guide/en/apm/agent/nodejs/current/agent-api.html#apm-add-metadata-filter
*/
Client.prototype.addMetadataFilter = function (fn) {
assert.strictEqual(typeof fn, 'function', 'fn arg must be a function')
this._metadataFilters.push(fn)
if (this._encodedMetadata) {
this._resetEncodedMetadata()
}
}
/**
* (Re)set `_encodedMetadata` from this._conf, this._cloudMetadata,
 * this._extraMetadata, and possibly this._metadataFilters.
*/
Client.prototype._resetEncodedMetadata = function () {
// Make a deep clone so that the originals are not modified when (a) adding
// `.cloud` and (b) filtering. This isn't perf-sensitive code, so this JSON
// cycle for cloning should suffice.
let metadata = metadataFromConf(this._conf, this)
if (this._cloudMetadata) {
metadata.cloud = deepClone(this._cloudMetadata)
}
if (this._extraMetadata) {
metadataMergeDeep(metadata, deepClone(this._extraMetadata))
}
// Possible filters from APM agent's `apm.addMetadataFilter()`.
if (this._metadataFilters && this._metadataFilters.length > 0) {
metadata = this._metadataFilters.process(metadata)
}
// This is the only code path that should set `_encodedMetadata`.
this._encodedMetadata = this._encode({ metadata }, Client.encoding.METADATA)
this._log.trace({ _encodedMetadata: this._encodedMetadata }, '_resetEncodedMetadata')
}
Client.prototype._pollConfig = function () {
const opts = this._conf.requestConfig
if (this._conf.lastConfigEtag) {
opts.headers['If-None-Match'] = this._conf.lastConfigEtag
}
const req = this._transportGet(opts, res => {
res.on('error', err => {
// Not sure this event can ever be emitted, but just in case
res.destroy(err)
})
this._scheduleNextConfigPoll(getMaxAge(res))
if (
res.statusCode === 304 || // No new config since last time
res.statusCode === 403 || // Central config not enabled in APM Server
res.statusCode === 404 // Old APM Server that doesn't support central config
) {
res.resume()
return
}
streamToBuffer(res, (err, buf) => {
if (err) {
this.emit('request-error', processConfigErrorResponse(res, buf, err))
return
}
if (res.statusCode === 200) {
// 200: New config available (or no config for the given service.name / service.environment)
const etag = res.headers.etag
if (etag) this._conf.lastConfigEtag = etag
let config
try {
config = JSON.parse(buf)
} catch (parseErr) {
this.emit('request-error', processConfigErrorResponse(res, buf, parseErr))
return
}
this.emit('config', config)
} else {
this.emit('request-error', processConfigErrorResponse(res, buf))
}
})
})
req.on('error', err => {
this._scheduleNextConfigPoll()
this.emit('request-error', err)
})
}
Client.prototype._scheduleNextConfigPoll = function (seconds) {
if (this._configTimer !== null) return
const delayS = getCentralConfigIntervalS(seconds)
this._configTimer = setTimeout(() => {
this._configTimer = null
this._pollConfig()
}, delayS * 1000)
this._configTimer.unref()
}
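// For example, assuming `getMaxAge` (defined later in this file) reads the
// response's `Cache-Control: max-age=...` header: a response with
// `max-age=300` schedules the next poll roughly 300s out via
// `getCentralConfigIntervalS`; without a usable max-age, a default interval
// is used.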
// re-ref the open socket handles
Client.prototype._ref = function () {
Object.keys(this._agent.sockets).forEach(remote => {
this._agent.sockets[remote].forEach(function (socket) {
socket.ref()
})
})
}
Client.prototype._write = function (obj, enc, cb) {
if (isFlushMarker(obj)) {
this._writeFlush(obj, cb)
} else {
const t = process.hrtime()
const chunk = this._encode(obj, enc)
this._numEventsEnqueued++
this._chopper.write(chunk, cb)
this._log.trace({
fullTimeMs: deltaMs(t),
numEvents: 1,
numBytes: chunk.length
}, '_write: encode object')
}
}
Client.prototype._writev = function (objs, cb) {
// Limit the size of individual writes to manageable batches, primarily to
// limit large sync pauses due to `_encode`ing in `_writeBatch`. This value
// is not particularly well tuned. It was selected to get sync pauses under
// 10ms on a developer machine.
const MAX_WRITE_BATCH_SIZE = 32
let offset = 0
const processBatch = () => {
if (this.destroyed) {
cb()
return
}
let flushIdx = -1
const limit = Math.min(objs.length, offset + MAX_WRITE_BATCH_SIZE)
for (let i = offset; i < limit; i++) {
if (isFlushMarker(objs[i].chunk)) {
flushIdx = i
break
}
}
if (offset === 0 && flushIdx === -1 && objs.length <= MAX_WRITE_BATCH_SIZE) {
// A shortcut if there is no flush marker and the whole `objs` fits in a batch.
this._writeBatch(objs, cb)
} else if (flushIdx === -1) {
// No flush marker in this batch.
this._writeBatch(objs.slice(offset, limit),
limit === objs.length ? cb : processBatch)
offset = limit
} else if (flushIdx > offset) {
// There are some events in the queue before a flush marker.
this._writeBatch(objs.slice(offset, flushIdx), processBatch)
offset = flushIdx
} else if (flushIdx === objs.length - 1) {
// The next item is a flush marker, and it is the *last* item in the queue.
this._writeFlush(objs[flushIdx].chunk, cb)
} else {
// The next item in the queue is a flush.
this._writeFlush(objs[flushIdx].chunk, processBatch)
offset++
}
}
processBatch()
}
// Write a batch of events (excluding specially handled "flush" events) to
// the stream chopper.
Client.prototype._writeBatch = function (objs, cb) {
const t = process.hrtime()
const chunks = []
for (var i = 0; i < objs.length; i++) {
const obj = objs[i]
chunks.push(this._encode(obj.chunk, obj.encoding))
}
const chunk = chunks.join('')
const encodeTimeMs = deltaMs(t)
this._numEventsEnqueued += objs.length
this._chopper.write(chunk, cb)
const fullTimeMs = deltaMs(t)
if (fullTimeMs > this._slowWriteBatch.fullTimeMs) {
this._slowWriteBatch.encodeTimeMs = encodeTimeMs
this._slowWriteBatch.fullTimeMs = fullTimeMs
this._slowWriteBatch.numEvents = objs.length
this._slowWriteBatch.numBytes = chunk.length
}
if (fullTimeMs > 10) {
this._slowWriteBatch.numOver10Ms++
}
this._log.trace({
encodeTimeMs,
fullTimeMs,
numEvents: objs.length,
numBytes: chunk.length
}, '_writeBatch')
}
Client.prototype._writeFlush = function (flushMarker, cb) {
this._log.trace({ activeIntakeReq: this._activeIntakeReq, lambdaEnd: flushMarker === kLambdaEndFlush }, '_writeFlush')
let onFlushed = cb
if (isLambdaExecutionEnvironment && flushMarker === kLambdaEndFlush) {
onFlushed = () => {
// Signal the Elastic AWS Lambda extension that it is done passing data
// for this invocation, then call `cb()` so the wrapped Lambda handler
// can finish.
this._signalLambdaEnd(cb)
}
}
if (this._activeIntakeReq) {
this._onIntakeReqConcluded = onFlushed
this._chopper.chop()
} else {
this._chopper.chop(onFlushed)
}
}
Client.prototype._maybeCork = function () {
if (!this._writableState.corked) {
if (isLambdaExecutionEnvironment && !this._lambdaActive) {
this.cork()
} else if (this._conf.bufferWindowTime !== -1) {
this.cork()
if (this._corkTimer && this._corkTimer.refresh) {
// the refresh function was added in Node 10.2.0
this._corkTimer.refresh()
} else {
this._corkTimer = setTimeout(() => {
this.uncork()
}, this._conf.bufferWindowTime)
}
}
} else if (this._writableState.length >= this._conf.bufferWindowSize) {
this._maybeUncork()
}
}
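// In effect, `_maybeCork`/`_maybeUncork` micro-batch events: writes are held
// (corked) for up to `bufferWindowTime` ms (default 20) or until
// `bufferWindowSize` (default 50) events are queued, whichever comes first,
// so that `_writev` receives batches rather than single events.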
Client.prototype._maybeUncork = function () {
if (!this._encodedMetadata) {
// The client must remain corked until cloud metadata has been
// fetched-or-skipped.
return
} else if (isLambdaExecutionEnvironment && !this._lambdaActive) {
// In a Lambda env, we must only uncork when an invocation is active,
// otherwise we could start an intake request just before the VM is frozen.
return
}
if (this._writableState.corked) {
// Wait till next tick, so that the current write that triggered the call
// to `_maybeUncork` has time to be added to the queue. If we didn't do
// this, that last write would trigger a lone call to `_write` instead of
// being batched via `_writev`.
process.nextTick(() => {
if (this.destroyed === false && !(isLambdaExecutionEnvironment && !this._lambdaActive)) {
this.uncork()
}
})
if (this._corkTimer) {
clearTimeout(this._corkTimer)
this._corkTimer = null
}
}
}
Client.prototype._encode = function (obj, enc) {
const out = {}
switch (enc) {
case Client.encoding.SPAN:
out.span = truncate.span(obj.span, this._conf)
break
case Client.encoding.TRANSACTION:
out.transaction = truncate.transaction(obj.transaction, this._conf)
break
case Client.encoding.METADATA:
out.metadata = truncate.metadata(obj.metadata, this._conf)
break
case Client.encoding.ERROR:
out.error = truncate.error(obj.error, this._conf)
break
case Client.encoding.METRICSET:
out.metricset = truncate.metricset(obj.metricset, this._conf)
break
}
return ndjson.serialize(out)
}
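// For example, encoding a span produces a single newline-terminated ndjson
// line, schematically `{"span":{"name":"...","duration":...}}\n`; an intake
// request body is then `_encodedMetadata` followed by such event lines.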
Client.prototype.lambdaStart = function () {
this._lambdaActive = true
}
/**
* Indicate whether the APM agent -- when in a Lambda environment -- should
* bother calling `.lambdaRegisterTransaction(...)`.
*
* @returns {boolean}
*/
Client.prototype.lambdaShouldRegisterTransactions = function () {
return this._lambdaShouldRegisterTransactions
}
/**
* Tell the local Lambda extension about the just-started transaction. This
* allows the extension to report the transaction in certain error cases
* where the APM agent isn't able to *end* the transaction and report it,
* e.g. if the function is about to timeout, or if the process crashes.
*
* The expected request is as follows, and a 200 status code is expected in
* response:
*
* POST /register/transaction
* Content-Type: application/vnd.elastic.apm.transaction+ndjson
* x-elastic-aws-request-id: ${awsRequestId}
*
* {"metadata":{...}}
* {"transaction":{...partial transaction data...}}
*
* @param {object} trans - a mostly complete APM Transaction object. It should
* have a default `outcome` value. `duration` and `result` (and possibly
* `outcome`) fields will be set by the Elastic Lambda extension if this
* transaction is used.
* @param {import('crypto').UUID} awsRequestId
 * @returns {Promise|undefined} So this can, and should, be `await`ed.
 *    If a promise is returned, it will only resolve, never reject.
*/
Client.prototype.lambdaRegisterTransaction = function (trans, awsRequestId) {
if (!isLambdaExecutionEnvironment) {
return
}
if (!this._lambdaShouldRegisterTransactions) {
return
}
assert(this._encodedMetadata, '_encodedMetadata is set')
// We expect to be talking to the localhost Elastic Lambda extension, so we
// want a shorter timeout than `_conf.serverTimeout`.
const TIMEOUT_MS = 5000
const startTime = performance.now()
return new Promise((resolve, reject) => {
this._log.trace({ awsRequestId, traceId: trans.trace_id, transId: trans.id }, 'lambdaRegisterTransaction start')
var out = this._encode({ transaction: trans }, Client.encoding.TRANSACTION)
const finish = errOrErrMsg => {
const durationMs = performance.now() - startTime
if (errOrErrMsg) {
this._log.debug({ awsRequestId, err: errOrErrMsg, durationMs }, 'lambdaRegisterTransaction unsuccessful')
this._lambdaShouldRegisterTransactions = false
} else {
this._log.trace({ awsRequestId, durationMs }, 'lambdaRegisterTransaction success')
}
resolve() // always resolve, never reject
}
// Every `POST /register/transaction` request must set the
// `x-elastic-aws-request-id` header. Instead of creating a new options obj
// each time, we just modify in-place.
this._conf.requestRegisterTransaction.headers['x-elastic-aws-request-id'] = awsRequestId
const req = this._transportRequest(this._conf.requestRegisterTransaction, res => {
res.on('error', err => {
// Not sure this event can ever be emitted, but just in case.
res.destroy(err)
})
res.resume()
if (res.statusCode !== 200) {
finish(`unexpected response status code: ${res.statusCode}`)
return
}
res.on('end', function () {
finish()
})
})
req.setTimeout(TIMEOUT_MS)
req.on('timeout', () => {
req.destroy(new Error(`timeout (${TIMEOUT_MS}ms) registering lambda transaction`))
})
req.on('error', err => {
finish(err)
})
req.write(this._encodedMetadata)
req.write(out)
req.end()
})
}
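// Typical use from an APM agent at invocation start (illustrative):
//
//     await client.lambdaRegisterTransaction(trans, awsRequestId)
//
// No try/catch is needed because the returned promise never rejects; on
// failure, registration is disabled for subsequent invocations.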
// With the cork/uncork handling on this stream, `this.write`ing on this
// stream when already destroyed will lead to:
// Error: Cannot call write after a stream was destroyed
// when the `_corkTimer` expires.
Client.prototype._isUnsafeToWrite = function () {
return this.destroyed
}
Client.prototype._shouldDropEvent = function () {
this._numEvents++
const shouldDrop = this._writableState.length >= this._conf.maxQueueSize
if (shouldDrop) {
this._numEventsDropped++
}
return shouldDrop
}
Client.prototype.sendSpan = function (span, cb) {
if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
return
}
this._maybeCork()
return this.write({ span }, Client.encoding.SPAN, cb)
}
Client.prototype.sendTransaction = function (transaction, cb) {
if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
return
}
this._maybeCork()
return this.write({ transaction }, Client.encoding.TRANSACTION, cb)
}
Client.prototype.sendError = function (error, cb) {
if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
return
}
this._maybeCork()
return this.write({ error }, Client.encoding.ERROR, cb)
}
Client.prototype.sendMetricSet = function (metricset, cb) {
if (this._isUnsafeToWrite() || this._shouldDropEvent()) {
return
}
this._maybeCork()
return this.write({ metricset }, Client.encoding.METRICSET, cb)
}
/**
* If possible, start a flush of currently queued APM events to APM server.
*
* "If possible," because there are some guards on uncorking. See `_maybeUncork`.
*
* @param {Object} opts - Optional.
* - {Boolean} opts.lambdaEnd - Optional. Default false. Setting this true
* tells the client to also handle the end of a Lambda function invocation.
* @param {Function} cb - Optional. `cb()` will be called when the data has
 *    been sent to APM Server (or the attempt failed).
*/
Client.prototype.flush = function (opts, cb) {
if (typeof opts === 'function') {
cb = opts
opts = {}
} else if (!opts) {
opts = {}
}
const lambdaEnd = !!opts.lambdaEnd
// Write the special "flush" signal. We do this so that the order of writes
// and flushes is kept. If we were to just flush the client right here, the
// internal Writable buffer might still contain data that hasn't yet been
// given to the _write function.
if (lambdaEnd && isLambdaExecutionEnvironment && this._lambdaActive) {
// To flush the current data and ensure that subsequently sent events *in
// the same tick* do not start a new intake request, we must uncork
// synchronously -- rather than the nextTick uncork done in `_maybeUncork()`.
assert(this._encodedMetadata, 'client.flush({lambdaEnd:true}) must not be called before metadata has been set')
const rv = this.write(kLambdaEndFlush, cb)
this.uncork()
this._lambdaActive = false
return rv
} else {
this._maybeUncork()
return this.write(kFlush, cb)
}
}
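// For example, an agent typically flushes right after reporting an error
// (illustrative):
//
//     client.sendError(err)
//     client.flush(() => { /* data sent to APM server, or the attempt failed */ })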
// A handler that can be called on process "beforeExit" to attempt quick and
// orderly shutdown of the client. It attempts to ensure that the current
// active intake API request to APM server is completed quickly.
Client.prototype._gracefulExit = function () {
this._log.trace('_gracefulExit')
if (this._intakeRequestGracefulExitFn) {
this._intakeRequestGracefulExitFn()
}
// Calling _ref here, instead of relying on the _ref call in `_final`,
// is necessary because `client.end()` does *not* result in the Client's
// `_final()` being called when the process is exiting.
this._ref()
this.end()
}
Client.prototype._final = function (cb) {
this._log.trace('_final')
if (this._configTimer) {
clearTimeout(this._configTimer)
this._configTimer = null
}
clientsToAutoEnd[this._index] = null // remove global reference to ease garbage collection
this._ref()
this._chopper.end()
cb()
}
Client.prototype._destroy = function (err, cb) {
this._log.trace({ err }, '_destroy')
if (this._configTimer) {
clearTimeout(this._configTimer)
this._configTimer = null
}
if (this._corkTimer) {
clearTimeout(this._corkTimer)
this._corkTimer = null
}
clientsToAutoEnd[this._index] = null // remove global reference to ease garbage collection
this._chopper.destroy()
this._agent.destroy()
cb(err)
}
// Return the appropriate backoff delay (in milliseconds) before a next possible
// request to APM server.
// Spec: https://github.com/elastic/apm/blob/main/specs/agents/transport.md#transport-errors
//
// In a Lambda environment, a backoff delay can be harmful: The backoff
// setTimeout is unref'd, to not hold the process open. A subsequent Lambda
// function invocation during that timer will result in no active handles and
// a process "beforeExit" event. That event is interpreted by the Lambda Runtime
// as "the Lambda function callback was never called", and it terminates the
// function and responds with `null`. The solution is to never back off in a
// Lambda environment -- we expect and assume the Lambda extension is working,
// and pass responsibility for backoff to the extension.
Client.prototype._getBackoffDelay = function (isErr) {
let reconnectCount = this._backoffReconnectCount
if (isErr && !isLambdaExecutionEnvironment) {
this._backoffReconnectCount++
} else {
this._backoffReconnectCount = 0
reconnectCount = 0
}
// min(reconnectCount++, 6) ** 2 ± 10%
const delayS = Math.pow(Math.min(reconnectCount, 6), 2)
const jitterS = delayS * (0.2 * Math.random() - 0.1)
const delayMs = (delayS + jitterS) * 1000
return delayMs
}
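// Worked example of the formula above: successive errored requests yield
// delays of roughly 0s, 1s, 4s, 9s, 16s, 25s, then capped at 36s, each with
// ±10% jitter; any non-errored request resets the sequence to 0s.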
function getChoppedStreamHandler (client, onerror) {
// Make a request to the apm-server intake API.
// https://www.elastic.co/guide/en/apm/server/current/events-api.html
//
// In normal operation this works as follows:
// - The StreamChopper (`this._chopper`) calls this function with a newly
// created Gzip stream, to which it writes encoded event data.
// - It `gzipStream.end()`s the stream when:
// (a) approximately `apiRequestSize` of data have been written,
// (b) `apiRequestTime` seconds have passed, or
// (c) `_chopper.chop()` is explicitly called via `client.flush()`,
// e.g. used by the Node.js APM agent after `client.sendError()`.
// - This function makes the HTTP POST to the apm-server, pipes the gzipStream
// to it, and waits for the completion of the request and the apm-server
// response.
// - Then it calls the given `next` callback to signal StreamChopper that
// another chopped stream can be created when there is more to send.
//
// Of course, things can go wrong. Here are the known ways this pipeline can
// conclude.
// - intake response success - A successful response from the APM server. This
// is the normal operation case described above.
// - gzipStream error - An "error" event on the gzip stream.
// - intake request error - An "error" event on the intake HTTP request, e.g.
// ECONNREFUSED or ECONNRESET.
// - intakeResTimeout - A timer started *after* we are finished sending data
// to the APM server by which we require a response (including its body). By
// default this is 10s -- a very long time to allow for a slow or far
// apm-server. If we hit this, APM server is problematic anyway, so the
// delay doesn't add to the problems.
// - serverTimeout - An idle timeout value (default 15s in this client, via
// the `serverTimeout` option) set on the socket. This is a catch-all
// fallback for an otherwise wedged connection. If this
// is being hit, there is some major issue in the application (possibly a
// bug in the APM agent).
// - process completion - The Client takes pains to always `.unref()` its
// handles so as to never keep the user's process open if it is ready to exit. When
// the process is ready to exit, the following happens:
// - The "beforeExit" handler above will call `client._gracefulExit()` ...
// - ... which calls `client._ref()` to *hold the process open* to
// complete this request, and `client.end()` to end the `gzipStream` so
// this request can complete soon.
// - We then expect this request to complete quickly and the process will
// then finish exiting. A subtlety is if the APM server is not responding
// then we'll wait on the shorter `intakeResTimeoutOnEnd` (by default 1s).
return function makeIntakeRequest (gzipStream, next) {
const reqId = crypto.randomBytes(16).toString('hex')
const log = client._log.child({ reqId })
const startTime = process.hrtime()
const timeline = []
let bytesWritten = 0
let intakeRes
let intakeReqSocket = null
let intakeResTimer = null
let intakeRequestGracefulExitCalled = false
const intakeResTimeout = client._conf.intakeResTimeout
const intakeResTimeoutOnEnd = client._conf.intakeResTimeoutOnEnd
// `_activeIntakeReq` is used to coordinate the callback to `client.flush(cb)`.
client._activeIntakeReq = true
// Handle conclusion of this intake request. Each "part" is expected to call
// `completePart()` at least once -- multiple calls are okay for cases like
// the "error" and "close" events on a stream being called. When a part
// errors or all parts are completed, then we can conclude.
let concluded = false
const completedFromPart = {
gzipStream: false,
intakeReq: false,
intakeRes: false
}
let numToComplete = Object.keys(completedFromPart).length
const completePart = (part, err) => {
log.trace({ err, concluded }, 'completePart %s', part)
timeline.push([deltaMs(startTime), `completePart ${part}`, err && err.message])
assert(part in completedFromPart, `'${part}' is in completedFromPart`)
if (concluded) {
return