Skip to content

Commit 6fc5426

Browse files
lorensrbergundy
andauthored
fix: Use default data converter for search attributes (#511)
* Use defaultDataConverter for searchAttributes * chore: Get static checking of encoding Failures and Completions (#520) * Get static checking of encoding Failures and Completions * Update tests * Fix encoding * Decode safely, without lodash 😄 * Fix failure codec helpers * Prevent {} interface from being replaced * Move codec types to internal-non-workflow-common Co-authored-by: Roey Berman <[email protected]>
1 parent d1ca03b commit 6fc5426

File tree

10 files changed

+494
-259
lines changed

10 files changed

+494
-259
lines changed

packages/client/src/workflow-client.ts

+3-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { status as grpcStatus } from '@grpc/grpc-js';
22
import {
33
CancelledFailure,
44
DataConverter,
5+
defaultDataConverter,
56
LoadedDataConverter,
67
RetryState,
78
TerminatedFailure,
@@ -606,7 +607,7 @@ export class WorkflowClient {
606607
: undefined,
607608
searchAttributes: options.searchAttributes
608609
? {
609-
indexedFields: await encodeMapToPayloads(this.options.loadedDataConverter, options.searchAttributes),
610+
indexedFields: await encodeMapToPayloads(defaultDataConverter, options.searchAttributes),
610611
}
611612
: undefined,
612613
cronSchedule: options.cronSchedule,
@@ -645,7 +646,7 @@ export class WorkflowClient {
645646
memo: opts.memo ? { fields: await encodeMapToPayloads(this.options.loadedDataConverter, opts.memo) } : undefined,
646647
searchAttributes: opts.searchAttributes
647648
? {
648-
indexedFields: await encodeMapToPayloads(this.options.loadedDataConverter, opts.searchAttributes),
649+
indexedFields: await encodeMapToPayloads(defaultDataConverter, opts.searchAttributes),
649650
}
650651
: undefined,
651652
cronSchedule: opts.cronSchedule,

packages/common/src/converter/data-converter.ts

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { PayloadCodec } from './payload-codec';
2-
import { PayloadConverter } from './payload-converter';
1+
import { defaultPayloadCodec, PayloadCodec } from './payload-codec';
2+
import { defaultPayloadConverter, PayloadConverter } from './payload-converter';
33

44
/**
55
* When your data (arguments and return values) is sent over the wire and stored by Temporal Server, it is encoded in
@@ -46,3 +46,8 @@ export interface LoadedDataConverter {
4646
payloadConverter: PayloadConverter;
4747
payloadCodec: PayloadCodec;
4848
}
49+
50+
export const defaultDataConverter: LoadedDataConverter = {
51+
payloadConverter: defaultPayloadConverter,
52+
payloadCodec: defaultPayloadCodec,
53+
};

packages/common/src/converter/types.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import type * as iface from '@temporalio/proto/lib/coresdk';
2-
import { TextEncoder, TextDecoder } from './encoding';
1+
import type { coresdk } from '@temporalio/proto/lib/coresdk';
2+
import { TextDecoder, TextEncoder } from './encoding';
33

4-
export type Payload = iface.coresdk.common.IPayload;
4+
export type Payload = coresdk.common.IPayload;
55

66
/**
77
* Transform an *ascii* string into a Uint8Array

packages/internal-non-workflow-common/package.json

+1-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@
1414
"license": "MIT",
1515
"dependencies": {
1616
"@temporalio/common": "file:../common",
17-
"@temporalio/internal-workflow-common": "file:../internal-workflow-common",
18-
"lodash": "^4.17.21"
17+
"@temporalio/internal-workflow-common": "file:../internal-workflow-common"
1918
},
2019
"bugs": {
2120
"url": "https://github.com/temporalio/sdk-typescript/issues"

packages/internal-non-workflow-common/src/codec-helpers.ts

+203-79
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,13 @@ import {
1212
toPayload,
1313
toPayloads,
1414
} from '@temporalio/common';
15-
import { clone, setWith } from 'lodash';
15+
16+
import { DecodedPayload, DecodedProtoFailure, EncodedPayload, EncodedProtoFailure } from './codec-types';
17+
18+
export interface TypecheckedPayloadCodec {
19+
encode(payloads: Payload[]): Promise<EncodedPayload[]>;
20+
decode(payloads: Payload[]): Promise<DecodedPayload[]>;
21+
}
1622

1723
/**
1824
* Decode `payloads` and then return {@link fromPayloadsAtIndex}.
@@ -52,13 +58,58 @@ export async function decodeOptionalFailureToOptionalError(
5258
return failure ? failureToError(await decodeFailure(payloadCodec, failure), payloadConverter) : undefined;
5359
}
5460

61+
/** Run {@link PayloadCodec.encode} on `payloads` */
62+
export async function encodeOptional(
63+
codec: PayloadCodec,
64+
payloads: Payload[] | null | undefined
65+
): Promise<EncodedPayload[] | null | undefined> {
66+
if (!payloads) return payloads;
67+
return (await codec.encode(payloads)) as EncodedPayload[];
68+
}
69+
70+
/** Run {@link PayloadCodec.decode} on `payloads` */
71+
export async function decodeOptional(
72+
codec: PayloadCodec,
73+
payloads: Payload[] | null | undefined
74+
): Promise<DecodedPayload[] | null | undefined> {
75+
if (!payloads) return payloads;
76+
return (await codec.decode(payloads)) as DecodedPayload[];
77+
}
78+
79+
async function encodeSingle(codec: PayloadCodec, payload: Payload): Promise<EncodedPayload> {
80+
const encodedPayloads = await codec.encode([payload]);
81+
return encodedPayloads[0] as EncodedPayload;
82+
}
83+
84+
/** Run {@link PayloadCodec.encode} on a single Payload */
85+
export async function encodeOptionalSingle(
86+
codec: PayloadCodec,
87+
payload: Payload | null | undefined
88+
): Promise<EncodedPayload | null | undefined> {
89+
if (!payload) return payload;
90+
return await encodeSingle(codec, payload);
91+
}
92+
93+
async function decodeSingle(codec: PayloadCodec, payload: Payload): Promise<DecodedPayload> {
94+
const decodedPayloads = await codec.decode([payload]);
95+
return decodedPayloads[0] as DecodedPayload;
96+
}
97+
98+
/** Run {@link PayloadCodec.decode} on a single Payload */
99+
export async function decodeOptionalSingle(
100+
codec: PayloadCodec,
101+
payload: Payload | null | undefined
102+
): Promise<DecodedPayload | null | undefined> {
103+
if (!payload) return payload;
104+
return await decodeSingle(codec, payload);
105+
}
106+
55107
/**
56108
* Run {@link PayloadConverter.toPayload} on value, and then encode it.
57109
*/
58110
export async function encodeToPayload(converter: LoadedDataConverter, value: unknown): Promise<Payload> {
59111
const { payloadConverter, payloadCodec } = converter;
60-
const [encoded] = await payloadCodec.encode([toPayload(payloadConverter, value)]);
61-
return encoded;
112+
return await encodeSingle(payloadCodec, toPayload(payloadConverter, value));
62113
}
63114

64115
/**
@@ -76,6 +127,21 @@ export async function encodeToPayloads(
76127
return payloads ? await payloadCodec.encode(payloads) : undefined;
77128
}
78129

130+
/** Run {@link PayloadCodec.encode} on all values in `map` */
131+
export async function encodeMap<K extends string>(
132+
codec: PayloadCodec,
133+
map: Record<K, Payload> | null | undefined
134+
): Promise<Record<K, EncodedPayload> | null | undefined> {
135+
if (!map) return map;
136+
return Object.fromEntries(
137+
await Promise.all(
138+
Object.entries(map).map(async ([k, payload]): Promise<[K, EncodedPayload]> => {
139+
return [k as K, await encodeSingle(codec, payload as Payload)];
140+
})
141+
)
142+
) as Record<K, EncodedPayload>;
143+
}
144+
79145
/**
80146
* Run {@link PayloadConverter.toPayload} and {@link PayloadCodec.encode} on values in `map`.
81147
*/
@@ -107,87 +173,145 @@ export async function encodeErrorToFailure(dataConverter: LoadedDataConverter, e
107173
/**
108174
* Return a new {@link ProtoFailure} with `codec.encode()` run on all the {@link Payload}s.
109175
*/
110-
export async function encodeFailure(codec: PayloadCodec, failure: ProtoFailure): Promise<ProtoFailure> {
111-
const encodedFailure = { ...failure };
112-
if (failure.cause) {
113-
encodedFailure.cause = await encodeFailure(codec, failure.cause);
114-
}
176+
export async function encodeFailure(_codec: PayloadCodec, failure: ProtoFailure): Promise<EncodedProtoFailure> {
177+
const codec = _codec as TypecheckedPayloadCodec;
178+
return {
179+
...failure,
180+
cause: failure.cause ? await encodeFailure(codec, failure.cause) : null,
181+
applicationFailureInfo: failure.applicationFailureInfo
182+
? {
183+
...failure.applicationFailureInfo,
184+
details: failure.applicationFailureInfo.details
185+
? {
186+
payloads: await codec.encode(failure.applicationFailureInfo.details.payloads ?? []),
187+
}
188+
: undefined,
189+
}
190+
: undefined,
191+
timeoutFailureInfo: failure.timeoutFailureInfo
192+
? {
193+
...failure.timeoutFailureInfo,
194+
lastHeartbeatDetails: failure.timeoutFailureInfo.lastHeartbeatDetails
195+
? {
196+
payloads: await codec.encode(failure.timeoutFailureInfo.lastHeartbeatDetails.payloads ?? []),
197+
}
198+
: undefined,
199+
}
200+
: undefined,
201+
canceledFailureInfo: failure.canceledFailureInfo
202+
? {
203+
...failure.canceledFailureInfo,
204+
details: failure.canceledFailureInfo.details
205+
? {
206+
payloads: await codec.encode(failure.canceledFailureInfo.details.payloads ?? []),
207+
}
208+
: undefined,
209+
}
210+
: undefined,
211+
resetWorkflowFailureInfo: failure.resetWorkflowFailureInfo
212+
? {
213+
...failure.resetWorkflowFailureInfo,
214+
lastHeartbeatDetails: failure.resetWorkflowFailureInfo.lastHeartbeatDetails
215+
? {
216+
payloads: await codec.encode(failure.resetWorkflowFailureInfo.lastHeartbeatDetails.payloads ?? []),
217+
}
218+
: undefined,
219+
}
220+
: undefined,
221+
};
222+
}
115223

116-
if (encodedFailure.applicationFailureInfo?.details?.payloads?.length) {
117-
setWith(
118-
encodedFailure,
119-
'applicationFailureInfo.details.payloads',
120-
await codec.encode(encodedFailure.applicationFailureInfo.details.payloads),
121-
clone
122-
);
123-
}
124-
if (encodedFailure.timeoutFailureInfo?.lastHeartbeatDetails?.payloads?.length) {
125-
setWith(
126-
encodedFailure,
127-
'timeoutFailureInfo.lastHeartbeatDetails.payloads',
128-
await codec.encode(encodedFailure.timeoutFailureInfo.lastHeartbeatDetails.payloads),
129-
clone
130-
);
131-
}
132-
if (encodedFailure.canceledFailureInfo?.details?.payloads?.length) {
133-
setWith(
134-
encodedFailure,
135-
'canceledFailureInfo.details.payloads',
136-
await codec.encode(encodedFailure.canceledFailureInfo.details.payloads),
137-
clone
138-
);
139-
}
140-
if (encodedFailure.resetWorkflowFailureInfo?.lastHeartbeatDetails?.payloads?.length) {
141-
setWith(
142-
encodedFailure,
143-
'resetWorkflowFailureInfo.lastHeartbeatDetails.payloads',
144-
await codec.encode(encodedFailure.resetWorkflowFailureInfo.lastHeartbeatDetails.payloads),
145-
clone
146-
);
147-
}
148-
return encodedFailure;
224+
/**
225+
* Return a new {@link ProtoFailure} with `codec.encode()` run on all the {@link Payload}s.
226+
*/
227+
export async function encodeOptionalFailure(
228+
codec: PayloadCodec,
229+
failure: ProtoFailure | null | undefined
230+
): Promise<EncodedProtoFailure | null | undefined> {
231+
if (!failure) return failure;
232+
return await encodeFailure(codec, failure);
233+
}
234+
235+
/**
236+
* Return a new {@link ProtoFailure} with `codec.encode()` run on all the {@link Payload}s.
237+
*/
238+
export async function decodeOptionalFailure(
239+
codec: PayloadCodec,
240+
failure: ProtoFailure | null | undefined
241+
): Promise<DecodedProtoFailure | null | undefined> {
242+
if (!failure) return failure;
243+
return await decodeFailure(codec, failure);
149244
}
150245

151246
/**
152247
* Return a new {@link ProtoFailure} with `codec.decode()` run on all the {@link Payload}s.
153248
*/
154-
export async function decodeFailure(codec: PayloadCodec, failure: ProtoFailure): Promise<ProtoFailure> {
155-
const decodedFailure = { ...failure };
156-
if (failure.cause) {
157-
decodedFailure.cause = await decodeFailure(codec, failure.cause);
158-
}
249+
export async function decodeFailure(_codec: PayloadCodec, failure: ProtoFailure): Promise<DecodedProtoFailure> {
250+
const codec = _codec as TypecheckedPayloadCodec;
251+
return {
252+
...failure,
253+
cause: failure.cause ? await decodeFailure(codec, failure.cause) : null,
254+
applicationFailureInfo: failure.applicationFailureInfo
255+
? {
256+
...failure.applicationFailureInfo,
257+
details: failure.applicationFailureInfo.details
258+
? {
259+
payloads: await codec.decode(failure.applicationFailureInfo.details.payloads ?? []),
260+
}
261+
: undefined,
262+
}
263+
: undefined,
264+
timeoutFailureInfo: failure.timeoutFailureInfo
265+
? {
266+
...failure.timeoutFailureInfo,
267+
lastHeartbeatDetails: failure.timeoutFailureInfo.lastHeartbeatDetails
268+
? {
269+
payloads: await codec.decode(failure.timeoutFailureInfo.lastHeartbeatDetails.payloads ?? []),
270+
}
271+
: undefined,
272+
}
273+
: undefined,
274+
canceledFailureInfo: failure.canceledFailureInfo
275+
? {
276+
...failure.canceledFailureInfo,
277+
details: failure.canceledFailureInfo.details
278+
? {
279+
payloads: await codec.decode(failure.canceledFailureInfo.details.payloads ?? []),
280+
}
281+
: undefined,
282+
}
283+
: undefined,
284+
resetWorkflowFailureInfo: failure.resetWorkflowFailureInfo
285+
? {
286+
...failure.resetWorkflowFailureInfo,
287+
lastHeartbeatDetails: failure.resetWorkflowFailureInfo.lastHeartbeatDetails
288+
? {
289+
payloads: await codec.decode(failure.resetWorkflowFailureInfo.lastHeartbeatDetails.payloads ?? []),
290+
}
291+
: undefined,
292+
}
293+
: undefined,
294+
};
295+
}
159296

160-
if (decodedFailure.applicationFailureInfo?.details?.payloads?.length) {
161-
setWith(
162-
decodedFailure,
163-
'applicationFailureInfo.details.payloads',
164-
await codec.decode(decodedFailure.applicationFailureInfo.details.payloads),
165-
clone
166-
);
167-
}
168-
if (decodedFailure.timeoutFailureInfo?.lastHeartbeatDetails?.payloads?.length) {
169-
setWith(
170-
decodedFailure,
171-
'timeoutFailureInfo.lastHeartbeatDetails.payloads',
172-
await codec.decode(decodedFailure.timeoutFailureInfo.lastHeartbeatDetails.payloads),
173-
clone
174-
);
175-
}
176-
if (decodedFailure.canceledFailureInfo?.details?.payloads?.length) {
177-
setWith(
178-
decodedFailure,
179-
'canceledFailureInfo.details.payloads',
180-
await codec.decode(decodedFailure.canceledFailureInfo.details.payloads),
181-
clone
182-
);
183-
}
184-
if (decodedFailure.resetWorkflowFailureInfo?.lastHeartbeatDetails?.payloads?.length) {
185-
setWith(
186-
decodedFailure,
187-
'resetWorkflowFailureInfo.lastHeartbeatDetails.payloads',
188-
await codec.decode(decodedFailure.resetWorkflowFailureInfo.lastHeartbeatDetails.payloads),
189-
clone
190-
);
191-
}
192-
return decodedFailure;
297+
/**
298+
* Mark all values in the map as encoded.
299+
* Use this for headers and searchAttributes, which we don't encode.
300+
*/
301+
export function noopEncodeMap<K extends string>(
302+
map: Record<K, Payload> | null | undefined
303+
): Record<K, EncodedPayload> | null | undefined {
304+
if (!map) return map;
305+
return map as Record<K, EncodedPayload>;
306+
}
307+
308+
/**
309+
* Mark all values in the map as decoded.
310+
* Use this for headers and searchAttributes, which we don't encode.
311+
*/
312+
export function noopDecodeMap<K extends string>(
313+
map: Record<K, Payload> | null | undefined
314+
): Record<K, DecodedPayload> | null | undefined {
315+
if (!map) return map;
316+
return map as Record<K, DecodedPayload>;
193317
}

0 commit comments

Comments
 (0)