From 1567e1b79c5dc81f70c062424161e31b41294cef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Fri, 11 Mar 2022 19:41:15 -0500 Subject: [PATCH 1/3] feat(client)!: Return a friendly type from handle.describe() --- packages/client/src/types.ts | 16 +++++ packages/client/src/workflow-client.ts | 23 ++++++- packages/internal-workflow-common/src/time.ts | 11 ++++ packages/test/src/integration-tests.ts | 63 +++++++++---------- 4 files changed, 78 insertions(+), 35 deletions(-) diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index cc8c4e31c..7def01aa8 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -10,3 +10,19 @@ export type DescribeWorkflowExecutionResponse = temporal.api.workflowservice.v1. export type TerminateWorkflowExecutionResponse = temporal.api.workflowservice.v1.ITerminateWorkflowExecutionResponse; export type RequestCancelWorkflowExecutionResponse = temporal.api.workflowservice.v1.IRequestCancelWorkflowExecutionResponse; + +export interface WorkflowExecutionDescription { + type: string; + workflowId: string; + runId?: string; + taskQueue: string; + status: temporal.api.enums.v1.WorkflowExecutionStatus; + historyLength: Long; + startTime: Date; + executionTime?: Date; + closeTime?: Date; + memo?: temporal.api.common.v1.IMemo; + searchAttributes?: temporal.api.common.v1.ISearchAttributes; + parentExecution?: temporal.api.common.v1.IWorkflowExecution; + raw: DescribeWorkflowExecutionResponse; +} diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 842cba2a0..c75c6498b 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -21,8 +21,10 @@ import { BaseWorkflowHandle, compileRetryPolicy, composeInterceptors, + optionalTsToDate, QueryDefinition, SignalDefinition, + tsToDate, WithWorkflowArgs, Workflow, WorkflowNotFoundError, @@ -57,6 +59,7 @@ import { StartWorkflowExecutionRequest, TerminateWorkflowExecutionResponse, WorkflowExecution, + WorkflowExecutionDescription, } from './types'; import { compileWorkflowOptions, WorkflowOptions, WorkflowSignalWithStartOptions } from './workflow-options'; @@ -113,7 +116,7 @@ export interface WorkflowHandle extends BaseWorkf /** * Describe the current workflow execution */ - describe(): Promise; + describe(): Promise; /** * Readonly accessor to the underlying WorkflowClient @@ -765,9 +768,25 @@ export class WorkflowClient { async describe() { const next = this.client._describeWorkflowHandler.bind(this.client); const fn = interceptors.length ? composeInterceptors(interceptors, 'describe', next) : next; - return await fn({ + const raw = await fn({ workflowExecution: { workflowId, runId }, }); + return { + /* eslint-disable @typescript-eslint/no-non-null-assertion */ + type: raw.workflowExecutionInfo!.type!.name!, + workflowId: raw.workflowExecutionInfo!.execution!.workflowId!, + runId: raw.workflowExecutionInfo!.execution!.runId!, + taskQueue: raw.workflowExecutionInfo!.taskQueue!, + status: raw.workflowExecutionInfo!.status!, + historyLength: raw.workflowExecutionInfo!.historyLength!, + startTime: tsToDate(raw.workflowExecutionInfo!.startTime!), + executionTime: optionalTsToDate(raw.workflowExecutionInfo!.executionTime), + closeTime: optionalTsToDate(raw.workflowExecutionInfo!.closeTime), + memo: raw.workflowExecutionInfo!.memo ?? undefined, + searchAttributes: raw.workflowExecutionInfo!.searchAttributes ?? undefined, + parentExecution: raw.workflowExecutionInfo!.parentExecution ?? undefined, + raw, + }; }, async signal(def: SignalDefinition | string, ...args: Args): Promise { const next = this.client._signalWorkflowHandler.bind(this.client); diff --git a/packages/internal-workflow-common/src/time.ts b/packages/internal-workflow-common/src/time.ts index 82f3eafe4..c8b147f9f 100644 --- a/packages/internal-workflow-common/src/time.ts +++ b/packages/internal-workflow-common/src/time.ts @@ -74,3 +74,14 @@ export function msToNumber(val: string | number): number { } return ms(val); } + +export function tsToDate(ts: Timestamp): Date { + return new Date(tsToMs(ts)); +} + +export function optionalTsToDate(ts: Timestamp | null | undefined): Date | undefined { + if (ts === undefined || ts === null) { + return undefined; + } + return new Date(tsToMs(ts)); +} diff --git a/packages/test/src/integration-tests.ts b/packages/test/src/integration-tests.ts index 43b6ddd8d..80dbb70fc 100644 --- a/packages/test/src/integration-tests.ts +++ b/packages/test/src/integration-tests.ts @@ -547,22 +547,25 @@ export function runIntegrationTests(codec?: PayloadCodec): void { }); await workflow.result(); const execution = await workflow.describe(); - t.deepEqual( - execution.workflowExecutionInfo?.type, - new iface.temporal.api.common.v1.WorkflowType({ name: 'argsAndReturn' }) - ); - t.deepEqual(execution.workflowExecutionInfo?.memo, new iface.temporal.api.common.v1.Memo({ fields: {} })); - t.deepEqual(Object.keys(execution.workflowExecutionInfo!.searchAttributes!.indexedFields!), ['BinaryChecksums']); + t.deepEqual(execution.type, 'argsAndReturn'); + t.deepEqual(execution.memo, new iface.temporal.api.common.v1.Memo({ fields: {} })); + t.true(execution.startTime instanceof Date); + t.deepEqual(Object.keys(execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!), [ + 'BinaryChecksums', + ]); const checksums = defaultPayloadConverter.fromPayload( - execution.workflowExecutionInfo!.searchAttributes!.indexedFields!.BinaryChecksums! + execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.BinaryChecksums! ); t.true(checksums instanceof Array && checksums.length === 1); t.regex((checksums as string[])[0], /@temporalio\/worker@\d+\.\d+\.\d+/); - t.is(execution.executionConfig?.taskQueue?.name, 'test'); - t.is(execution.executionConfig?.taskQueue?.kind, iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL); - t.is(execution.executionConfig?.workflowRunTimeout, null); - t.is(execution.executionConfig?.workflowExecutionTimeout, null); + t.is(execution.raw.executionConfig?.taskQueue?.name, 'test'); + t.is( + execution.raw.executionConfig?.taskQueue?.kind, + iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL + ); + t.is(execution.raw.executionConfig?.workflowRunTimeout, null); + t.is(execution.raw.executionConfig?.workflowExecutionTimeout, null); }); test('WorkflowOptions are passed correctly', async (t) => { @@ -584,22 +587,23 @@ export function runIntegrationTests(codec?: PayloadCodec): void { }); const execution = await workflow.describe(); t.deepEqual( - execution.workflowExecutionInfo?.type, + execution.raw.workflowExecutionInfo?.type, new iface.temporal.api.common.v1.WorkflowType({ name: 'sleeper' }) ); - t.deepEqual(await fromPayload(execution.workflowExecutionInfo!.memo!.fields!.a!), 'b'); + t.deepEqual(await fromPayload(execution.raw.workflowExecutionInfo!.memo!.fields!.a!), 'b'); t.deepEqual( - await defaultPayloadConverter.fromPayload( - execution.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField! - ), + await fromPayload(execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField!), 3 ); - t.is(execution.executionConfig?.taskQueue?.name, 'test2'); - t.is(execution.executionConfig?.taskQueue?.kind, iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL); + t.is(execution.raw.executionConfig?.taskQueue?.name, 'test2'); + t.is( + execution.raw.executionConfig?.taskQueue?.kind, + iface.temporal.api.enums.v1.TaskQueueKind.TASK_QUEUE_KIND_NORMAL + ); - t.is(tsToMs(execution.executionConfig!.workflowRunTimeout!), ms(options.workflowRunTimeout)); - t.is(tsToMs(execution.executionConfig!.workflowExecutionTimeout!), ms(options.workflowExecutionTimeout)); - t.is(tsToMs(execution.executionConfig!.defaultWorkflowTaskTimeout!), ms(options.workflowTaskTimeout)); + t.is(tsToMs(execution.raw.executionConfig!.workflowRunTimeout!), ms(options.workflowRunTimeout)); + t.is(tsToMs(execution.raw.executionConfig!.workflowExecutionTimeout!), ms(options.workflowExecutionTimeout)); + t.is(tsToMs(execution.raw.executionConfig!.defaultWorkflowTaskTimeout!), ms(options.workflowTaskTimeout)); }); test('WorkflowHandle.result() throws if terminated', async (t) => { @@ -667,7 +671,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void { }); await workflow.result(); const info = await workflow.describe(); - t.is(info.workflowExecutionInfo?.type?.name, 'sleeper'); + t.is(info.raw.workflowExecutionInfo?.type?.name, 'sleeper'); const { history } = await client.service.getWorkflowExecutionHistory({ namespace, execution: { workflowId: workflow.workflowId, runId: err.newExecutionRunId }, @@ -753,14 +757,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void { return; } t.is(failure.message, 'unhandled rejection'); - t.true( - failure.stackTrace?.includes( - dedent` - Error: unhandled rejection - at eval (webpack-internal:///./lib/workflows/unhandled-rejection.js - ` - ) - ); + t.true(failure.stackTrace?.includes(`Error: unhandled rejection`)); t.is(failure.cause?.message, 'root failure'); }, { minTimeout: 300, factor: 1, retries: 100 } @@ -815,8 +812,8 @@ export function runIntegrationTests(codec?: PayloadCodec): void { }); await t.throwsAsync(handle.result()); const handleForSecondAttempt = client.getHandle(workflowId); - const { workflowExecutionInfo } = await handleForSecondAttempt.describe(); - t.not(workflowExecutionInfo?.execution?.runId, handle.originalRunId); + const { raw } = await handleForSecondAttempt.describe(); + t.not(raw.workflowExecutionInfo?.execution?.runId, handle.originalRunId); }); test('Workflow RetryPolicy ignored with nonRetryable failure', async (t) => { @@ -835,7 +832,7 @@ export function runIntegrationTests(codec?: PayloadCodec): void { await t.throwsAsync(handle.result()); const res = await handle.describe(); t.is( - res.workflowExecutionInfo?.status, + res.raw.workflowExecutionInfo?.status, iface.temporal.api.enums.v1.WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_FAILED ); }); From 95dd75e10597497b71b47a820a63b8be26ccf701 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Sat, 12 Mar 2022 11:40:40 -0500 Subject: [PATCH 2/3] Address comments --- packages/client/src/types.ts | 8 +-- packages/client/src/workflow-client.ts | 18 +++++-- .../src/codec-helpers.ts | 53 ++++++++++++++----- packages/test/src/integration-tests.ts | 24 ++++++++- 4 files changed, 82 insertions(+), 21 deletions(-) diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index 7def01aa8..546d2cc85 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -14,15 +14,15 @@ export type RequestCancelWorkflowExecutionResponse = export interface WorkflowExecutionDescription { type: string; workflowId: string; - runId?: string; + runId: string; taskQueue: string; status: temporal.api.enums.v1.WorkflowExecutionStatus; historyLength: Long; startTime: Date; executionTime?: Date; closeTime?: Date; - memo?: temporal.api.common.v1.IMemo; - searchAttributes?: temporal.api.common.v1.ISearchAttributes; - parentExecution?: temporal.api.common.v1.IWorkflowExecution; + memo?: Record; + searchAttributes?: Record; + parentExecution?: Required; raw: DescribeWorkflowExecutionResponse; } diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index c75c6498b..1f0a3ed9a 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -12,6 +12,7 @@ import { import { decodeArrayFromPayloads, decodeFromPayloadsAtIndex, + decodeMapFromPayloads, decodeOptionalFailureToOptionalError, encodeMapToPayloads, encodeToPayloads, @@ -782,9 +783,20 @@ export class WorkflowClient { startTime: tsToDate(raw.workflowExecutionInfo!.startTime!), executionTime: optionalTsToDate(raw.workflowExecutionInfo!.executionTime), closeTime: optionalTsToDate(raw.workflowExecutionInfo!.closeTime), - memo: raw.workflowExecutionInfo!.memo ?? undefined, - searchAttributes: raw.workflowExecutionInfo!.searchAttributes ?? undefined, - parentExecution: raw.workflowExecutionInfo!.parentExecution ?? undefined, + memo: await decodeMapFromPayloads( + this.client.options.loadedDataConverter, + raw.workflowExecutionInfo!.memo?.fields + ), + searchAttributes: await decodeMapFromPayloads( + defaultDataConverter, + raw.workflowExecutionInfo!.searchAttributes?.indexedFields + ), + parentExecution: raw.workflowExecutionInfo!.parentExecution + ? { + workflowId: raw.workflowExecutionInfo!.parentExecution!.workflowId!, + runId: raw.workflowExecutionInfo!.parentExecution!.runId!, + } + : undefined, raw, }; }, diff --git a/packages/internal-non-workflow-common/src/codec-helpers.ts b/packages/internal-non-workflow-common/src/codec-helpers.ts index 12a013393..52292fa22 100644 --- a/packages/internal-non-workflow-common/src/codec-helpers.ts +++ b/packages/internal-non-workflow-common/src/codec-helpers.ts @@ -12,7 +12,6 @@ import { toPayload, toPayloads, } from '@temporalio/common'; - import { DecodedPayload, DecodedProtoFailure, EncodedPayload, EncodedProtoFailure } from './codec-types'; export interface TypecheckedPayloadCodec { @@ -63,7 +62,8 @@ export async function encodeOptional( codec: PayloadCodec, payloads: Payload[] | null | undefined ): Promise { - if (!payloads) return payloads; + if (payloads === null) return null; + if (payloads === undefined) return undefined; return (await codec.encode(payloads)) as EncodedPayload[]; } @@ -72,7 +72,8 @@ export async function decodeOptional( codec: PayloadCodec, payloads: Payload[] | null | undefined ): Promise { - if (!payloads) return payloads; + if (payloads === null) return null; + if (payloads === undefined) return undefined; return (await codec.decode(payloads)) as DecodedPayload[]; } @@ -86,7 +87,8 @@ export async function encodeOptionalSingle( codec: PayloadCodec, payload: Payload | null | undefined ): Promise { - if (!payload) return payload; + if (payload === null) return null; + if (payload === undefined) return undefined; return await encodeSingle(codec, payload); } @@ -100,7 +102,9 @@ export async function decodeOptionalSingle( codec: PayloadCodec, payload: Payload | null | undefined ): Promise { - if (!payload) return payload; + if (payload === null) return null; + if (payload === undefined) return undefined; + return await decodeSingle(codec, payload); } @@ -127,12 +131,33 @@ export async function encodeToPayloads( return payloads ? await payloadCodec.encode(payloads) : undefined; } +/** + * Run {@link PayloadCodec.decode} and then {@link PayloadConverter.fromPayload} on values in `map`. + */ +export async function decodeMapFromPayloads( + converter: LoadedDataConverter, + map: Record | null | undefined +): Promise | undefined> { + if (!map) return undefined; + const { payloadConverter, payloadCodec } = converter; + return Object.fromEntries( + await Promise.all( + Object.entries(map).map(async ([k, payload]): Promise<[K, unknown]> => { + const [decodedPayload] = await payloadCodec.decode([payload as Payload]); + const value = payloadConverter.fromPayload(decodedPayload); + return [k as K, value]; + }) + ) + ) as Record; +} + /** Run {@link PayloadCodec.encode} on all values in `map` */ export async function encodeMap( codec: PayloadCodec, map: Record | null | undefined ): Promise | null | undefined> { - if (!map) return map; + if (map === null) return null; + if (map === undefined) return undefined; return Object.fromEntries( await Promise.all( Object.entries(map).map(async ([k, payload]): Promise<[K, EncodedPayload]> => { @@ -143,11 +168,11 @@ export async function encodeMap( } /** - * Run {@link PayloadConverter.toPayload} and {@link PayloadCodec.encode} on values in `map`. + * Run {@link PayloadConverter.toPayload} and then {@link PayloadCodec.encode} on values in `map`. */ export async function encodeMapToPayloads( converter: LoadedDataConverter, - map: Record + map: Record ): Promise> { const { payloadConverter, payloadCodec } = converter; return Object.fromEntries( @@ -228,7 +253,8 @@ export async function encodeOptionalFailure( codec: PayloadCodec, failure: ProtoFailure | null | undefined ): Promise { - if (!failure) return failure; + if (failure === null) return null; + if (failure === undefined) return undefined; return await encodeFailure(codec, failure); } @@ -239,7 +265,8 @@ export async function decodeOptionalFailure( codec: PayloadCodec, failure: ProtoFailure | null | undefined ): Promise { - if (!failure) return failure; + if (failure === null) return null; + if (failure === undefined) return undefined; return await decodeFailure(codec, failure); } @@ -301,7 +328,8 @@ export async function decodeFailure(_codec: PayloadCodec, failure: ProtoFailure) export function noopEncodeMap( map: Record | null | undefined ): Record | null | undefined { - if (!map) return map; + if (map === null) return null; + if (map === undefined) return undefined; return map as Record; } @@ -312,6 +340,7 @@ export function noopEncodeMap( export function noopDecodeMap( map: Record | null | undefined ): Record | null | undefined { - if (!map) return map; + if (map === null) return null; + if (map === undefined) return undefined; return map as Record; } diff --git a/packages/test/src/integration-tests.ts b/packages/test/src/integration-tests.ts index 80dbb70fc..3df3840b3 100644 --- a/packages/test/src/integration-tests.ts +++ b/packages/test/src/integration-tests.ts @@ -538,18 +538,38 @@ export function runIntegrationTests(codec?: PayloadCodec): void { t.regex(event.workflowTaskCompletedEventAttributes!.binaryChecksum!, /@temporalio\/worker@\d+\.\d+\.\d+/); }); - test('WorkflowOptions are passed correctly with defaults', async (t) => { + test('WorkflowHandle.describe result is wrapped', async (t) => { const { client } = t.context; const workflow = await client.start(workflows.argsAndReturn, { args: ['hey', undefined, Buffer.from('def')], taskQueue: 'test', workflowId: uuid4(), + searchAttributes: { + CustomKeywordField: 'test-value', + }, + memo: { + note: 'foo', + }, }); await workflow.result(); const execution = await workflow.describe(); t.deepEqual(execution.type, 'argsAndReturn'); - t.deepEqual(execution.memo, new iface.temporal.api.common.v1.Memo({ fields: {} })); + t.deepEqual(execution.memo, { note: 'foo' }); t.true(execution.startTime instanceof Date); + t.is(execution.searchAttributes!.CustomKeywordField, 'test-value'); + t.regex((execution.searchAttributes!.BinaryChecksums as string[])[0], /@temporalio\/worker@/); + }); + + test('WorkflowOptions are passed correctly with defaults', async (t) => { + const { client } = t.context; + const workflow = await client.start(workflows.argsAndReturn, { + args: ['hey', undefined, Buffer.from('def')], + taskQueue: 'test', + workflowId: uuid4(), + }); + await workflow.result(); + const execution = await workflow.describe(); + t.deepEqual(execution.type, 'argsAndReturn'); t.deepEqual(Object.keys(execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!), [ 'BinaryChecksums', ]); From f2e63b8887b7da82e06be2765974ec216bae4f1f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Loren=20=F0=9F=A4=93?= Date: Sat, 12 Mar 2022 12:12:52 -0500 Subject: [PATCH 3/3] Fix tests --- packages/test/src/integration-tests.ts | 4 +++- packages/test/src/test-integration-codec.ts | 20 ++++++++------------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/packages/test/src/integration-tests.ts b/packages/test/src/integration-tests.ts index 3df3840b3..8fadf5f52 100644 --- a/packages/test/src/integration-tests.ts +++ b/packages/test/src/integration-tests.ts @@ -612,7 +612,9 @@ export function runIntegrationTests(codec?: PayloadCodec): void { ); t.deepEqual(await fromPayload(execution.raw.workflowExecutionInfo!.memo!.fields!.a!), 'b'); t.deepEqual( - await fromPayload(execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField!), + await defaultPayloadConverter.fromPayload( + execution.raw.workflowExecutionInfo!.searchAttributes!.indexedFields!.CustomIntField! + ), 3 ); t.is(execution.raw.executionConfig?.taskQueue?.name, 'test2'); diff --git a/packages/test/src/test-integration-codec.ts b/packages/test/src/test-integration-codec.ts index 0b19574fb..9315656e2 100644 --- a/packages/test/src/test-integration-codec.ts +++ b/packages/test/src/test-integration-codec.ts @@ -4,21 +4,17 @@ import { runIntegrationTests } from './integration-tests'; class TestPayloadCodec implements PayloadCodec { async encode(payloads: Payload[]): Promise { - return payloads.map((payload) => { - if (payload.data) { - payload.data = payload.data.map((byte) => byte + 1); - } - return payload; - }); + return payloads.map((payload) => ({ + ...payload, + data: payload.data?.map((byte) => byte + 1), + })); } async decode(payloads: Payload[]): Promise { - return payloads.map((payload) => { - if (payload.data) { - payload.data = payload.data.map((byte) => byte - 1); - } - return payload; - }); + return payloads.map((payload) => ({ + ...payload, + data: payload.data?.map((byte) => byte - 1), + })); } }