Skip to content

Associate child runs with the parent span ID #1352

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/short-tomatoes-beam.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Add otel propagation headers "below" the API fetch span, to attribute the child runs with the proper parent span ID
5 changes: 3 additions & 2 deletions apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,8 @@ export class EventRepository {
options: TraceEventOptions & { incomplete?: boolean },
callback: (
e: EventBuilder,
traceContext: Record<string, string | undefined>
traceContext: Record<string, string | undefined>,
traceparent?: { traceId: string; spanId: string }
) => Promise<TResult>
): Promise<TResult> {
const propagatedContext = extractContextFromCarrier(options.context ?? {});
Expand Down Expand Up @@ -892,7 +893,7 @@ export class EventRepository {
},
};

const result = await callback(eventBuilder, traceContext);
const result = await callback(eventBuilder, traceContext, propagatedContext?.traceparent);

const duration = process.hrtime.bigint() - start;

Expand Down
4 changes: 3 additions & 1 deletion apps/webapp/app/v3/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ export class TriggerTaskService extends BaseService {
incomplete: true,
immediate: true,
},
async (event, traceContext) => {
async (event, traceContext, traceparent) => {
const run = await autoIncrementCounter.incrementInTransaction(
`v3-run:${environment.id}:${taskId}`,
async (num, tx) => {
Expand Down Expand Up @@ -307,6 +307,8 @@ export class TriggerTaskService extends BaseService {
traceContext: traceContext,
traceId: event.traceId,
spanId: event.spanId,
parentSpanId:
options.parentAsLinkType === "replay" ? undefined : traceparent?.spanId,
lockedToVersionId: lockedToBackgroundWorker?.id,
concurrencyKey: body.options?.concurrencyKey,
queue: queueName,
Expand Down
6 changes: 3 additions & 3 deletions packages/cli-v3/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ function validateConfig(config: TriggerConfig, warn = true) {
if (config.additionalFiles && config.additionalFiles.length > 0) {
warn &&
prettyWarning(
`The "additionalFiles" option is deprecated and will be removed. Use the "additionalFiles" build extension instead. See https://trigger.dev/docs/guides/new-build-system-preview#additionalfiles for more information.`
`The "additionalFiles" option is deprecated and will be removed. Use the "additionalFiles" build extension instead. See https://trigger.dev/docs/config/config-file#additionalfiles for more information.`
);

config.build ??= {};
Expand All @@ -239,7 +239,7 @@ function validateConfig(config: TriggerConfig, warn = true) {
if (config.additionalPackages && config.additionalPackages.length > 0) {
warn &&
prettyWarning(
`The "additionalPackages" option is deprecated and will be removed. Use the "additionalPackages" build extension instead. See https://trigger.dev/docs/guides/new-build-system-preview#additionalpackages for more information.`
`The "additionalPackages" option is deprecated and will be removed. Use the "additionalPackages" build extension instead. See https://trigger.dev/docs/config/config-file#additionalpackages for more information.`
);

config.build ??= {};
Expand Down Expand Up @@ -275,7 +275,7 @@ function validateConfig(config: TriggerConfig, warn = true) {
if ("resolveEnvVars" in config && typeof config.resolveEnvVars === "function") {
warn &&
prettyWarning(
`The "resolveEnvVars" option is deprecated and will be removed. Use the "syncEnvVars" build extension instead. See https://trigger.dev/docs/guides/new-build-system-preview#resolveenvvars for more information.`
`The "resolveEnvVars" option is deprecated and will be removed. Use the "syncEnvVars" build extension instead. See https://trigger.dev/docs/config/config-file#syncenvvars for more information.`
);

const resolveEnvVarsFn = config.resolveEnvVars as ResolveEnvironmentVariablesFunction;
Expand Down
26 changes: 24 additions & 2 deletions packages/core/src/v3/apiClient/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { RetryOptions } from "../schemas/index.js";
import { calculateNextRetryDelay } from "../utils/retries.js";
import { ApiConnectionError, ApiError, ApiSchemaValidationError } from "./errors.js";

import { Attributes, Span } from "@opentelemetry/api";
import { Attributes, Span, context, propagation } from "@opentelemetry/api";
import { SemanticInternalAttributes } from "../semanticInternalAttributes.js";
import { TriggerTracer } from "../tracer.js";
import { accessoryAttributes } from "../utils/styleAttributes.js";
Expand Down Expand Up @@ -184,9 +184,11 @@ async function _doZodFetch<TResponseBodySchema extends z.ZodTypeAny>(
requestInit?: PromiseOrValue<RequestInit>,
options?: ZodFetchOptions
): Promise<ZodFetchResult<z.output<TResponseBodySchema>>> {
const $requestInit = await requestInit;
let $requestInit = await requestInit;

return traceZodFetch({ url, requestInit: $requestInit, options }, async (span) => {
$requestInit = injectPropagationHeadersIfInWorker($requestInit);

const result = await _doZodFetchWithRetries(schema, url, $requestInit, options);

if (options?.onResponseBody && span) {
Expand Down Expand Up @@ -577,3 +579,23 @@ export function isEmptyObj(obj: Object | null | undefined): boolean {
export function hasOwn(obj: Object, key: string): boolean {
return Object.prototype.hasOwnProperty.call(obj, key);
}

// If the requestInit has a header x-trigger-worker = true, then we will do
// propagation.inject(context.active(), headers);
// and return the new requestInit.
function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestInit | undefined {
const headers = new Headers(requestInit?.headers);

if (headers.get("x-trigger-worker") !== "true") {
return requestInit;
}

const headersObject = Object.fromEntries(headers.entries());

propagation.inject(context.active(), headersObject);

return {
...requestInit,
headers: new Headers(headersObject),
};
}
Comment on lines +586 to +601
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify context propagation by injecting directly into Headers

Instead of converting Headers to an object and back, you can inject directly into the Headers instance. This simplifies the code and avoids potential issues with header case sensitivity.

Apply this diff to inject directly into the Headers object:

function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestInit | undefined {
  const headers = new Headers(requestInit?.headers);

  if (headers.get("x-trigger-worker") !== "true") {
    return requestInit;
  }

- const headersObject = Object.fromEntries(headers.entries());
- propagation.inject(context.active(), headersObject);
+ propagation.inject(context.active(), headers);

  return {
    ...requestInit,
-   headers: new Headers(headersObject),
+   headers,
  };
}

By injecting directly into headers, you leverage the built-in behavior of propagation.inject when working with Headers objects.

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestInit | undefined {
const headers = new Headers(requestInit?.headers);
if (headers.get("x-trigger-worker") !== "true") {
return requestInit;
}
const headersObject = Object.fromEntries(headers.entries());
propagation.inject(context.active(), headersObject);
return {
...requestInit,
headers: new Headers(headersObject),
};
}
function injectPropagationHeadersIfInWorker(requestInit?: RequestInit): RequestInit | undefined {
const headers = new Headers(requestInit?.headers);
if (headers.get("x-trigger-worker") !== "true") {
return requestInit;
}
propagation.inject(context.active(), headers);
return {
...requestInit,
headers,
};
}

2 changes: 0 additions & 2 deletions packages/core/src/v3/apiClient/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { context, propagation } from "@opentelemetry/api";
import { z } from "zod";
import {
AddTagsRequestBody,
Expand Down Expand Up @@ -509,7 +508,6 @@ export class ApiClient {
// Only inject the context if we are inside a task
if (taskContext.isInsideTask) {
headers["x-trigger-worker"] = "true";
propagation.inject(context.active(), headers);

if (spanParentAsLink) {
headers["x-trigger-span-parent-as-link"] = "1";
Expand Down
38 changes: 38 additions & 0 deletions packages/database/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
## @trigger.dev/database

This is the internal database package for the Trigger.dev project. It exports a generated prisma client that can be instantiated with a connection string.

### How to add a new index on a large table

1. Modify the Prisma.schema with a single index change (no other changes, just one index at a time)
2. Create a Prisma migration using `cd packages/database && pnpm run db:migrate:dev --create-only`
3. Modify the SQL file: add IF NOT EXISTS to it and CONCURRENTLY:

```sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS "JobRun_eventId_idx" ON "JobRun" ("eventId");
```

4. Don’t apply the Prisma migration locally yet. This is a good opportunity to test the flow.
5. Manually apply the index to your database, by running the index command.
6. Then locally run `pnpm run db:migrate:deploy`

#### Before deploying

Run the index creation before deploying

```sql
CREATE INDEX CONCURRENTLY IF NOT EXISTS "JobRun_eventId_idx" ON "JobRun" ("eventId");
```

These commands are useful:

```sql
-- creates an index safely, this can take a long time (2 mins maybe)
CREATE INDEX CONCURRENTLY IF NOT EXISTS "JobRun_eventId_idx" ON "JobRun" ("eventId");
-- checks the status of an index
SELECT * FROM pg_stat_progress_create_index WHERE relid = '"JobRun"'::regclass;
-- checks if the index is there
SELECT * FROM pg_indexes WHERE tablename = 'JobRun' AND indexname = 'JobRun_eventId_idx';
```

Now, when you deploy and prisma runs the migration, it will skip the index creation because it already exists. If you don't do this, there will be pain.
Comment on lines +1 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Inconsistency in index naming found in documentation and migrations.

The verification script revealed multiple instances of the unexpected index name "JobRun_eventId_idx" in the following files:

  • packages/database/README.md
  • packages/database/prisma/migrations/20240916155127_added_job_run_event_id_index/migration.sql

To maintain consistency with the PR objectives, please update these occurrences to "TaskRun_rootTaskRunId_idx".

Analysis chain

Overall, excellent documentation for database index management.

This README provides comprehensive and well-structured guidance for managing database indexes in the @trigger.dev/database package. The instructions are clear, follow best practices, and align well with the PR objectives for database optimization.

To ensure consistency throughout the documentation and codebase, please run the following script to verify the correct index name usage:

After running this script, ensure that all occurrences of "JobRun_eventId_idx" are replaced with "TaskRun_rootTaskRunId_idx" to maintain consistency with the PR objectives.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the correct index name is used consistently.

# Test: Search for both index names. Expect: Only occurrences of "TaskRun_rootTaskRunId_idx".
echo "Searching for 'TaskRun_rootTaskRunId_idx' (expected):"
rg --type sql --type md "TaskRun_rootTaskRunId_idx"

echo "\nSearching for 'JobRun_eventId_idx' (unexpected):"
rg --type sql --type md "JobRun_eventId_idx"

Length of output: 1167

Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- CreateIndex
CREATE INDEX CONCURRENTLY IF NOT EXISTS "TaskRun_rootTaskRunId_idx" ON "TaskRun"("rootTaskRunId");
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "TaskRun" ADD COLUMN "parentSpanId" TEXT;
5 changes: 5 additions & 0 deletions packages/database/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -1748,9 +1748,14 @@ model TaskRun {
/// The depth of this task run in the task run hierarchy
depth Int @default(0)

/// The span ID of the "trigger" span in the parent task run
parentSpanId String?

@@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey])
// Finding child runs
@@index([parentTaskRunId])
// Finding ancestor runs
@@index([rootTaskRunId])
// Task activity graph
@@index([projectId, createdAt, taskIdentifier])
//Runs list
Expand Down
Loading