Skip to content

re2: env based queue selection algo #1775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Mar 7, 2025
14 changes: 0 additions & 14 deletions .eslintrc.js

This file was deleted.

8 changes: 8 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@
"command": "pnpm run test --filter @internal/run-engine",
"cwd": "${workspaceFolder}",
"sourceMaps": true
},
{
"type": "node-terminal",
"request": "launch",
"name": "Debug RunQueue tests",
"command": "pnpm run test ./src/engine/tests/waitpoints.test.ts",
"cwd": "${workspaceFolder}/internal-packages/run-engine",
"sourceMaps": true
}
]
}
4 changes: 0 additions & 4 deletions apps/supervisor/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,5 @@
"compilerOptions": {
"rootDir": "src",
"outDir": "dist"
},
"paths": {
"@trigger.dev/core/v3": ["../../packages/core/src/v3"],
"@trigger.dev/core/v3/*": ["../../packages/core/src/v3/*"]
}
}
7 changes: 1 addition & 6 deletions apps/webapp/.eslintrc
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
{
"plugins": [
"@trigger.dev/eslint-plugin",
"react-hooks",
"@typescript-eslint/eslint-plugin",
"import"
],
"plugins": ["react-hooks", "@typescript-eslint/eslint-plugin", "import"],
"parser": "@typescript-eslint/parser",
"overrides": [
{
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/components/run/TriggerDetail.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import {
RunPanelIconSection,
RunPanelProperties,
} from "./RunCard";
import { DisplayProperty } from "@trigger.dev/core";
import type { DisplayProperty } from "@trigger.dev/core";

export function TriggerDetail({
trigger,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/entry.server.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
OperatingSystemContextProvider,
OperatingSystemPlatform,
} from "./components/primitives/OperatingSystemProvider";
import { getSharedSqsEventConsumer } from "./services/events/sqsEventConsumer";
import { getSharedSqsEventConsumer } from "./services/events/sqsEventConsumer.server";
import { singleton } from "./utils/singleton";

const ABORT_DELAY = 30000;
Expand Down
6 changes: 6 additions & 0 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,12 @@ const EnvironmentSchema = z.object({
RUN_ENGINE_TIMEOUT_EXECUTING: z.coerce.number().int().default(60_000),
RUN_ENGINE_TIMEOUT_EXECUTING_WITH_WAITPOINTS: z.coerce.number().int().default(60_000),
RUN_ENGINE_DEBUG_WORKER_NOTIFICATIONS: z.coerce.boolean().default(false),
RUN_ENGINE_PARENT_QUEUE_LIMIT: z.coerce.number().int().default(1000),
RUN_ENGINE_CONCURRENCY_LIMIT_BIAS: z.coerce.number().default(0.75),
RUN_ENGINE_AVAILABLE_CAPACITY_BIAS: z.coerce.number().default(0.3),
RUN_ENGINE_QUEUE_AGE_RANDOMIZATION_BIAS: z.coerce.number().default(0.25),
RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0),
RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(),

RUN_ENGINE_WORKER_REDIS_HOST: z
.string()
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/hooks/useSyncTraceRuns.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Prettify } from "@trigger.dev/core";
import type { Prettify } from "@trigger.dev/core";
import { TaskRun } from "@trigger.dev/database";
import { SyncedShapeData, useSyncedShape } from "./useSyncedShape";

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/HttpEndpointPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { z } from "zod";
import { PrismaClient, prisma } from "~/db.server";
import { sortEnvironments } from "~/utils/environmentSort";
import { httpEndpointUrl } from "~/services/httpendpoint/HandleHttpEndpointService";
import { httpEndpointUrl } from "~/services/httpendpoint/HandleHttpEndpointService.server";
import { getSecretStore } from "~/services/secrets/secretStore.server";
import { projectPath } from "~/utils/pathBuilder";

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/SpanPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { eventRepository } from "~/v3/eventRepository.server";
import { machinePresetFromName } from "~/v3/machinePresets.server";
import { FINAL_ATTEMPT_STATUSES, isFailedRunStatus, isFinalRunStatus } from "~/v3/taskStatus";
import { BasePresenter } from "./basePresenter.server";
import { getMaxDuration } from "@trigger.dev/core/v3/apps";
import { getMaxDuration } from "@trigger.dev/core/v3/isomorphic";
import { logger } from "~/services/logger.server";
import { getTaskEventStoreTableForRun, TaskEventStoreTable } from "~/v3/taskEventStore.server";
import { Pi } from "lucide-react";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/presenters/v3/TaskListPresenter.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import { logger } from "~/services/logger.server";
import { BasePresenter } from "./basePresenter.server";
import { TaskRunStatus } from "~/database-types";
import { concurrencyTracker } from "~/v3/services/taskRunConcurrencyTracker.server";
import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/apps";
import { CURRENT_DEPLOYMENT_LABEL } from "@trigger.dev/core/v3/isomorphic";

export type Task = {
slug: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { RegisterScheduleBodySchema, RegisterScheduleResponseBodySchema } from "@trigger.dev/core";
import {
RegisterScheduleBodySchema,
RegisterScheduleResponseBodySchema,
} from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.$endpointSlug.sources.$id.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { UpdateTriggerSourceBodyV1Schema } from "@trigger.dev/core";
import { UpdateTriggerSourceBodyV1Schema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { RegisterTriggerBodySchemaV1 } from "@trigger.dev/core";
import { RegisterTriggerBodySchemaV1 } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { InitializeTriggerBodySchema } from "@trigger.dev/core";
import { InitializeTriggerBodySchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { CreateExternalConnectionBodySchema, ErrorWithStackSchema } from "@trigger.dev/core";
import {
CreateExternalConnectionBodySchema,
ErrorWithStackSchema,
} from "@trigger.dev/core/schemas";
import { z } from "zod";
import { generateErrorMessage } from "zod-error";
import { authenticateApiRequest } from "~/services/apiAuth.server";
Expand Down
5 changes: 4 additions & 1 deletion apps/webapp/app/routes/api.v1.endpointindex.$indexId.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import { GetEndpointIndexResponse, GetEndpointIndexResponseSchema } from "@trigger.dev/core";
import {
GetEndpointIndexResponse,
GetEndpointIndexResponseSchema,
} from "@trigger.dev/core/schemas";
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
Expand Down
7 changes: 1 addition & 6 deletions apps/webapp/app/routes/api.v1.event-dispatchers.ephemeral.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import {
EphemeralEventDispatcherRequestBodySchema,
InvokeJobRequestBodySchema,
} from "@trigger.dev/core";
import { z } from "zod";
import { EphemeralEventDispatcherRequestBodySchema } from "@trigger.dev/core/schemas";
import { PrismaErrorSchema } from "~/db.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { CreateEphemeralEventDispatcherService } from "~/services/dispatchers/createEphemeralEventDispatcher.server";
import { InvokeJobService } from "~/services/jobs/invokeJob.server";
import { logger } from "~/services/logger.server";

export async function action({ request, params }: ActionFunctionArgs) {
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.events.$eventId.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { GetEvent } from "@trigger.dev/core";
import { GetEvent } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { prisma } from "~/db.server";
import { runOriginalStatus } from "~/models/jobRun.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.events.bulk.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { SendBulkEventsBodySchema } from "@trigger.dev/core";
import { SendBulkEventsBodySchema } from "@trigger.dev/core/schemas";
import { generateErrorMessage } from "zod-error";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { IngestSendEvent } from "~/services/events/ingestSendEvent.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { SendEventBodySchema } from "@trigger.dev/core";
import { SendEventBodySchema } from "@trigger.dev/core/schemas";
import { generateErrorMessage } from "zod-error";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { IngestSendEvent } from "~/services/events/ingestSendEvent.server";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { ActionFunctionArgs, LoaderFunctionArgs } from "@remix-run/server-r
import {
HandleHttpEndpointService,
HttpEndpointParamsSchema,
} from "~/services/httpendpoint/HandleHttpEndpointService";
} from "~/services/httpendpoint/HandleHttpEndpointService.server";
import { logger } from "~/services/logger.server";

export async function action({ request, params }: ActionFunctionArgs) {
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.jobs.$jobSlug.invoke.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { InvokeJobRequestBodySchema } from "@trigger.dev/core";
import { InvokeJobRequestBodySchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { PrismaErrorSchema } from "~/db.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.logs/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { LogMessageSchema } from "@trigger.dev/core";
import { LogMessageSchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { CreateRunLogService } from "./CreateRunLogService.server";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { JobRunStatusRecordSchema, StatusUpdateSchema } from "@trigger.dev/core";
import { JobRunStatusRecordSchema, StatusUpdateSchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.statuses.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { JobRunStatusRecordSchema } from "@trigger.dev/core";
import { JobRunStatusRecordSchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { prisma } from "~/db.server";
import { runOriginalStatus } from "~/models/jobRun.server";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import type { CompleteTaskBodyOutput } from "@trigger.dev/core";
import type { CompleteTaskBodyOutput } from "@trigger.dev/core/schemas";
import {
API_VERSIONS,
CompleteTaskBodyInputSchema,
CompleteTaskBodyV2InputSchema,
} from "@trigger.dev/core";
} from "@trigger.dev/core/schemas";
import { z } from "zod";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
Expand All @@ -14,6 +13,11 @@ import { startActiveSpan } from "~/v3/tracer.server";
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";
import { FailRunTaskService } from "../api.v1.runs.$runId.tasks.$id.fail/FailRunTaskService.server";

const API_VERSIONS = {
LAZY_LOADED_CACHED_TASKS: "2023-09-29",
SERIALIZED_TASK_OUTPUT: "2023-11-01",
};

const ParamsSchema = z.object({
runId: z.string(),
id: z.string(),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { FailTaskBodyInputSchema } from "@trigger.dev/core";
import { FailTaskBodyInputSchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
7 changes: 6 additions & 1 deletion apps/webapp/app/routes/api.v1.runs.$runId.tasks/route.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { API_VERSIONS, RunTaskBodyOutputSchema } from "@trigger.dev/core";
import { RunTaskBodyOutputSchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand All @@ -9,6 +9,11 @@ import { ChangeRequestLazyLoadedCachedTasks } from "./ChangeRequestLazyLoadedCac
import { startActiveSpan } from "~/v3/tracer.server";
import { parseRequestJsonAsync } from "~/utils/parseRequestJson.server";

const API_VERSIONS = {
LAZY_LOADED_CACHED_TASKS: "2023-09-29",
SERIALIZED_TASK_OUTPUT: "2023-11-01",
};

const ParamsSchema = z.object({
runId: z.string(),
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import {
conditionallyExportPacket,
stringifyIO,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/apps";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { z } from "zod";
import { $replica } from "~/db.server";
import { env } from "~/env.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.waitpoints.tokens.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import {
CreateWaitpointTokenRequestBody,
CreateWaitpointTokenResponseBody,
} from "@trigger.dev/core/v3";
import { WaitpointId } from "@trigger.dev/core/v3/apps";
import { WaitpointId } from "@trigger.dev/core/v3/isomorphic";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { parseDelay } from "~/utils/delays";
import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v1.webhooks.$key.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { UpdateWebhookBodySchema } from "@trigger.dev/core";
import { UpdateWebhookBodySchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v2.$endpointSlug.sources.$id.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { UpdateTriggerSourceBodyV2Schema } from "@trigger.dev/core";
import { UpdateTriggerSourceBodyV2Schema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
REGISTER_SOURCE_EVENT_V2,
RegisterSourceEventV2,
RegisterTriggerBodySchemaV2,
} from "@trigger.dev/core";
} from "@trigger.dev/core/schemas";
import { z } from "zod";
import { authenticateApiRequest } from "~/services/apiAuth.server";
import { IngestSendEvent } from "~/services/events/ingestSendEvent.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v2.events.$eventId.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { GetEvent } from "@trigger.dev/core";
import { GetEvent } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { $replica } from "~/db.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/api.v2.runs.$runId.statuses.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
import { json } from "@remix-run/server-runtime";
import { JobRunStatusRecordSchema } from "@trigger.dev/core";
import { JobRunStatusRecordSchema } from "@trigger.dev/core/schemas";
import { z } from "zod";
import { prisma } from "~/db.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/routes/engine.v1.dev.dequeue.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { json } from "@remix-run/server-runtime";
import { DequeuedMessage, DevDequeueRequestBody, MachineResources } from "@trigger.dev/core/v3";
import { BackgroundWorkerId } from "@trigger.dev/core/v3/apps";
import { BackgroundWorkerId } from "@trigger.dev/core/v3/isomorphic";
import { env } from "~/env.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { engine } from "~/v3/runEngine.server";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { TypedResponse } from "@remix-run/server-runtime";
import { assertExhaustive } from "@trigger.dev/core";
import { RunId } from "@trigger.dev/core/v3/apps";
import { assertExhaustive } from "@trigger.dev/core/utils";
import { RunId } from "@trigger.dev/core/v3/isomorphic";
import {
WorkerApiDebugLogBody,
WorkerApiRunAttemptStartResponseBody,
Expand Down
Loading