Skip to content

Commit ea04184

Browse files
mcollinaRafaelGSS
authored andcommitted
worker: add worker.getHeapStatistics()
Adds worker.getHeapStatistics() so that the heap usage of the worker could be observer from the parent thread. Signed-off-by: Matteo Collina <[email protected]> PR-URL: #57888 Reviewed-By: Yagiz Nizipli <[email protected]> Reviewed-By: Chengzhong Wu <[email protected]> Reviewed-By: Darshan Sen <[email protected]> Reviewed-By: Stephen Belanger <[email protected]>
1 parent 0f55a96 commit ea04184

File tree

9 files changed

+219
-0
lines changed

9 files changed

+219
-0
lines changed

doc/api/worker_threads.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1337,6 +1337,18 @@ If the Worker thread is no longer running, which may occur before the
13371337
[`'exit'` event][] is emitted, the returned `Promise` is rejected
13381338
immediately with an [`ERR_WORKER_NOT_RUNNING`][] error.
13391339

1340+
### `worker.getHeapStatistics()`
1341+
1342+
<!-- YAML
1343+
added: REPLACEME
1344+
-->
1345+
1346+
* Returns: {Promise}
1347+
1348+
This method returns a `Promise` that will resolve to an object identical to [`v8.getHeapStatistics()`][],
1349+
or reject with an [`ERR_WORKER_NOT_RUNNING`][] error if the worker is no longer running.
1350+
This methods allows the statistics to be observed from outside the actual thread.
1351+
13401352
### `worker.performance`
13411353

13421354
<!-- YAML
@@ -1631,6 +1643,7 @@ thread spawned will spawn another until the application crashes.
16311643
[`require('node:worker_threads').workerData`]: #workerworkerdata
16321644
[`trace_events`]: tracing.md
16331645
[`v8.getHeapSnapshot()`]: v8.md#v8getheapsnapshotoptions
1646+
[`v8.getHeapStatistics()`]: v8.md#v8getheapstatistics
16341647
[`vm`]: vm.md
16351648
[`worker.SHARE_ENV`]: #workershare_env
16361649
[`worker.on('message')`]: #event-message_1

lib/internal/worker.js

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,17 @@ class Worker extends EventEmitter {
459459
};
460460
});
461461
}
462+
463+
getHeapStatistics() {
464+
const taker = this[kHandle]?.getHeapStatistics();
465+
466+
return new Promise((resolve, reject) => {
467+
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
468+
taker.ondone = (handle) => {
469+
resolve(handle);
470+
};
471+
});
472+
}
462473
}
463474

464475
/**

src/async_wrap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ namespace node {
7979
V(SIGINTWATCHDOG) \
8080
V(WORKER) \
8181
V(WORKERHEAPSNAPSHOT) \
82+
V(WORKERHEAPSTATISTICS) \
8283
V(WRITEWRAP) \
8384
V(ZLIB)
8485

src/env_properties.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -464,6 +464,7 @@
464464
V(tty_constructor_template, v8::FunctionTemplate) \
465465
V(write_wrap_template, v8::ObjectTemplate) \
466466
V(worker_heap_snapshot_taker_template, v8::ObjectTemplate) \
467+
V(worker_heap_statistics_taker_template, v8::ObjectTemplate) \
467468
V(x509_constructor_template, v8::FunctionTemplate)
468469

469470
#define PER_REALM_STRONG_PERSISTENT_VALUES(V) \

src/node_worker.cc

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -816,6 +816,116 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
816816
}
817817
}
818818

819+
class WorkerHeapStatisticsTaker : public AsyncWrap {
820+
public:
821+
WorkerHeapStatisticsTaker(Environment* env, Local<Object> obj)
822+
: AsyncWrap(env, obj, AsyncWrap::PROVIDER_WORKERHEAPSTATISTICS) {}
823+
824+
SET_NO_MEMORY_INFO()
825+
SET_MEMORY_INFO_NAME(WorkerHeapStatisticsTaker)
826+
SET_SELF_SIZE(WorkerHeapStatisticsTaker)
827+
};
828+
829+
void Worker::GetHeapStatistics(const FunctionCallbackInfo<Value>& args) {
830+
Worker* w;
831+
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
832+
833+
Environment* env = w->env();
834+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
835+
Local<Object> wrap;
836+
if (!env->worker_heap_statistics_taker_template()
837+
->NewInstance(env->context())
838+
.ToLocal(&wrap)) {
839+
return;
840+
}
841+
842+
// The created WorkerHeapStatisticsTaker is an object owned by main
843+
// thread's Isolate, it can not be accessed by worker thread
844+
std::unique_ptr<BaseObjectPtr<WorkerHeapStatisticsTaker>> taker =
845+
std::make_unique<BaseObjectPtr<WorkerHeapStatisticsTaker>>(
846+
MakeDetachedBaseObject<WorkerHeapStatisticsTaker>(env, wrap));
847+
848+
// Interrupt the worker thread and take a snapshot, then schedule a call
849+
// on the parent thread that turns that snapshot into a readable stream.
850+
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
851+
env](Environment* worker_env) mutable {
852+
// We create a unique pointer to HeapStatistics so that the actual object
853+
// it's not copied in the lambda, but only the pointer is.
854+
auto heap_stats = std::make_unique<v8::HeapStatistics>();
855+
worker_env->isolate()->GetHeapStatistics(heap_stats.get());
856+
857+
// Here, the worker thread temporarily owns the WorkerHeapStatisticsTaker
858+
// object.
859+
860+
env->SetImmediateThreadsafe(
861+
[taker = std::move(taker),
862+
heap_stats = std::move(heap_stats)](Environment* env) mutable {
863+
Isolate* isolate = env->isolate();
864+
HandleScope handle_scope(isolate);
865+
Context::Scope context_scope(env->context());
866+
867+
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(taker->get());
868+
869+
Local<v8::Name> heap_stats_names[] = {
870+
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size"),
871+
FIXED_ONE_BYTE_STRING(isolate, "total_heap_size_executable"),
872+
FIXED_ONE_BYTE_STRING(isolate, "total_physical_size"),
873+
FIXED_ONE_BYTE_STRING(isolate, "total_available_size"),
874+
FIXED_ONE_BYTE_STRING(isolate, "used_heap_size"),
875+
FIXED_ONE_BYTE_STRING(isolate, "heap_size_limit"),
876+
FIXED_ONE_BYTE_STRING(isolate, "malloced_memory"),
877+
FIXED_ONE_BYTE_STRING(isolate, "peak_malloced_memory"),
878+
FIXED_ONE_BYTE_STRING(isolate, "does_zap_garbage"),
879+
FIXED_ONE_BYTE_STRING(isolate, "number_of_native_contexts"),
880+
FIXED_ONE_BYTE_STRING(isolate, "number_of_detached_contexts"),
881+
FIXED_ONE_BYTE_STRING(isolate, "total_global_handles_size"),
882+
FIXED_ONE_BYTE_STRING(isolate, "used_global_handles_size"),
883+
FIXED_ONE_BYTE_STRING(isolate, "external_memory")};
884+
885+
// Define an array of property values
886+
Local<Value> heap_stats_values[] = {
887+
Number::New(isolate, heap_stats->total_heap_size()),
888+
Number::New(isolate, heap_stats->total_heap_size_executable()),
889+
Number::New(isolate, heap_stats->total_physical_size()),
890+
Number::New(isolate, heap_stats->total_available_size()),
891+
Number::New(isolate, heap_stats->used_heap_size()),
892+
Number::New(isolate, heap_stats->heap_size_limit()),
893+
Number::New(isolate, heap_stats->malloced_memory()),
894+
Number::New(isolate, heap_stats->peak_malloced_memory()),
895+
Boolean::New(isolate, heap_stats->does_zap_garbage()),
896+
Number::New(isolate, heap_stats->number_of_native_contexts()),
897+
Number::New(isolate, heap_stats->number_of_detached_contexts()),
898+
Number::New(isolate, heap_stats->total_global_handles_size()),
899+
Number::New(isolate, heap_stats->used_global_handles_size()),
900+
Number::New(isolate, heap_stats->external_memory())};
901+
902+
DCHECK_EQ(arraysize(heap_stats_names), arraysize(heap_stats_values));
903+
904+
// Create the object with the property names and values
905+
Local<Object> stats = Object::New(isolate,
906+
Null(isolate),
907+
heap_stats_names,
908+
heap_stats_values,
909+
arraysize(heap_stats_names));
910+
911+
Local<Value> args[] = {stats};
912+
taker->get()->MakeCallback(
913+
env->ondone_string(), arraysize(args), args);
914+
// implicitly delete `taker`
915+
},
916+
CallbackFlags::kUnrefed);
917+
918+
// Now, the lambda is delivered to the main thread, as a result, the
919+
// WorkerHeapStatisticsTaker object is delivered to the main thread, too.
920+
});
921+
922+
if (scheduled) {
923+
args.GetReturnValue().Set(wrap);
924+
} else {
925+
args.GetReturnValue().Set(Local<Object>());
926+
}
927+
}
928+
819929
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {
820930
Worker* w;
821931
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
@@ -996,6 +1106,7 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
9961106
SetProtoMethod(isolate, w, "takeHeapSnapshot", Worker::TakeHeapSnapshot);
9971107
SetProtoMethod(isolate, w, "loopIdleTime", Worker::LoopIdleTime);
9981108
SetProtoMethod(isolate, w, "loopStartTime", Worker::LoopStartTime);
1109+
SetProtoMethod(isolate, w, "getHeapStatistics", Worker::GetHeapStatistics);
9991110

10001111
SetConstructorFunction(isolate, target, "Worker", w);
10011112
}
@@ -1014,6 +1125,20 @@ void CreateWorkerPerIsolateProperties(IsolateData* isolate_data,
10141125
wst->InstanceTemplate());
10151126
}
10161127

1128+
{
1129+
Local<FunctionTemplate> wst = NewFunctionTemplate(isolate, nullptr);
1130+
1131+
wst->InstanceTemplate()->SetInternalFieldCount(
1132+
WorkerHeapSnapshotTaker::kInternalFieldCount);
1133+
wst->Inherit(AsyncWrap::GetConstructorTemplate(isolate_data));
1134+
1135+
Local<String> wst_string =
1136+
FIXED_ONE_BYTE_STRING(isolate, "WorkerHeapStatisticsTaker");
1137+
wst->SetClassName(wst_string);
1138+
isolate_data->set_worker_heap_statistics_taker_template(
1139+
wst->InstanceTemplate());
1140+
}
1141+
10171142
SetMethod(isolate, target, "getEnvMessagePort", GetEnvMessagePort);
10181143
}
10191144

@@ -1079,6 +1204,7 @@ void RegisterExternalReferences(ExternalReferenceRegistry* registry) {
10791204
registry->Register(Worker::TakeHeapSnapshot);
10801205
registry->Register(Worker::LoopIdleTime);
10811206
registry->Register(Worker::LoopStartTime);
1207+
registry->Register(Worker::GetHeapStatistics);
10821208
}
10831209

10841210
} // anonymous namespace

src/node_worker.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ class Worker : public AsyncWrap {
7878
static void TakeHeapSnapshot(const v8::FunctionCallbackInfo<v8::Value>& args);
7979
static void LoopIdleTime(const v8::FunctionCallbackInfo<v8::Value>& args);
8080
static void LoopStartTime(const v8::FunctionCallbackInfo<v8::Value>& args);
81+
static void GetHeapStatistics(
82+
const v8::FunctionCallbackInfo<v8::Value>& args);
8183

8284
private:
8385
bool CreateEnvMessagePort(Environment* env);
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const fixtures = require('../common/fixtures');
5+
6+
common.skipIfInspectorDisabled();
7+
8+
const {
9+
Worker,
10+
isMainThread,
11+
} = require('worker_threads');
12+
13+
if (!isMainThread) {
14+
common.skip('This test only works on a main thread');
15+
}
16+
17+
// Ensures that worker.getHeapStatistics() returns valid data
18+
19+
const assert = require('assert');
20+
21+
if (isMainThread) {
22+
const name = 'Hello Thread';
23+
const worker = new Worker(fixtures.path('worker-name.js'), {
24+
name,
25+
});
26+
worker.once('message', common.mustCall(async (message) => {
27+
const stats = await worker.getHeapStatistics();
28+
const keys = [
29+
`total_heap_size`,
30+
`total_heap_size_executable`,
31+
`total_physical_size`,
32+
`total_available_size`,
33+
`used_heap_size`,
34+
`heap_size_limit`,
35+
`malloced_memory`,
36+
`peak_malloced_memory`,
37+
`does_zap_garbage`,
38+
`number_of_native_contexts`,
39+
`number_of_detached_contexts`,
40+
`total_global_handles_size`,
41+
`used_global_handles_size`,
42+
`external_memory`,
43+
].sort();
44+
assert.deepStrictEqual(keys, Object.keys(stats).sort());
45+
for (const key of keys) {
46+
if (key === 'does_zap_garbage') {
47+
assert.strictEqual(typeof stats[key], 'boolean', `Expected ${key} to be a boolean`);
48+
continue;
49+
}
50+
assert.strictEqual(typeof stats[key], 'number', `Expected ${key} to be a number`);
51+
assert.ok(stats[key] >= 0, `Expected ${key} to be >= 0`);
52+
}
53+
54+
worker.postMessage('done');
55+
}));
56+
57+
worker.once('exit', common.mustCall(async (code) => {
58+
assert.strictEqual(code, 0);
59+
await assert.rejects(worker.getHeapStatistics(), {
60+
code: 'ERR_WORKER_NOT_RUNNING'
61+
});
62+
}));
63+
}

test/sequential/test-async-wrap-getasyncid.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ const { getSystemErrorName } = require('util');
6161
delete providers.ELDHISTOGRAM;
6262
delete providers.SIGINTWATCHDOG;
6363
delete providers.WORKERHEAPSNAPSHOT;
64+
delete providers.WORKERHEAPSTATISTICS;
6465
delete providers.BLOBREADER;
6566
delete providers.RANDOMPRIMEREQUEST;
6667
delete providers.CHECKPRIMEREQUEST;

typings/internalBinding/worker.d.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ declare namespace InternalWorkerBinding {
1515
unref(): void;
1616
getResourceLimits(): Float64Array;
1717
takeHeapSnapshot(): object;
18+
getHeapStatistics(): Promise<object>;
1819
loopIdleTime(): number;
1920
loopStartTime(): number;
2021
}

0 commit comments

Comments
 (0)