Skip to content

Reduce contention on batchTaskRun when setting expected count #1662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 88 additions & 63 deletions apps/webapp/app/v3/services/batchTriggerV3.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -760,10 +760,21 @@ export class BatchTriggerV3Service extends BaseService {
}));

let workingIndex = currentIndex;
let expectedCount = 0;

for (const item of itemsToProcess) {
try {
await this.#processBatchTaskRunItem(batch, environment, item, workingIndex, options);
const created = await this.#processBatchTaskRunItem(
batch,
environment,
item,
workingIndex,
options
);

if (created) {
expectedCount++;
}

workingIndex++;
} catch (error) {
Expand All @@ -780,6 +791,17 @@ export class BatchTriggerV3Service extends BaseService {
}
}

if (expectedCount > 0) {
await this._prisma.batchTaskRun.update({
where: { id: batch.id },
data: {
expectedCount: {
increment: expectedCount,
},
},
});
}

return { workingIndex };
}

Expand Down Expand Up @@ -825,21 +847,15 @@ export class BatchTriggerV3Service extends BaseService {

if (!result.isCached) {
try {
await $transaction(this._prisma, async (tx) => {
// [batchTaskRunId, taskRunId] is a unique index
await tx.batchTaskRunItem.create({
data: {
batchTaskRunId: batch.id,
taskRunId: result.run.id,
status: batchTaskRunItemStatusForRunStatus(result.run.status),
},
});

await tx.batchTaskRun.update({
where: { id: batch.id },
data: { expectedCount: { increment: 1 } },
});
await this._prisma.batchTaskRunItem.create({
data: {
batchTaskRunId: batch.id,
taskRunId: result.run.id,
status: batchTaskRunItemStatusForRunStatus(result.run.status),
},
});

return true;
} catch (error) {
if (isUniqueConstraintError(error, ["batchTaskRunId", "taskRunId"])) {
// This means there is already a batchTaskRunItem for this batch and taskRun
Expand All @@ -852,12 +868,14 @@ export class BatchTriggerV3Service extends BaseService {
}
);

return;
return false;
}

throw error;
}
}

return false;
}

async #enqueueBatchTaskRun(options: BatchProcessingOptions, tx?: PrismaClientOrTransaction) {
Expand Down Expand Up @@ -907,62 +925,69 @@ export async function completeBatchTaskRunItemV3(
scheduleResumeOnComplete = false,
taskRunAttemptId?: string
) {
await $transaction(tx, "completeBatchTaskRunItemV3", async (tx, span) => {
span?.setAttribute("batch_id", batchTaskRunId);

// Update the item to complete
const updated = await tx.batchTaskRunItem.updateMany({
where: {
id: itemId,
status: "PENDING",
},
data: {
status: "COMPLETED",
taskRunAttemptId,
},
});

if (updated.count === 0) {
return;
}

const updatedBatchRun = await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
completedCount: {
increment: 1,
await $transaction(
tx,
"completeBatchTaskRunItemV3",
async (tx, span) => {
span?.setAttribute("batch_id", batchTaskRunId);

// Update the item to complete
const updated = await tx.batchTaskRunItem.updateMany({
where: {
id: itemId,
status: "PENDING",
},
},
select: {
sealed: true,
status: true,
completedCount: true,
expectedCount: true,
dependentTaskAttemptId: true,
},
});
data: {
status: "COMPLETED",
taskRunAttemptId,
},
});

if (updated.count === 0) {
return;
}

if (
updatedBatchRun.status === "PENDING" &&
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
updatedBatchRun.sealed
) {
await tx.batchTaskRun.update({
const updatedBatchRun = await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
status: "COMPLETED",
completedAt: new Date(),
completedCount: {
increment: 1,
},
},
select: {
sealed: true,
status: true,
completedCount: true,
expectedCount: true,
dependentTaskAttemptId: true,
},
});

// We only need to resume the batch if it has a dependent task attempt ID
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
if (
updatedBatchRun.status === "PENDING" &&
updatedBatchRun.completedCount === updatedBatchRun.expectedCount &&
updatedBatchRun.sealed
) {
await tx.batchTaskRun.update({
where: {
id: batchTaskRunId,
},
data: {
status: "COMPLETED",
completedAt: new Date(),
},
});

// We only need to resume the batch if it has a dependent task attempt ID
if (scheduleResumeOnComplete && updatedBatchRun.dependentTaskAttemptId) {
await ResumeBatchRunService.enqueue(batchTaskRunId, true, tx);
}
}
},
{
timeout: 10000,
}
});
);
}
18 changes: 17 additions & 1 deletion internal-packages/database/src/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,21 @@ export type PrismaTransactionOptions = {
isolationLevel?: Prisma.TransactionIsolationLevel;

swallowPrismaErrors?: boolean;

/**
* The maximum number of times the transaction will be retried in case of a serialization failure. The default value is 0.
*
* See https://www.prisma.io/docs/orm/prisma-client/queries/transactions#transaction-timing-issues
*/
maxRetries?: number;
};

export async function $transaction<R>(
prisma: PrismaClientOrTransaction,
fn: (prisma: PrismaTransactionClient) => Promise<R>,
prismaError: (error: Prisma.PrismaClientKnownRequestError) => void,
options?: PrismaTransactionOptions
options?: PrismaTransactionOptions,
attempt = 0
): Promise<R | undefined> {
if (isTransactionClient(prisma)) {
return fn(prisma);
Expand All @@ -46,6 +54,14 @@ export async function $transaction<R>(
return await (prisma as PrismaClient).$transaction(fn, options);
} catch (error) {
if (isPrismaKnownError(error)) {
if (
error.code === "P2034" &&
typeof options?.maxRetries === "number" &&
attempt < options.maxRetries
) {
return $transaction(prisma, fn, prismaError, options, attempt + 1);
}

prismaError(error);

if (options?.swallowPrismaErrors) {
Expand Down