Skip to content

Commit cf4c4d7

Browse files
authored
Add attempt metrics in dev (prod WIP). Added max concurrent runs setting to dev using p-limit (#1766)
1 parent 722fae5 commit cf4c4d7

File tree

13 files changed

+131
-21
lines changed

13 files changed

+131
-21
lines changed

Diff for: apps/webapp/app/env.server.ts

+3
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,9 @@ const EnvironmentSchema = z.object({
571571
/** The max number of runs per API call that we'll dequeue in DEV */
572572
DEV_DEQUEUE_MAX_RUNS_PER_PULL: z.coerce.number().int().default(10),
573573

574+
/** The maximum concurrent local run processes executing at once in dev */
575+
DEV_MAX_CONCURRENT_RUNS: z.coerce.number().int().default(25),
576+
574577
LEGACY_RUN_ENGINE_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
575578
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),
576579
LEGACY_RUN_ENGINE_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(1),

Diff for: apps/webapp/app/routes/engine.v1.dev.config.ts

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export const loader = createLoaderApiRoute(
2020
environmentId: authentication.environment.id,
2121
dequeueIntervalWithRun: env.DEV_DEQUEUE_INTERVAL_WITH_RUN,
2222
dequeueIntervalWithoutRun: env.DEV_DEQUEUE_INTERVAL_WITHOUT_RUN,
23+
maxConcurrentRuns: env.DEV_MAX_CONCURRENT_RUNS,
2324
});
2425
} catch (error) {
2526
logger.error("Failed to get dev settings", {

Diff for: internal-packages/run-engine/src/engine/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -925,6 +925,7 @@ export class RunEngine {
925925

926926
return {
927927
version: "1" as const,
928+
dequeuedAt: new Date(),
928929
snapshot: {
929930
id: newSnapshot.id,
930931
friendlyId: newSnapshot.friendlyId,

Diff for: packages/cli-v3/package.json

+1
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@
110110
"nypm": "^0.3.9",
111111
"object-hash": "^3.0.0",
112112
"open": "^10.0.3",
113+
"p-limit": "^6.2.0",
113114
"p-retry": "^6.1.0",
114115
"partysocket": "^1.0.2",
115116
"pkg-types": "^1.1.3",

Diff for: packages/cli-v3/src/commands/dev.ts

+5
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const DevCommandOptions = CommonCommandOptions.extend({
1919
skipUpdateCheck: z.boolean().default(false),
2020
envFile: z.string().optional(),
2121
keepTmpFiles: z.boolean().default(false),
22+
maxConcurrentRuns: z.coerce.number().optional(),
2223
});
2324

2425
export type DevCommandOptions = z.infer<typeof DevCommandOptions>;
@@ -37,6 +38,10 @@ export function configureDevCommand(program: Command) {
3738
"--env-file <env file>",
3839
"Path to the .env file to use for the dev session. Defaults to .env in the project directory."
3940
)
41+
.option(
42+
"--max-concurrent-runs <max concurrent runs>",
43+
"The maximum number of concurrent runs to allow in the dev session"
44+
)
4045
.option("--debug-otel", "Enable OpenTelemetry debugging")
4146
.option("--skip-update-check", "Skip checking for @trigger.dev package updates")
4247
.option(

Diff for: packages/cli-v3/src/dev/devSupervisor.ts

+30-4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import {
2424
WorkerClientToServerEvents,
2525
WorkerServerToClientEvents,
2626
} from "@trigger.dev/core/v3/workers";
27+
import pLimit from "p-limit";
2728

2829
export type WorkerRuntimeOptions = {
2930
name: string | undefined;
@@ -65,6 +66,8 @@ class DevSupervisor implements WorkerRuntime {
6566

6667
private socketConnections = new Set<string>();
6768

69+
private runLimiter?: ReturnType<typeof pLimit>;
70+
6871
constructor(public readonly options: WorkerRuntimeOptions) {}
6972

7073
async init(): Promise<void> {
@@ -81,6 +84,15 @@ class DevSupervisor implements WorkerRuntime {
8184
logger.debug("[DevSupervisor] Got dev settings", { settings: settings.data });
8285
this.config = settings.data;
8386

87+
const maxConcurrentRuns = Math.min(
88+
this.config.maxConcurrentRuns,
89+
this.options.args.maxConcurrentRuns ?? this.config.maxConcurrentRuns
90+
);
91+
92+
logger.debug("[DevSupervisor] Using maxConcurrentRuns", { maxConcurrentRuns });
93+
94+
this.runLimiter = pLimit(maxConcurrentRuns);
95+
8496
this.#createSocket();
8597

8698
//start an SSE connection for presence
@@ -178,6 +190,14 @@ class DevSupervisor implements WorkerRuntime {
178190
return;
179191
}
180192

193+
if (
194+
this.runLimiter &&
195+
this.runLimiter.activeCount + this.runLimiter.pendingCount > this.runLimiter.concurrency
196+
) {
197+
logger.debug(`[DevSupervisor] dequeueRuns. Run limit reached, trying again later`);
198+
setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithoutRun);
199+
}
200+
181201
//get relevant versions
182202
//ignore deprecated and the latest worker
183203
const oldWorkerIds = this.#getActiveOldWorkers();
@@ -287,10 +307,16 @@ class DevSupervisor implements WorkerRuntime {
287307

288308
this.runControllers.set(message.run.friendlyId, runController);
289309

290-
//don't await for run completion, we want to dequeue more runs
291-
runController.start(message).then(() => {
292-
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
293-
});
310+
if (this.runLimiter) {
311+
this.runLimiter(() => runController.start(message)).then(() => {
312+
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
313+
});
314+
} else {
315+
//don't await for run completion, we want to dequeue more runs
316+
runController.start(message).then(() => {
317+
logger.debug("[DevSupervisor] Run started", { runId: message.run.friendlyId });
318+
});
319+
}
294320
}
295321

296322
setTimeout(() => this.#dequeueRuns(), this.config.dequeueIntervalWithRun);

Diff for: packages/cli-v3/src/entryPoints/dev-run-controller.ts

+34-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
LogLevel,
66
RunExecutionData,
77
TaskRunExecution,
8+
TaskRunExecutionMetrics,
89
TaskRunExecutionResult,
910
TaskRunFailedExecutionResult,
1011
} from "@trigger.dev/core/v3";
@@ -475,17 +476,21 @@ export class DevRunController {
475476
private async startAndExecuteRunAttempt({
476477
runFriendlyId,
477478
snapshotFriendlyId,
479+
dequeuedAt,
478480
isWarmStart = false,
479481
}: {
480482
runFriendlyId: string;
481483
snapshotFriendlyId: string;
484+
dequeuedAt?: Date;
482485
isWarmStart?: boolean;
483486
}) {
484487
this.subscribeToRunNotifications({
485488
run: { friendlyId: runFriendlyId },
486489
snapshot: { friendlyId: snapshotFriendlyId },
487490
});
488491

492+
const attemptStartedAt = Date.now();
493+
489494
const start = await this.httpClient.dev.startRunAttempt(runFriendlyId, snapshotFriendlyId);
490495

491496
if (!start.success) {
@@ -495,6 +500,8 @@ export class DevRunController {
495500
return;
496501
}
497502

503+
const attemptDuration = Date.now() - attemptStartedAt;
504+
498505
const { run, snapshot, execution, envVars } = start.data;
499506

500507
eventBus.emit("runStarted", this.opts.worker, execution);
@@ -508,8 +515,28 @@ export class DevRunController {
508515
// This is the only case where incrementing the attempt number is allowed
509516
this.enterRunPhase(run, snapshot);
510517

518+
const metrics = [
519+
{
520+
name: "start",
521+
event: "create_attempt",
522+
timestamp: attemptStartedAt,
523+
duration: attemptDuration,
524+
},
525+
].concat(
526+
dequeuedAt
527+
? [
528+
{
529+
name: "start",
530+
event: "dequeue",
531+
timestamp: dequeuedAt.getTime(),
532+
duration: 0,
533+
},
534+
]
535+
: []
536+
);
537+
511538
try {
512-
return await this.executeRun({ run, snapshot, execution, envVars });
539+
return await this.executeRun({ run, snapshot, execution, envVars, metrics });
513540
} catch (error) {
514541
// TODO: Handle the case where we're in the warm start phase or executing a new run
515542
// This can happen if we kill the run while it's still executing, e.g. after receiving an attempt number mismatch
@@ -566,7 +593,10 @@ export class DevRunController {
566593
snapshot,
567594
execution,
568595
envVars,
569-
}: WorkloadRunAttemptStartResponseBody) {
596+
metrics,
597+
}: WorkloadRunAttemptStartResponseBody & {
598+
metrics?: TaskRunExecutionMetrics;
599+
}) {
570600
if (!this.opts.worker.serverWorker) {
571601
throw new Error(`No server worker for Dev ${run.friendlyId}`);
572602
}
@@ -594,6 +624,7 @@ export class DevRunController {
594624
payload: {
595625
execution,
596626
traceContext: execution.run.traceContext ?? {},
627+
metrics,
597628
},
598629
messageId: run.friendlyId,
599630
});
@@ -753,6 +784,7 @@ export class DevRunController {
753784
await this.startAndExecuteRunAttempt({
754785
runFriendlyId: dequeueMessage.run.friendlyId,
755786
snapshotFriendlyId: dequeueMessage.snapshot.friendlyId,
787+
dequeuedAt: dequeueMessage.dequeuedAt,
756788
}).finally(async () => {});
757789
}
758790

Diff for: packages/cli-v3/src/entryPoints/dev-run-worker.ts

+25-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
waitUntil,
1818
WorkerManifest,
1919
WorkerToExecutorMessageCatalog,
20+
runTimelineMetrics,
2021
} from "@trigger.dev/core/v3";
2122
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
2223
import {
@@ -36,6 +37,7 @@ import {
3637
TracingSDK,
3738
usage,
3839
UsageTimeoutManager,
40+
StandardRunTimelineMetricsManager,
3941
} from "@trigger.dev/core/v3/workers";
4042
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
4143
import { readFile } from "node:fs/promises";
@@ -87,6 +89,10 @@ process.on("uncaughtException", function (error, origin) {
8789

8890
const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS");
8991

92+
const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager();
93+
runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager);
94+
standardRunTimelineMetricsManager.seedMetricsFromEnvironment();
95+
9096
const devUsageManager = new DevUsageManager();
9197
usage.setGlobalUsageManager(devUsageManager);
9298
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
@@ -189,9 +195,11 @@ const zodIpc = new ZodIpcConnection({
189195
emitSchema: ExecutorToWorkerMessageCatalog,
190196
process,
191197
handlers: {
192-
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata }, sender) => {
198+
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
193199
log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);
194200

201+
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);
202+
195203
if (_isRunning) {
196204
logError("Worker is already running a task");
197205

@@ -246,11 +254,22 @@ const zodIpc = new ZodIpcConnection({
246254
}
247255

248256
try {
249-
const beforeImport = performance.now();
250-
await import(normalizeImportPath(taskManifest.entryPoint));
251-
const durationMs = performance.now() - beforeImport;
252-
253-
log(`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`);
257+
await runTimelineMetrics.measureMetric(
258+
"trigger.dev/start",
259+
"import",
260+
{
261+
entryPoint: taskManifest.entryPoint,
262+
},
263+
async () => {
264+
const beforeImport = performance.now();
265+
await import(normalizeImportPath(taskManifest.entryPoint));
266+
const durationMs = performance.now() - beforeImport;
267+
268+
log(
269+
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
270+
);
271+
}
272+
);
254273
} catch (err) {
255274
logError(`Failed to import task ${execution.task.id}`, err);
256275

Diff for: packages/cli-v3/src/entryPoints/managed-run-worker.ts

+24-7
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
runMetadata,
1818
waitUntil,
1919
apiClientManager,
20+
runTimelineMetrics,
2021
} from "@trigger.dev/core/v3";
2122
import { TriggerTracer } from "@trigger.dev/core/v3/tracer";
2223
import {
@@ -37,6 +38,7 @@ import {
3738
StandardMetadataManager,
3839
StandardWaitUntilManager,
3940
ManagedRuntimeManager,
41+
StandardRunTimelineMetricsManager,
4042
} from "@trigger.dev/core/v3/workers";
4143
import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc";
4244
import { readFile } from "node:fs/promises";
@@ -91,6 +93,10 @@ const usageEventUrl = getEnvVar("USAGE_EVENT_URL");
9193
const triggerJWT = getEnvVar("TRIGGER_JWT");
9294
const heartbeatIntervalMs = getEnvVar("HEARTBEAT_INTERVAL_MS");
9395

96+
const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager();
97+
runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager);
98+
standardRunTimelineMetricsManager.seedMetricsFromEnvironment();
99+
94100
const devUsageManager = new DevUsageManager();
95101
const prodUsageManager = new ProdUsageManager(devUsageManager, {
96102
heartbeatIntervalMs: usageIntervalMs ? parseInt(usageIntervalMs, 10) : undefined,
@@ -199,7 +205,9 @@ const zodIpc = new ZodIpcConnection({
199205
emitSchema: ExecutorToWorkerMessageCatalog,
200206
process,
201207
handlers: {
202-
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata }, sender) => {
208+
EXECUTE_TASK_RUN: async ({ execution, traceContext, metadata, metrics }, sender) => {
209+
standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics);
210+
203211
console.log(`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN`, execution);
204212

205213
if (_isRunning) {
@@ -256,12 +264,21 @@ const zodIpc = new ZodIpcConnection({
256264
}
257265

258266
try {
259-
const beforeImport = performance.now();
260-
await import(normalizeImportPath(taskManifest.entryPoint));
261-
const durationMs = performance.now() - beforeImport;
262-
263-
console.log(
264-
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
267+
await runTimelineMetrics.measureMetric(
268+
"trigger.dev/start",
269+
"import",
270+
{
271+
entryPoint: taskManifest.entryPoint,
272+
},
273+
async () => {
274+
const beforeImport = performance.now();
275+
await import(normalizeImportPath(taskManifest.entryPoint));
276+
const durationMs = performance.now() - beforeImport;
277+
278+
console.log(
279+
`Imported task ${execution.task.id} [${taskManifest.entryPoint}] in ${durationMs}ms`
280+
);
281+
}
265282
);
266283
} catch (err) {
267284
console.error(`Failed to import task ${execution.task.id}`, err);

Diff for: packages/core/src/v3/schemas/api.ts

+1
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ export const DevConfigResponseBody = z.object({
427427
environmentId: z.string(),
428428
dequeueIntervalWithRun: z.number(),
429429
dequeueIntervalWithoutRun: z.number(),
430+
maxConcurrentRuns: z.number(),
430431
});
431432
export type DevConfigResponseBody = z.infer<typeof DevConfigResponseBody>;
432433

Diff for: packages/core/src/v3/schemas/runEngine.ts

+1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ export type ExecutionResult = z.infer<typeof ExecutionResult>;
139139
export const DequeuedMessage = z.object({
140140
version: z.literal("1"),
141141
snapshot: ExecutionSnapshot,
142+
dequeuedAt: z.coerce.date(),
142143
image: z.string().optional(),
143144
checkpoint: z
144145
.object({

Diff for: pnpm-lock.yaml

+3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Diff for: references/v3-catalog/package.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
"schema": "./prisma/schema.zmodel"
77
},
88
"scripts": {
9-
"dev:trigger": "trigger dev",
9+
"dev": "trigger dev",
1010
"deploy": "trigger deploy --self-hosted --load-image",
1111
"management": "tsx -r dotenv/config ./src/management.ts",
1212
"queues": "ts-node -r dotenv/config -r tsconfig-paths/register ./src/queues.ts",
@@ -84,4 +84,4 @@
8484
"ts-node": "^10.9.2",
8585
"tsconfig-paths": "^4.2.0"
8686
}
87-
}
87+
}

0 commit comments

Comments
 (0)