Skip to content

Commit 0b555fa

Browse files
authored
Force upgrade v1 batches to v3 (#1676)
* Force upgrade v1 batches to v3 * Actually lets process v1 batches async
1 parent dcf1ab6 commit 0b555fa

File tree

3 files changed

+61
-30
lines changed

3 files changed

+61
-30
lines changed

apps/webapp/app/routes/api.v1.tasks.$taskId.batch.ts

+36-8
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import type { ActionFunctionArgs } from "@remix-run/server-runtime";
22
import { json } from "@remix-run/server-runtime";
3-
import { BatchTriggerTaskRequestBody } from "@trigger.dev/core/v3";
3+
import { BatchTriggerTaskRequestBody, BatchTriggerTaskV2RequestBody } from "@trigger.dev/core/v3";
44
import { z } from "zod";
5+
import { fromZodError } from "zod-validation-error";
56
import { MAX_BATCH_TRIGGER_ITEMS } from "~/consts";
7+
import { env } from "~/env.server";
68
import { authenticateApiRequest } from "~/services/apiAuth.server";
79
import { logger } from "~/services/logger.server";
8-
import { BatchTriggerTaskService } from "~/v3/services/batchTriggerTask.server";
10+
import { BatchTriggerV3Service } from "~/v3/services/batchTriggerV3.server";
911
import { HeadersSchema } from "./api.v1.tasks.$taskId.trigger";
10-
import { env } from "~/env.server";
11-
import { fromZodError } from "zod-validation-error";
1212

1313
const ParamsSchema = z.object({
1414
taskId: z.string(),
@@ -85,15 +85,17 @@ export async function action({ request, params }: ActionFunctionArgs) {
8585
);
8686
}
8787

88-
const service = new BatchTriggerTaskService();
88+
const service = new BatchTriggerV3Service();
8989

9090
const traceContext =
9191
traceparent && isFromWorker // If the request is from a worker, we should pass the trace context
9292
? { traceparent, tracestate }
9393
: undefined;
9494

95+
const v3Body = convertV1BodyToV2Body(body.data, taskId);
96+
9597
try {
96-
const result = await service.call(taskId, authenticationResult.environment, body.data, {
98+
const result = await service.call(authenticationResult.environment, v3Body, {
9799
idempotencyKey: idempotencyKey ?? undefined,
98100
triggerVersion: triggerVersion ?? undefined,
99101
traceContext,
@@ -106,8 +108,8 @@ export async function action({ request, params }: ActionFunctionArgs) {
106108

107109
return json(
108110
{
109-
batchId: result.batch.friendlyId,
110-
runs: result.runs,
111+
batchId: result.id,
112+
runs: result.runs.map((run) => run.id),
111113
},
112114
{
113115
headers: {
@@ -126,3 +128,29 @@ export async function action({ request, params }: ActionFunctionArgs) {
126128
return json({ error: "Something went wrong" }, { status: 500 });
127129
}
128130
}
131+
132+
// Strip from options:
133+
// - dependentBatch
134+
// - dependentAttempt
135+
// - parentBatch
136+
function convertV1BodyToV2Body(
137+
body: BatchTriggerTaskRequestBody,
138+
taskIdentifier: string
139+
): BatchTriggerTaskV2RequestBody {
140+
return {
141+
items: body.items.map((item) => ({
142+
task: taskIdentifier,
143+
payload: item.payload,
144+
context: item.context,
145+
options: item.options
146+
? {
147+
...item.options,
148+
dependentBatch: undefined,
149+
parentBatch: undefined,
150+
dependentAttempt: undefined,
151+
}
152+
: undefined,
153+
})),
154+
dependentAttempt: body.dependentAttempt,
155+
};
156+
}

apps/webapp/app/v3/services/batchTriggerV3.server.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -94,14 +94,17 @@ type RunItemData = {
9494
*/
9595
export class BatchTriggerV3Service extends BaseService {
9696
private _batchProcessingStrategy: BatchProcessingStrategy;
97+
private _asyncBatchProcessSizeThreshold: number;
9798

9899
constructor(
99100
batchProcessingStrategy?: BatchProcessingStrategy,
101+
asyncBatchProcessSizeThreshold: number = ASYNC_BATCH_PROCESS_SIZE_THRESHOLD,
100102
protected readonly _prisma: PrismaClientOrTransaction = prisma
101103
) {
102104
super(_prisma);
103105

104106
this._batchProcessingStrategy = batchProcessingStrategy ?? "parallel";
107+
this._asyncBatchProcessSizeThreshold = asyncBatchProcessSizeThreshold;
105108
}
106109

107110
public async call(
@@ -403,7 +406,7 @@ export class BatchTriggerV3Service extends BaseService {
403406
options: BatchTriggerTaskServiceOptions = {},
404407
dependentAttempt?: TaskRunAttempt
405408
) {
406-
if (runs.length <= ASYNC_BATCH_PROCESS_SIZE_THRESHOLD) {
409+
if (runs.length <= this._asyncBatchProcessSizeThreshold) {
407410
const batch = await this._prisma.batchTaskRun.create({
408411
data: {
409412
friendlyId: batchId,

packages/core/src/v3/schemas/api.ts

+21-21
Original file line numberDiff line numberDiff line change
@@ -74,25 +74,25 @@ export const TriggerTaskRequestBody = z.object({
7474
context: z.any(),
7575
options: z
7676
.object({
77+
concurrencyKey: z.string().optional(),
78+
delay: z.string().or(z.coerce.date()).optional(),
7779
dependentAttempt: z.string().optional(),
78-
parentAttempt: z.string().optional(),
7980
dependentBatch: z.string().optional(),
80-
parentBatch: z.string().optional(),
81-
lockToVersion: z.string().optional(),
82-
queue: QueueOptions.optional(),
83-
concurrencyKey: z.string().optional(),
8481
idempotencyKey: z.string().optional(),
8582
idempotencyKeyTTL: z.string().optional(),
86-
test: z.boolean().optional(),
87-
payloadType: z.string().optional(),
88-
delay: z.string().or(z.coerce.date()).optional(),
89-
ttl: z.string().or(z.number().nonnegative().int()).optional(),
90-
tags: RunTags.optional(),
83+
lockToVersion: z.string().optional(),
84+
machine: MachinePresetName.optional(),
9185
maxAttempts: z.number().int().optional(),
86+
maxDuration: z.number().optional(),
9287
metadata: z.any(),
9388
metadataType: z.string().optional(),
94-
maxDuration: z.number().optional(),
95-
machine: MachinePresetName.optional(),
89+
parentAttempt: z.string().optional(),
90+
parentBatch: z.string().optional(),
91+
payloadType: z.string().optional(),
92+
queue: QueueOptions.optional(),
93+
tags: RunTags.optional(),
94+
test: z.boolean().optional(),
95+
ttl: z.string().or(z.number().nonnegative().int()).optional(),
9696
})
9797
.optional(),
9898
});
@@ -118,22 +118,22 @@ export const BatchTriggerTaskItem = z.object({
118118
context: z.any(),
119119
options: z
120120
.object({
121-
lockToVersion: z.string().optional(),
122-
queue: QueueOptions.optional(),
123121
concurrencyKey: z.string().optional(),
122+
delay: z.string().or(z.coerce.date()).optional(),
124123
idempotencyKey: z.string().optional(),
125124
idempotencyKeyTTL: z.string().optional(),
126-
test: z.boolean().optional(),
127-
payloadType: z.string().optional(),
128-
delay: z.string().or(z.coerce.date()).optional(),
129-
ttl: z.string().or(z.number().nonnegative().int()).optional(),
130-
tags: RunTags.optional(),
125+
lockToVersion: z.string().optional(),
126+
machine: MachinePresetName.optional(),
131127
maxAttempts: z.number().int().optional(),
128+
maxDuration: z.number().optional(),
132129
metadata: z.any(),
133130
metadataType: z.string().optional(),
134-
maxDuration: z.number().optional(),
135131
parentAttempt: z.string().optional(),
136-
machine: MachinePresetName.optional(),
132+
payloadType: z.string().optional(),
133+
queue: QueueOptions.optional(),
134+
tags: RunTags.optional(),
135+
test: z.boolean().optional(),
136+
ttl: z.string().or(z.number().nonnegative().int()).optional(),
137137
})
138138
.optional(),
139139
});

0 commit comments

Comments
 (0)