Skip to content

Commit 38ddd83

Browse files
authored
Add supervisor split service controls (#1774)
* dockerignore node_modules in subdirectories
* image tag action should handle re2 tags
* add supervisor containerfile
* add publish worker re2 workflow
* fix copypasta
* require branch check
* add more granular service control to supervisor session
* fix supervisor api domain for split setups
* remove default workload api domain
* option to disable workload api
* fix bool env var coercion
1 parent 4dbf11a commit 38ddd83

File tree

7 files changed

+136
-38
lines changed

7 files changed

+136
-38
lines changed

Diff for: apps/supervisor/src/env.ts

+28-8
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,41 @@
11
import { randomUUID } from "crypto";
22
import { env as stdEnv } from "std-env";
33
import { z } from "zod";
4-
import { getDockerHostDomain } from "./util.js";
4+
5+
const BoolEnv = z.preprocess((val) => {
6+
if (typeof val !== "string") {
7+
return val;
8+
}
9+
10+
return ["true", "1"].includes(val.toLowerCase().trim());
11+
}, z.boolean());
512

613
const Env = z.object({
7-
// This will come from `status.hostIP` in k8s
8-
WORKER_HOST_IP: z.string().default(getDockerHostDomain()),
9-
TRIGGER_API_URL: z.string().url(),
10-
TRIGGER_WORKER_TOKEN: z.string(),
1114
// This will come from `spec.nodeName` in k8s
1215
TRIGGER_WORKER_INSTANCE_NAME: z.string().default(randomUUID()),
16+
17+
// Required settings
18+
TRIGGER_API_URL: z.string().url(),
19+
TRIGGER_WORKER_TOKEN: z.string(),
1320
MANAGED_WORKER_SECRET: z.string(),
14-
TRIGGER_WORKLOAD_API_PORT: z.coerce.number().default(8020),
15-
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020),
21+
22+
// Workload API settings (coordinator mode) - the workload API is what the run controller connects to
23+
TRIGGER_WORKLOAD_API_ENABLED: BoolEnv.default("true"),
24+
TRIGGER_WORKLOAD_API_PROTOCOL: z
25+
.string()
26+
.transform((s) => z.enum(["http", "https"]).parse(s.toLowerCase()))
27+
.default("http"),
28+
TRIGGER_WORKLOAD_API_DOMAIN: z.string().optional(), // If unset, will use orchestrator-specific default
29+
TRIGGER_WORKLOAD_API_PORT_INTERNAL: z.coerce.number().default(8020), // This is the port the workload API listens on
30+
TRIGGER_WORKLOAD_API_PORT_EXTERNAL: z.coerce.number().default(8020), // This is the exposed port passed to the run controller
31+
32+
// Dequeue settings (provider mode)
33+
TRIGGER_DEQUEUE_ENABLED: BoolEnv.default("true"),
34+
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
35+
36+
// Optional services
1637
TRIGGER_WARM_START_URL: z.string().optional(),
1738
TRIGGER_CHECKPOINT_URL: z.string().optional(),
18-
TRIGGER_DEQUEUE_INTERVAL_MS: z.coerce.number().int().default(1000),
1939

2040
// Used by the workload manager, e.g docker/k8s
2141
DOCKER_NETWORK: z.string().default("host"),

Diff for: apps/supervisor/src/index.ts

+23-5
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ class ManagedSupervisor {
2929
private readonly warmStartUrl = env.TRIGGER_WARM_START_URL;
3030

3131
constructor() {
32-
const workerApiUrl = `http://${env.WORKER_HOST_IP}:${env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL}`;
32+
const workloadApiProtocol = env.TRIGGER_WORKLOAD_API_PROTOCOL;
33+
const workloadApiDomain = env.TRIGGER_WORKLOAD_API_DOMAIN;
34+
const workloadApiPortExternal = env.TRIGGER_WORKLOAD_API_PORT_EXTERNAL;
3335

3436
if (this.warmStartUrl) {
3537
this.logger.log("[ManagedWorker] 🔥 Warm starts enabled", {
@@ -40,13 +42,17 @@ class ManagedSupervisor {
4042
if (this.isKubernetes) {
4143
this.resourceMonitor = new KubernetesResourceMonitor(createK8sApi(), "");
4244
this.workloadManager = new KubernetesWorkloadManager({
43-
workerApiUrl,
45+
workloadApiProtocol,
46+
workloadApiDomain,
47+
workloadApiPort: workloadApiPortExternal,
4448
warmStartUrl: this.warmStartUrl,
4549
});
4650
} else {
4751
this.resourceMonitor = new DockerResourceMonitor(new Docker());
4852
this.workloadManager = new DockerWorkloadManager({
49-
workerApiUrl,
53+
workloadApiProtocol,
54+
workloadApiDomain,
55+
workloadApiPort: workloadApiPortExternal,
5056
warmStartUrl: this.warmStartUrl,
5157
});
5258
}
@@ -57,6 +63,8 @@ class ManagedSupervisor {
5763
instanceName: env.TRIGGER_WORKER_INSTANCE_NAME,
5864
managedWorkerSecret: env.MANAGED_WORKER_SECRET,
5965
dequeueIntervalMs: env.TRIGGER_DEQUEUE_INTERVAL_MS,
66+
queueConsumerEnabled: env.TRIGGER_DEQUEUE_ENABLED,
67+
runNotificationsEnabled: env.TRIGGER_WORKLOAD_API_ENABLED,
6068
preDequeue: async () => {
6169
if (this.isKubernetes) {
6270
// TODO: Test k8s resource monitor and remove this
@@ -180,7 +188,7 @@ class ManagedSupervisor {
180188

181189
// Responds to workload requests only
182190
this.workloadServer = new WorkloadServer({
183-
port: env.TRIGGER_WORKLOAD_API_PORT,
191+
port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
184192
workerClient: this.workerSession.httpClient,
185193
checkpointClient: this.checkpointClient,
186194
});
@@ -238,7 +246,17 @@ class ManagedSupervisor {
238246
async start() {
239247
this.logger.log("[ManagedWorker] Starting up");
240248

241-
await this.workloadServer.start();
249+
if (env.TRIGGER_WORKLOAD_API_ENABLED) {
250+
this.logger.log("[ManagedWorker] Workload API enabled", {
251+
protocol: env.TRIGGER_WORKLOAD_API_PROTOCOL,
252+
domain: env.TRIGGER_WORKLOAD_API_DOMAIN,
253+
port: env.TRIGGER_WORKLOAD_API_PORT_INTERNAL,
254+
});
255+
await this.workloadServer.start();
256+
} else {
257+
this.logger.warn("[ManagedWorker] Workload API disabled");
258+
}
259+
242260
await this.workerSession.start();
243261

244262
await this.httpServer.start();

Diff for: apps/supervisor/src/workloadManager/docker.ts

+11-3
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@ import {
66
} from "./types.js";
77
import { x } from "tinyexec";
88
import { env } from "../env.js";
9-
import { RunnerId } from "../util.js";
9+
import { getDockerHostDomain, RunnerId } from "../util.js";
1010

1111
export class DockerWorkloadManager implements WorkloadManager {
1212
private readonly logger = new SimpleStructuredLogger("docker-workload-provider");
1313

14-
constructor(private opts: WorkloadManagerOptions) {}
14+
constructor(private opts: WorkloadManagerOptions) {
15+
if (opts.workloadApiDomain) {
16+
this.logger.warn("[DockerWorkloadProvider] ⚠️ Custom workload API domain", {
17+
domain: opts.workloadApiDomain,
18+
});
19+
}
20+
}
1521

1622
async create(opts: WorkloadManagerCreateOptions) {
1723
this.logger.log("[DockerWorkloadProvider] Creating container", { opts });
@@ -24,7 +30,9 @@ export class DockerWorkloadManager implements WorkloadManager {
2430
`--env=TRIGGER_ENV_ID=${opts.envId}`,
2531
`--env=TRIGGER_RUN_ID=${opts.runFriendlyId}`,
2632
`--env=TRIGGER_SNAPSHOT_ID=${opts.snapshotFriendlyId}`,
27-
`--env=TRIGGER_WORKER_API_URL=${this.opts.workerApiUrl}`,
33+
`--env=TRIGGER_SUPERVISOR_API_PROTOCOL=${this.opts.workloadApiProtocol}`,
34+
`--env=TRIGGER_SUPERVISOR_API_PORT=${this.opts.workloadApiPort}`,
35+
`--env=TRIGGER_SUPERVISOR_API_DOMAIN=${this.opts.workloadApiDomain ?? getDockerHostDomain()}`,
2836
`--env=TRIGGER_WORKER_INSTANCE_NAME=${env.TRIGGER_WORKER_INSTANCE_NAME}`,
2937
`--env=OTEL_EXPORTER_OTLP_ENDPOINT=${env.OTEL_EXPORTER_OTLP_ENDPOINT}`,
3038
`--env=TRIGGER_RUNNER_ID=${runnerId}`,

Diff for: apps/supervisor/src/workloadManager/kubernetes.ts

+26-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ export class KubernetesWorkloadManager implements WorkloadManager {
2323

2424
constructor(private opts: WorkloadManagerOptions) {
2525
this.k8s = createK8sApi();
26+
27+
if (opts.workloadApiDomain) {
28+
this.logger.warn("[KubernetesWorkloadManager] ⚠️ Custom workload API domain", {
29+
domain: opts.workloadApiDomain,
30+
});
31+
}
2632
}
2733

2834
async create(opts: WorkloadManagerCreateOptions) {
@@ -72,8 +78,26 @@ export class KubernetesWorkloadManager implements WorkloadManager {
7278
value: opts.snapshotFriendlyId,
7379
},
7480
{
75-
name: "TRIGGER_WORKER_API_URL",
76-
value: this.opts.workerApiUrl,
81+
name: "TRIGGER_SUPERVISOR_API_PROTOCOL",
82+
value: this.opts.workloadApiProtocol,
83+
},
84+
{
85+
name: "TRIGGER_SUPERVISOR_API_PORT",
86+
value: `${this.opts.workloadApiPort}`,
87+
},
88+
{
89+
name: "TRIGGER_SUPERVISOR_API_DOMAIN",
90+
...(this.opts.workloadApiDomain
91+
? {
92+
value: this.opts.workloadApiDomain,
93+
}
94+
: {
95+
valueFrom: {
96+
fieldRef: {
97+
fieldPath: "status.hostIP",
98+
},
99+
},
100+
}),
77101
},
78102
{
79103
name: "TRIGGER_WORKER_INSTANCE_NAME",

Diff for: apps/supervisor/src/workloadManager/types.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import { type EnvironmentType, type MachinePreset } from "@trigger.dev/core/v3";
22

33
export interface WorkloadManagerOptions {
4-
workerApiUrl: string;
4+
workloadApiProtocol: "http" | "https";
5+
workloadApiDomain?: string; // If unset, will use orchestrator-specific default
6+
workloadApiPort: number;
57
warmStartUrl?: string;
68
}
79

Diff for: packages/cli-v3/src/entryPoints/managed-run-controller.ts

+9-4
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ const Env = z.object({
3636
NODE_EXTRA_CA_CERTS: z.string().optional(),
3737

3838
// Set at runtime
39-
TRIGGER_WORKER_API_URL: z.string().url(),
39+
TRIGGER_SUPERVISOR_API_PROTOCOL: z.enum(["http", "https"]),
40+
TRIGGER_SUPERVISOR_API_DOMAIN: z.string(),
41+
TRIGGER_SUPERVISOR_API_PORT: z.coerce.number(),
4042
TRIGGER_WORKLOAD_CONTROLLER_ID: z.string().default(`controller_${randomUUID()}`),
4143
TRIGGER_ENV_ID: z.string(),
4244
TRIGGER_RUN_ID: z.string().optional(), // This is only useful for cold starts
@@ -84,6 +86,8 @@ class ManagedRunController {
8486
private readonly snapshotPoller: HeartbeatService;
8587
private readonly snapshotPollIntervalSeconds: number;
8688

89+
private readonly workerApiUrl: string;
90+
8791
private state:
8892
| {
8993
phase: "RUN";
@@ -246,8 +250,10 @@ class ManagedRunController {
246250
this.heartbeatIntervalSeconds = opts.heartbeatIntervalSeconds || 30;
247251
this.snapshotPollIntervalSeconds = 5;
248252

253+
this.workerApiUrl = `${env.TRIGGER_SUPERVISOR_API_PROTOCOL}://${env.TRIGGER_SUPERVISOR_API_DOMAIN}:${env.TRIGGER_SUPERVISOR_API_PORT}`;
254+
249255
this.httpClient = new WorkloadHttpClient({
250-
workerApiUrl: env.TRIGGER_WORKER_API_URL,
256+
workerApiUrl: this.workerApiUrl,
251257
deploymentId: env.TRIGGER_DEPLOYMENT_ID,
252258
runnerId: env.TRIGGER_RUNNER_ID,
253259
});
@@ -746,8 +752,7 @@ class ManagedRunController {
746752
}
747753

748754
createSocket() {
749-
const wsUrl = new URL(env.TRIGGER_WORKER_API_URL);
750-
wsUrl.pathname = "/workload";
755+
const wsUrl = new URL("/workload", this.workerApiUrl);
751756

752757
this.socket = io(wsUrl.href, {
753758
transports: ["websocket"],

Diff for: packages/core/src/v3/runEngineWorker/supervisor/session.ts

+36-15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import { getDefaultWorkerHeaders } from "./util.js";
1111
import { HeartbeatService } from "../../utils/heartbeat.js";
1212

1313
type SupervisorSessionOptions = SupervisorClientCommonOptions & {
14+
queueConsumerEnabled?: boolean;
15+
runNotificationsEnabled?: boolean;
1416
heartbeatIntervalSeconds?: number;
1517
dequeueIntervalMs?: number;
1618
preDequeue?: PreDequeueFn;
@@ -20,15 +22,21 @@ type SupervisorSessionOptions = SupervisorClientCommonOptions & {
2022
export class SupervisorSession extends EventEmitter<WorkerEvents> {
2123
public readonly httpClient: SupervisorHttpClient;
2224

23-
private socket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
25+
private readonly runNotificationsEnabled: boolean;
26+
private runNotificationsSocket?: Socket<WorkerServerToClientEvents, WorkerClientToServerEvents>;
2427

28+
private readonly queueConsumerEnabled: boolean;
2529
private readonly queueConsumer: RunQueueConsumer;
30+
2631
private readonly heartbeatService: HeartbeatService;
2732
private readonly heartbeatIntervalSeconds: number;
2833

2934
constructor(private opts: SupervisorSessionOptions) {
3035
super();
3136

37+
this.runNotificationsEnabled = opts.runNotificationsEnabled ?? true;
38+
this.queueConsumerEnabled = opts.queueConsumerEnabled ?? true;
39+
3240
this.httpClient = new SupervisorHttpClient(opts);
3341
this.queueConsumer = new RunQueueConsumer({
3442
client: this.httpClient,
@@ -76,12 +84,12 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
7684
subscribeToRunNotifications(runFriendlyIds: string[]) {
7785
console.log("[SupervisorSession] Subscribing to run notifications", { runFriendlyIds });
7886

79-
if (!this.socket) {
87+
if (!this.runNotificationsSocket) {
8088
console.error("[SupervisorSession] Socket not connected");
8189
return;
8290
}
8391

84-
this.socket.emit("run:subscribe", { version: "1", runFriendlyIds });
92+
this.runNotificationsSocket.emit("run:subscribe", { version: "1", runFriendlyIds });
8593

8694
Promise.allSettled(
8795
runFriendlyIds.map((runFriendlyId) =>
@@ -96,12 +104,12 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
96104
unsubscribeFromRunNotifications(runFriendlyIds: string[]) {
97105
console.log("[SupervisorSession] Unsubscribing from run notifications", { runFriendlyIds });
98106

99-
if (!this.socket) {
107+
if (!this.runNotificationsSocket) {
100108
console.error("[SupervisorSession] Socket not connected");
101109
return;
102110
}
103111

104-
this.socket.emit("run:unsubscribe", { version: "1", runFriendlyIds });
112+
this.runNotificationsSocket.emit("run:unsubscribe", { version: "1", runFriendlyIds });
105113

106114
Promise.allSettled(
107115
runFriendlyIds.map((runFriendlyId) =>
@@ -116,15 +124,15 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
116124
);
117125
}
118126

119-
private createSocket() {
127+
private createRunNotificationsSocket() {
120128
const wsUrl = new URL(this.opts.apiUrl);
121129
wsUrl.pathname = "/worker";
122130

123-
this.socket = io(wsUrl.href, {
131+
const socket = io(wsUrl.href, {
124132
transports: ["websocket"],
125133
extraHeaders: getDefaultWorkerHeaders(this.opts),
126134
});
127-
this.socket.on("run:notify", ({ version, run }) => {
135+
socket.on("run:notify", ({ version, run }) => {
128136
console.log("[SupervisorSession][WS] Received run notification", { version, run });
129137
this.emit("runNotification", { time: new Date(), run });
130138

@@ -137,15 +145,17 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
137145
console.error("[SupervisorSession] Failed to send debug log", { error });
138146
});
139147
});
140-
this.socket.on("connect", () => {
148+
socket.on("connect", () => {
141149
console.log("[SupervisorSession][WS] Connected to platform");
142150
});
143-
this.socket.on("connect_error", (error) => {
151+
socket.on("connect_error", (error) => {
144152
console.error("[SupervisorSession][WS] Connection error", { error });
145153
});
146-
this.socket.on("disconnect", (reason, description) => {
154+
socket.on("disconnect", (reason, description) => {
147155
console.log("[SupervisorSession][WS] Disconnected from platform", { reason, description });
148156
});
157+
158+
return socket;
149159
}
150160

151161
async start() {
@@ -167,14 +177,25 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
167177
name: workerGroup.name,
168178
});
169179

170-
this.queueConsumer.start();
171-
this.heartbeatService.start();
172-
this.createSocket();
180+
if (this.queueConsumerEnabled) {
181+
console.log("[SupervisorSession] Queue consumer enabled");
182+
this.queueConsumer.start();
183+
this.heartbeatService.start();
184+
} else {
185+
console.warn("[SupervisorSession] Queue consumer disabled");
186+
}
187+
188+
if (this.runNotificationsEnabled) {
189+
console.log("[SupervisorSession] Run notifications enabled");
190+
this.runNotificationsSocket = this.createRunNotificationsSocket();
191+
} else {
192+
console.warn("[SupervisorSession] Run notifications disabled");
193+
}
173194
}
174195

175196
async stop() {
176197
this.heartbeatService.stop();
177-
this.socket?.disconnect();
198+
this.runNotificationsSocket?.disconnect();
178199
}
179200

180201
private getHeartbeatBody(): WorkerApiHeartbeatRequestBody {

0 commit comments

Comments
 (0)