Skip to content

Commit df6d203

Browse files
authored
chore: use EventStreamCodec instead of EventStreamMarshaller (#3750)
1 parent cbe8126 commit df6d203

9 files changed

+40
-35
lines changed

packages/eventstream-handler-node/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
},
2020
"license": "Apache-2.0",
2121
"dependencies": {
22-
"@aws-sdk/eventstream-marshaller": "*",
22+
"@aws-sdk/eventstream-codec": "*",
2323
"@aws-sdk/types": "*",
2424
"tslib": "^2.3.1"
2525
},

packages/eventstream-handler-node/src/EventSigningStream.spec.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
1-
import { EventStreamMarshaller } from "@aws-sdk/eventstream-marshaller";
1+
import { EventStreamCodec } from "@aws-sdk/eventstream-codec";
22
import { Message, MessageHeaders } from "@aws-sdk/types";
33
import { fromUtf8, toUtf8 } from "@aws-sdk/util-utf8-node";
44

55
import { EventSigningStream } from "./EventSigningStream";
66

77
describe("EventSigningStream", () => {
88
const originalDate = Date;
9+
910
afterEach(() => {
1011
Date = originalDate;
1112
});
13+
1214
it("should sign a eventstream payload properly", (done) => {
13-
const marshaller = new EventStreamMarshaller(toUtf8, fromUtf8);
15+
const eventStreamCodec = new EventStreamCodec(toUtf8, fromUtf8);
1416
const inputChunks: Array<Uint8Array> = (
1517
[
1618
{
@@ -22,7 +24,7 @@ describe("EventSigningStream", () => {
2224
body: fromUtf8("bar"),
2325
},
2426
] as Array<Message>
25-
).map((event) => marshaller.marshall(event));
27+
).map((event) => eventStreamCodec.encode(event));
2628
const expected: Array<MessageHeaders> = [
2729
{
2830
":date": { type: "timestamp", value: new Date(1546045446000) },
@@ -57,11 +59,11 @@ describe("EventSigningStream", () => {
5759
const signingStream = new EventSigningStream({
5860
priorSignature: "initial",
5961
eventSigner: { sign: mockEventSigner },
60-
eventMarshaller: marshaller,
62+
eventStreamCodec,
6163
});
6264
const output: Array<MessageHeaders> = [];
6365
signingStream.on("data", (chunk) => {
64-
output.push(marshaller.unmarshall(chunk).headers);
66+
output.push(eventStreamCodec.decode(chunk).headers);
6567
});
6668
signingStream.on("end", () => {
6769
expect(output).toEqual(expected);

packages/eventstream-handler-node/src/EventSigningStream.ts

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller";
1+
import { EventStreamCodec } from "@aws-sdk/eventstream-codec";
22
import { EventSigner, MessageHeaders } from "@aws-sdk/types";
33
import { Transform, TransformCallback, TransformOptions } from "stream";
44

55
export interface EventSigningStreamOptions extends TransformOptions {
66
priorSignature: string;
77
eventSigner: EventSigner;
8-
eventMarshaller: EventMarshaller;
8+
eventStreamCodec: EventStreamCodec;
99
}
1010

1111
/**
@@ -14,16 +14,19 @@ export interface EventSigningStreamOptions extends TransformOptions {
1414
export class EventSigningStream extends Transform {
1515
private priorSignature: string;
1616
private eventSigner: EventSigner;
17-
private eventMarshaller: EventMarshaller;
17+
private eventStreamCodec: EventStreamCodec;
18+
1819
constructor(options: EventSigningStreamOptions) {
1920
super({
2021
readableObjectMode: true,
2122
writableObjectMode: true,
2223
...options,
2324
});
25+
2426
this.priorSignature = options.priorSignature;
2527
this.eventSigner = options.eventSigner;
26-
this.eventMarshaller = options.eventMarshaller;
28+
this.eventStreamCodec = options.eventStreamCodec;
29+
2730
//TODO: use 'autoDestroy' when targeting Node 11
2831
//reference: https://nodejs.org/dist/latest-v13.x/docs/api/stream.html#stream_new_stream_readable_options
2932
this.on("error", () => {
@@ -43,15 +46,15 @@ export class EventSigningStream extends Transform {
4346
const signature = await this.eventSigner.sign(
4447
{
4548
payload: chunk,
46-
headers: this.eventMarshaller.formatHeaders(dateHeader),
49+
headers: this.eventStreamCodec.formatHeaders(dateHeader),
4750
},
4851
{
4952
priorSignature: this.priorSignature,
5053
signingDate: now,
5154
}
5255
);
5356
this.priorSignature = signature;
54-
const serializedSigned = this.eventMarshaller.marshall({
57+
const serializedSigned = this.eventStreamCodec.encode({
5558
headers: {
5659
...dateHeader,
5760
":chunk-signature": {

packages/eventstream-handler-node/src/EventStreamPayloadHandler.spec.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
1-
import { EventStreamMarshaller } from "@aws-sdk/eventstream-marshaller";
1+
import { EventStreamCodec } from "@aws-sdk/eventstream-codec";
22
import { Decoder, Encoder, EventSigner, FinalizeHandler, FinalizeHandlerArguments, HttpRequest } from "@aws-sdk/types";
3-
import { once } from "events";
43
import { PassThrough, Readable } from "stream";
54

65
import { EventSigningStream } from "./EventSigningStream";
76
import { EventStreamPayloadHandler } from "./EventStreamPayloadHandler";
87

98
jest.mock("./EventSigningStream");
10-
jest.mock("@aws-sdk/eventstream-marshaller");
9+
jest.mock("@aws-sdk/eventstream-codec");
1110

1211
describe(EventStreamPayloadHandler.name, () => {
1312
const mockSigner: EventSigner = {
@@ -19,7 +18,7 @@ describe(EventStreamPayloadHandler.name, () => {
1918

2019
beforeEach(() => {
2120
(EventSigningStream as unknown as jest.Mock).mockImplementation(() => new PassThrough());
22-
(EventStreamMarshaller as jest.Mock).mockImplementation(() => {});
21+
(EventStreamCodec as jest.Mock).mockImplementation(() => {});
2322
});
2423

2524
afterEach(() => {
@@ -88,7 +87,7 @@ describe(EventStreamPayloadHandler.name, () => {
8887
expect(EventSigningStream).toHaveBeenCalledTimes(1);
8988
expect(EventSigningStream).toHaveBeenCalledWith({
9089
priorSignature,
91-
eventMarshaller: expect.anything(),
90+
eventStreamCodec: expect.anything(),
9291
eventSigner: expect.anything(),
9392
});
9493
});

packages/eventstream-handler-node/src/EventStreamPayloadHandler.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller";
1+
import { EventStreamCodec } from "@aws-sdk/eventstream-codec";
22
import {
33
Decoder,
44
Encoder,
@@ -31,11 +31,11 @@ export interface EventStreamPayloadHandlerOptions {
3131
*/
3232
export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
3333
private readonly eventSigner: Provider<EventSigner>;
34-
private readonly eventMarshaller: EventMarshaller;
34+
private readonly eventStreamCodec: EventStreamCodec;
3535

3636
constructor(options: EventStreamPayloadHandlerOptions) {
3737
this.eventSigner = options.eventSigner;
38-
this.eventMarshaller = new EventMarshaller(options.utf8Encoder, options.utf8Decoder);
38+
this.eventStreamCodec = new EventStreamCodec(options.utf8Encoder, options.utf8Decoder);
3939
}
4040

4141
async handle<T extends MetadataBearer>(
@@ -72,7 +72,7 @@ export class EventStreamPayloadHandler implements IEventStreamPayloadHandler {
7272
const priorSignature = (match || [])[1];
7373
const signingStream = new EventSigningStream({
7474
priorSignature,
75-
eventMarshaller: this.eventMarshaller,
75+
eventStreamCodec: this.eventStreamCodec,
7676
eventSigner: await this.eventSigner(),
7777
});
7878

packages/eventstream-serde-universal/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
},
2020
"license": "Apache-2.0",
2121
"dependencies": {
22-
"@aws-sdk/eventstream-marshaller": "*",
22+
"@aws-sdk/eventstream-codec": "*",
2323
"@aws-sdk/types": "*",
2424
"tslib": "^2.3.1"
2525
},

packages/eventstream-serde-universal/src/EventStreamMarshaller.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller";
1+
import { EventStreamCodec } from "@aws-sdk/eventstream-codec";
22
import { Decoder, Encoder, EventStreamMarshaller as IEventStreamMarshaller, Message } from "@aws-sdk/types";
33

44
import { getChunkedStream } from "./getChunkedStream";
@@ -12,10 +12,11 @@ export interface EventStreamMarshallerOptions {
1212
}
1313

1414
export class EventStreamMarshaller {
15-
private readonly eventMarshaller: EventMarshaller;
15+
private readonly eventStreamCodec: EventStreamCodec;
1616
private readonly utfEncoder: Encoder;
17+
1718
constructor({ utf8Encoder, utf8Decoder }: EventStreamMarshallerOptions) {
18-
this.eventMarshaller = new EventMarshaller(utf8Encoder, utf8Decoder);
19+
this.eventStreamCodec = new EventStreamCodec(utf8Encoder, utf8Decoder);
1920
this.utfEncoder = utf8Encoder;
2021
}
2122

@@ -25,7 +26,7 @@ export class EventStreamMarshaller {
2526
): AsyncIterable<T> {
2627
const chunkedStream = getChunkedStream(body);
2728
const unmarshalledStream = getUnmarshalledStream(chunkedStream, {
28-
eventMarshaller: this.eventMarshaller,
29+
eventStreamCodec: this.eventStreamCodec,
2930
deserializer,
3031
toUtf8: this.utfEncoder,
3132
});
@@ -37,7 +38,7 @@ export class EventStreamMarshaller {
3738
const self = this;
3839
const serializedIterator = async function* () {
3940
for await (const chunk of input) {
40-
const payloadBuf = self.eventMarshaller.marshall(serializer(chunk));
41+
const payloadBuf = self.eventStreamCodec.encode(serializer(chunk));
4142
yield payloadBuf;
4243
}
4344
// Ending frame

packages/eventstream-serde-universal/src/getUnmarshalledStream.spec.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { EventStreamMarshaller } from "@aws-sdk/eventstream-marshaller";
1+
import { EventStreamCodec } from "@aws-sdk/eventstream-codec";
22
import { Message } from "@aws-sdk/types";
33
import { fromUtf8, toUtf8 } from "@aws-sdk/util-utf8-node";
44

@@ -53,7 +53,7 @@ describe("getUnmarshalledStream", () => {
5353
yield endEventMessage;
5454
};
5555
const unmarshallerStream = getUnmarshalledStream(source(), {
56-
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8),
56+
eventStreamCodec: new EventStreamCodec(toUtf8, fromUtf8),
5757
deserializer: (message) => Promise.resolve(message),
5858
toUtf8,
5959
});
@@ -73,7 +73,7 @@ describe("getUnmarshalledStream", () => {
7373
},
7474
};
7575
const deserStream = getUnmarshalledStream(source, {
76-
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8),
76+
eventStreamCodec: new EventStreamCodec(toUtf8, fromUtf8),
7777
deserializer: () => {
7878
throw new Error("error event");
7979
},
@@ -100,7 +100,7 @@ describe("getUnmarshalledStream", () => {
100100
},
101101
};
102102
const deserStream = getUnmarshalledStream(source, {
103-
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8),
103+
eventStreamCodec: new EventStreamCodec(toUtf8, fromUtf8),
104104
deserializer: (message) =>
105105
Promise.resolve({
106106
$unknown: message,
@@ -126,7 +126,7 @@ describe("getUnmarshalledStream", () => {
126126
yield recordEventMessage;
127127
};
128128
const unmarshallerStream = getUnmarshalledStream(source(), {
129-
eventMarshaller: new EventStreamMarshaller(toUtf8, fromUtf8),
129+
eventStreamCodec: new EventStreamCodec(toUtf8, fromUtf8),
130130
deserializer: (message) =>
131131
Promise.resolve({
132132
$unknown: message,

packages/eventstream-serde-universal/src/getUnmarshalledStream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
import { EventStreamMarshaller as EventMarshaller } from "@aws-sdk/eventstream-marshaller";
1+
import { EventStreamCodec } from "@aws-sdk/eventstream-codec";
22
import { Encoder, Message } from "@aws-sdk/types";
33

44
export type UnmarshalledStreamOptions<T> = {
5-
eventMarshaller: EventMarshaller;
5+
eventStreamCodec: EventStreamCodec;
66
deserializer: (input: Record<string, Message>) => Promise<T>;
77
toUtf8: Encoder;
88
};
@@ -14,7 +14,7 @@ export function getUnmarshalledStream<T extends Record<string, any>>(
1414
return {
1515
[Symbol.asyncIterator]: async function* () {
1616
for await (const chunk of source) {
17-
const message = options.eventMarshaller.unmarshall(chunk);
17+
const message = options.eventStreamCodec.decode(chunk);
1818
const { value: messageType } = message.headers[":message-type"];
1919
if (messageType === "error") {
2020
// Unmodeled exception in event

0 commit comments

Comments
 (0)