Skip to content

Commit 7f9091f

Browse files
authored
fix metadata system not updating because of dual package hazard (#1428)
1 parent 9f6887b commit 7f9091f

File tree

11 files changed

+218
-129
lines changed

11 files changed

+218
-129
lines changed

packages/cli-v3/src/entryPoints/deploy-run-worker.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
TracingSDK,
3434
usage,
3535
UsageTimeoutManager,
36+
StandardMetadataManager,
3637
} from "@trigger.dev/core/v3/workers";
3738
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
3839
import { readFile } from "node:fs/promises";
@@ -99,6 +100,8 @@ timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
99100
taskCatalog.setGlobalTaskCatalog(new StandardTaskCatalog());
100101
const durableClock = new DurableClock();
101102
clock.setGlobalClock(durableClock);
103+
const runMetadataManager = new StandardMetadataManager();
104+
runMetadata.setGlobalManager(runMetadataManager);
102105

103106
const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL");
104107

@@ -305,7 +308,7 @@ const zodIpc = new ZodIpcConnection({
305308
_execution = execution;
306309
_isRunning = true;
307310

308-
runMetadata.startPeriodicFlush(
311+
runMetadataManager.startPeriodicFlush(
309312
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
310313
);
311314

@@ -437,7 +440,7 @@ async function flushTracingSDK(timeoutInMs: number = 10_000) {
437440
async function flushMetadata(timeoutInMs: number = 10_000) {
438441
const now = performance.now();
439442

440-
await Promise.race([runMetadata.flush(), setTimeout(timeoutInMs)]);
443+
await Promise.race([runMetadataManager.flush(), setTimeout(timeoutInMs)]);
441444

442445
const duration = performance.now() - now;
443446

packages/cli-v3/src/entryPoints/dev-run-worker.ts

+5-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
TracingSDK,
3333
usage,
3434
getNumberEnvVar,
35+
StandardMetadataManager,
3536
} from "@trigger.dev/core/v3/workers";
3637
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
3738
import { readFile } from "node:fs/promises";
@@ -81,6 +82,8 @@ usage.setGlobalUsageManager(devUsageManager);
8182
const devRuntimeManager = new DevRuntimeManager();
8283
runtime.setGlobalRuntimeManager(devRuntimeManager);
8384
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
85+
const runMetadataManager = new StandardMetadataManager();
86+
runMetadata.setGlobalManager(runMetadataManager);
8487

8588
const triggerLogLevel = getEnvVar("TRIGGER_LOG_LEVEL");
8689

@@ -275,7 +278,7 @@ const zodIpc = new ZodIpcConnection({
275278
_execution = execution;
276279
_isRunning = true;
277280

278-
runMetadata.startPeriodicFlush(
281+
runMetadataManager.startPeriodicFlush(
279282
getNumberEnvVar("TRIGGER_RUN_METADATA_FLUSH_INTERVAL", 1000)
280283
);
281284
const measurement = usage.start();
@@ -350,7 +353,7 @@ const zodIpc = new ZodIpcConnection({
350353
}
351354
},
352355
FLUSH: async ({ timeoutInMs }, sender) => {
353-
await Promise.allSettled([_tracingSDK?.flush(), runMetadata.flush()]);
356+
await Promise.allSettled([_tracingSDK?.flush(), runMetadataManager.flush()]);
354357
},
355358
},
356359
});
+17-118
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
1-
import { dequal } from "dequal/lite";
21
import { DeserializedJson } from "../../schemas/json.js";
3-
import { apiClientManager } from "../apiClientManager-api.js";
4-
import { taskContext } from "../task-context-api.js";
52
import { getGlobal, registerGlobal } from "../utils/globals.js";
63
import { ApiRequestOptions } from "../zodfetch.js";
7-
import { JSONHeroPath } from "@jsonhero/path";
4+
import { NoopRunMetadataManager } from "./noopManager.js";
5+
import { RunMetadataManager } from "./types.js";
86

97
const API_NAME = "run-metadata";
108

11-
export class RunMetadataAPI {
9+
const NOOP_MANAGER = new NoopRunMetadataManager();
10+
11+
export class RunMetadataAPI implements RunMetadataManager {
1212
private static _instance?: RunMetadataAPI;
13-
private flushTimeoutId: NodeJS.Timeout | null = null;
14-
private hasChanges: boolean = false;
1513

1614
private constructor() {}
1715

@@ -23,138 +21,39 @@ export class RunMetadataAPI {
2321
return this._instance;
2422
}
2523

26-
get store(): Record<string, DeserializedJson> | undefined {
27-
return getGlobal(API_NAME);
24+
setGlobalManager(manager: RunMetadataManager): boolean {
25+
return registerGlobal(API_NAME, manager);
2826
}
2927

30-
set store(value: Record<string, DeserializedJson> | undefined) {
31-
registerGlobal(API_NAME, value, true);
28+
#getManager(): RunMetadataManager {
29+
return getGlobal(API_NAME) ?? NOOP_MANAGER;
3230
}
3331

3432
public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
35-
registerGlobal(API_NAME, metadata);
33+
this.#getManager().enterWithMetadata(metadata);
3634
}
3735

3836
public current(): Record<string, DeserializedJson> | undefined {
39-
return this.store;
37+
return this.#getManager().current();
4038
}
4139

4240
public getKey(key: string): DeserializedJson | undefined {
43-
return this.store?.[key];
41+
return this.#getManager().getKey(key);
4442
}
4543

4644
public setKey(key: string, value: DeserializedJson) {
47-
const runId = taskContext.ctx?.run.id;
48-
49-
if (!runId) {
50-
return;
51-
}
52-
53-
let nextStore: Record<string, DeserializedJson> | undefined = this.store
54-
? structuredClone(this.store)
55-
: undefined;
56-
57-
if (key.startsWith("$.")) {
58-
const path = new JSONHeroPath(key);
59-
path.set(nextStore, value);
60-
} else {
61-
nextStore = {
62-
...(nextStore ?? {}),
63-
[key]: value,
64-
};
65-
}
66-
67-
if (!nextStore) {
68-
return;
69-
}
70-
71-
if (!dequal(this.store, nextStore)) {
72-
this.hasChanges = true;
73-
}
74-
75-
this.store = nextStore;
45+
return this.#getManager().setKey(key, value);
7646
}
7747

7848
public deleteKey(key: string) {
79-
const runId = taskContext.ctx?.run.id;
80-
81-
if (!runId) {
82-
return;
83-
}
84-
85-
const nextStore = { ...(this.store ?? {}) };
86-
delete nextStore[key];
87-
88-
if (!dequal(this.store, nextStore)) {
89-
this.hasChanges = true;
90-
}
91-
92-
this.store = nextStore;
49+
return this.#getManager().deleteKey(key);
9350
}
9451

9552
public update(metadata: Record<string, DeserializedJson>): void {
96-
const runId = taskContext.ctx?.run.id;
97-
98-
if (!runId) {
99-
return;
100-
}
101-
102-
if (!dequal(this.store, metadata)) {
103-
this.hasChanges = true;
104-
}
105-
106-
this.store = metadata;
53+
return this.#getManager().update(metadata);
10754
}
10855

109-
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
110-
const runId = taskContext.ctx?.run.id;
111-
112-
if (!runId) {
113-
return;
114-
}
115-
116-
if (!this.store) {
117-
return;
118-
}
119-
120-
if (!this.hasChanges) {
121-
return;
122-
}
123-
124-
const apiClient = apiClientManager.clientOrThrow();
125-
126-
try {
127-
this.hasChanges = false;
128-
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
129-
} catch (error) {
130-
this.hasChanges = true;
131-
throw error;
132-
}
133-
}
134-
135-
public startPeriodicFlush(intervalMs: number = 1000) {
136-
const periodicFlush = async (intervalMs: number) => {
137-
try {
138-
await this.flush();
139-
} catch (error) {
140-
console.error("Failed to flush metadata", error);
141-
throw error;
142-
} finally {
143-
scheduleNext();
144-
}
145-
};
146-
147-
const scheduleNext = () => {
148-
this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs);
149-
};
150-
151-
scheduleNext();
152-
}
153-
154-
stopPeriodicFlush(): void {
155-
if (this.flushTimeoutId) {
156-
clearTimeout(this.flushTimeoutId);
157-
this.flushTimeoutId = null;
158-
}
56+
flush(requestOptions?: ApiRequestOptions): Promise<void> {
57+
return this.#getManager().flush(requestOptions);
15958
}
16059
}
+140
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
import { JSONHeroPath } from "@jsonhero/path";
2+
import { dequal } from "dequal/lite";
3+
import { DeserializedJson } from "../../schemas/json.js";
4+
import { apiClientManager } from "../apiClientManager-api.js";
5+
import { taskContext } from "../task-context-api.js";
6+
import { ApiRequestOptions } from "../zodfetch.js";
7+
import { RunMetadataManager } from "./types.js";
8+
9+
export class StandardMetadataManager implements RunMetadataManager {
10+
private flushTimeoutId: NodeJS.Timeout | null = null;
11+
private hasChanges: boolean = false;
12+
private store: Record<string, DeserializedJson> | undefined;
13+
14+
public enterWithMetadata(metadata: Record<string, DeserializedJson>): void {
15+
this.store = metadata ?? {};
16+
}
17+
18+
public current(): Record<string, DeserializedJson> | undefined {
19+
return this.store;
20+
}
21+
22+
public getKey(key: string): DeserializedJson | undefined {
23+
return this.store?.[key];
24+
}
25+
26+
public setKey(key: string, value: DeserializedJson) {
27+
const runId = taskContext.ctx?.run.id;
28+
29+
if (!runId) {
30+
return;
31+
}
32+
33+
let nextStore: Record<string, DeserializedJson> | undefined = this.store
34+
? structuredClone(this.store)
35+
: undefined;
36+
37+
if (key.startsWith("$.")) {
38+
const path = new JSONHeroPath(key);
39+
path.set(nextStore, value);
40+
} else {
41+
nextStore = {
42+
...(nextStore ?? {}),
43+
[key]: value,
44+
};
45+
}
46+
47+
if (!nextStore) {
48+
return;
49+
}
50+
51+
if (!dequal(this.store, nextStore)) {
52+
this.hasChanges = true;
53+
}
54+
55+
this.store = nextStore;
56+
}
57+
58+
public deleteKey(key: string) {
59+
const runId = taskContext.ctx?.run.id;
60+
61+
if (!runId) {
62+
return;
63+
}
64+
65+
const nextStore = { ...(this.store ?? {}) };
66+
delete nextStore[key];
67+
68+
if (!dequal(this.store, nextStore)) {
69+
this.hasChanges = true;
70+
}
71+
72+
this.store = nextStore;
73+
}
74+
75+
public update(metadata: Record<string, DeserializedJson>): void {
76+
const runId = taskContext.ctx?.run.id;
77+
78+
if (!runId) {
79+
return;
80+
}
81+
82+
if (!dequal(this.store, metadata)) {
83+
this.hasChanges = true;
84+
}
85+
86+
this.store = metadata;
87+
}
88+
89+
public async flush(requestOptions?: ApiRequestOptions): Promise<void> {
90+
const runId = taskContext.ctx?.run.id;
91+
92+
if (!runId) {
93+
return;
94+
}
95+
96+
if (!this.store) {
97+
return;
98+
}
99+
100+
if (!this.hasChanges) {
101+
return;
102+
}
103+
104+
const apiClient = apiClientManager.clientOrThrow();
105+
106+
try {
107+
this.hasChanges = false;
108+
await apiClient.updateRunMetadata(runId, { metadata: this.store }, requestOptions);
109+
} catch (error) {
110+
this.hasChanges = true;
111+
throw error;
112+
}
113+
}
114+
115+
public startPeriodicFlush(intervalMs: number = 1000) {
116+
const periodicFlush = async (intervalMs: number) => {
117+
try {
118+
await this.flush();
119+
} catch (error) {
120+
console.error("Failed to flush metadata", error);
121+
throw error;
122+
} finally {
123+
scheduleNext();
124+
}
125+
};
126+
127+
const scheduleNext = () => {
128+
this.flushTimeoutId = setTimeout(() => periodicFlush(intervalMs), intervalMs);
129+
};
130+
131+
scheduleNext();
132+
}
133+
134+
stopPeriodicFlush(): void {
135+
if (this.flushTimeoutId) {
136+
clearTimeout(this.flushTimeoutId);
137+
this.flushTimeoutId = null;
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)