Skip to content

Commit 0df3fd4

Browse files
committed
Add Workflow result interceptor to Client
1 parent 94a364e commit 0df3fd4

File tree

3 files changed

+62
-10
lines changed

3 files changed

+62
-10
lines changed

packages/client/src/interceptors.ts

+18-4
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@
44
* @module
55
*/
66

7-
import { Next, Headers } from '@temporalio/internal-workflow-common';
7+
import { Headers, Next, Workflow, WorkflowResultType } from '@temporalio/internal-workflow-common';
88
import { temporal } from '@temporalio/proto';
9-
import { CompiledWorkflowOptions } from './workflow-options';
109
import {
1110
DescribeWorkflowExecutionResponse,
1211
RequestCancelWorkflowExecutionResponse,
1312
TerminateWorkflowExecutionResponse,
14-
WorkflowExecution
13+
WorkflowExecution,
1514
} from './types';
15+
import { WorkflowResultOptions } from './workflow-client';
16+
import { CompiledWorkflowOptions } from './workflow-options';
1617

1718
export { Next, Headers };
1819

@@ -65,7 +66,13 @@ export interface WorkflowCancelInput {
6566
/** Input for WorkflowClientCallsInterceptor.describe */
6667
export interface WorkflowDescribeInput {
6768
readonly workflowExecution: WorkflowExecution;
68-
readonly firstExecutionRunId?: string;
69+
}
70+
71+
/** Input for WorkflowClientCallsInterceptor.result */
72+
export interface WorkflowResultInput {
73+
readonly workflowExecution: WorkflowExecution;
74+
readonly runIdForResult?: string;
75+
readonly resultOptions?: WorkflowResultOptions;
6976
}
7077

7178
/**
@@ -109,6 +116,13 @@ export interface WorkflowClientCallsInterceptor {
109116
* Intercept a service call to describeWorkflowExecution
110117
*/
111118
describe?: (input: WorkflowDescribeInput, next: Next<this, 'describe'>) => Promise<DescribeWorkflowExecutionResponse>;
119+
/**
120+
* Intercept a call to @{link WorkflowClient.result}
121+
*/
122+
result?: <T extends Workflow>(
123+
input: WorkflowResultInput,
124+
next: Next<this, 'result'>
125+
) => Promise<WorkflowResultType<T>>;
112126
}
113127

114128
interface WorkflowClientCallsInterceptorFactoryInput {

packages/client/src/workflow-client.ts

+20-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import {
4444
WorkflowClientInterceptors,
4545
WorkflowDescribeInput,
4646
WorkflowQueryInput,
47+
WorkflowResultInput,
4748
WorkflowSignalInput,
4849
WorkflowSignalWithStartInput,
4950
WorkflowStartInput,
@@ -727,6 +728,18 @@ export class WorkflowClient {
727728
}
728729
}
729730

731+
/**
732+
* Uses given input to call `WorkflowClient.result`
733+
*
734+
* Used as the final function of the result interceptor chain
735+
*/
736+
protected async _resultWorkflowHandler<T extends Workflow>(
737+
input: WorkflowResultInput
738+
): Promise<WorkflowResultType<T>> {
739+
return this.result(input.workflowExecution.workflowId, input.runIdForResult, input.resultOptions);
740+
// this.rethrowGrpcError(err, input.workflowExecution, 'Failed to describe workflow');
741+
}
742+
730743
/**
731744
* Create a new workflow handle for new or existing Workflow execution
732745
*/
@@ -742,7 +755,13 @@ export class WorkflowClient {
742755
client: this,
743756
workflowId,
744757
async result(): Promise<WorkflowResultType<T>> {
745-
return this.client.result(workflowId, runIdForResult, resultOptions);
758+
const next = this.client._resultWorkflowHandler.bind(this.client);
759+
const fn = interceptors.length ? composeInterceptors(interceptors, 'result', next) : next;
760+
return await fn({
761+
workflowExecution: { workflowId, runId },
762+
runIdForResult,
763+
resultOptions,
764+
});
746765
},
747766
async terminate(reason?: string) {
748767
const next = this.client._terminateWorkflowHandler.bind(this.client);
@@ -766,7 +785,6 @@ export class WorkflowClient {
766785
const fn = interceptors.length ? composeInterceptors(interceptors, 'describe', next) : next;
767786
return await fn({
768787
workflowExecution: { workflowId, runId },
769-
firstExecutionRunId,
770788
});
771789
},
772790
async signal<Args extends any[]>(def: SignalDefinition<Args> | string, ...args: Args): Promise<void> {

packages/test/src/test-interceptors.ts

+24-4
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,21 @@
55
* @module
66
*/
77

8-
import { Connection, WorkflowClient, WorkflowFailedError } from '@temporalio/client';
9-
import { ApplicationFailure, TerminatedFailure, toPayload } from '@temporalio/common';
8+
import {
9+
Connection,
10+
WorkflowClient,
11+
WorkflowClientCallsInterceptor,
12+
WorkflowFailedError,
13+
WorkflowResultInput,
14+
} from '@temporalio/client';
15+
import {
16+
ApplicationFailure,
17+
Next,
18+
TerminatedFailure,
19+
toPayload,
20+
Workflow,
21+
WorkflowResultType,
22+
} from '@temporalio/common';
1023
import { Core, DefaultLogger, Worker } from '@temporalio/worker';
1124
import { defaultPayloadConverter, WorkflowInfo } from '@temporalio/workflow';
1225
import test from 'ava';
@@ -83,6 +96,13 @@ if (RUN_INTEGRATION_TESTS) {
8396
const result: string = (await next(input)) as any;
8497
return [...result].reverse().join('');
8598
},
99+
async result<T extends Workflow>(
100+
input: WorkflowResultInput,
101+
next: Next<WorkflowClientCallsInterceptor, 'result'>
102+
) {
103+
const result = await next(input);
104+
return (result + '-intercepted') as WorkflowResultType<T>;
105+
},
86106
}),
87107
],
88108
},
@@ -99,7 +119,7 @@ if (RUN_INTEGRATION_TESTS) {
99119
wf.query(getSecretQuery).then((result) => t.is(result, '12345')),
100120
]);
101121
const result = await wf.result();
102-
t.is(result, message);
122+
t.is(result, message + '-intercepted');
103123
}
104124
{
105125
const wf = await client.signalWithStart(interceptorExample, {
@@ -109,7 +129,7 @@ if (RUN_INTEGRATION_TESTS) {
109129
signalArgs: ['12345'],
110130
});
111131
const result = await wf.result();
112-
t.is(result, message);
132+
t.is(result, message + '-intercepted');
113133
}
114134
} finally {
115135
worker.shutdown();

0 commit comments

Comments
 (0)