Skip to content

Commit 7ba6f14

Browse files
fix: correctly decode multi-byte characters over multiple chunks (#1316)
1 parent dd8b990 commit 7ba6f14

File tree

3 files changed

+126
-40
lines changed

3 files changed

+126
-40
lines changed

Diff for: src/internal/decoders/line.ts

+69-38
Original file line numberDiff line numberDiff line change
@@ -13,52 +13,58 @@ export class LineDecoder {
1313
static NEWLINE_CHARS = new Set(['\n', '\r']);
1414
static NEWLINE_REGEXP = /\r\n|[\n\r]/g;
1515

16-
buffer: string[];
17-
trailingCR: boolean;
16+
buffer: Uint8Array;
17+
#carriageReturnIndex: number | null;
1818
textDecoder: any; // TextDecoder found in browsers; not typed to avoid pulling in either "dom" or "node" types.
1919

2020
constructor() {
21-
this.buffer = [];
22-
this.trailingCR = false;
21+
this.buffer = new Uint8Array();
22+
this.#carriageReturnIndex = null;
2323
}
2424

2525
decode(chunk: Bytes): string[] {
26-
let text = this.decodeText(chunk);
27-
28-
if (this.trailingCR) {
29-
text = '\r' + text;
30-
this.trailingCR = false;
31-
}
32-
if (text.endsWith('\r')) {
33-
this.trailingCR = true;
34-
text = text.slice(0, -1);
35-
}
36-
37-
if (!text) {
26+
if (chunk == null) {
3827
return [];
3928
}
4029

41-
const trailingNewline = LineDecoder.NEWLINE_CHARS.has(text[text.length - 1] || '');
42-
let lines = text.split(LineDecoder.NEWLINE_REGEXP);
30+
const binaryChunk =
31+
chunk instanceof ArrayBuffer ? new Uint8Array(chunk)
32+
: typeof chunk === 'string' ? new TextEncoder().encode(chunk)
33+
: chunk;
34+
35+
let newData = new Uint8Array(this.buffer.length + binaryChunk.length);
36+
newData.set(this.buffer);
37+
newData.set(binaryChunk, this.buffer.length);
38+
this.buffer = newData;
39+
40+
const lines: string[] = [];
41+
let patternIndex;
42+
while ((patternIndex = findNewlineIndex(this.buffer, this.#carriageReturnIndex)) != null) {
43+
if (patternIndex.carriage && this.#carriageReturnIndex == null) {
44+
// skip until we either get a corresponding `\n`, a new `\r` or nothing
45+
this.#carriageReturnIndex = patternIndex.index;
46+
continue;
47+
}
4348

44-
// if there is a trailing new line then the last entry will be an empty
45-
// string which we don't care about
46-
if (trailingNewline) {
47-
lines.pop();
48-
}
49+
// we got double \r or \rtext\n
50+
if (
51+
this.#carriageReturnIndex != null &&
52+
(patternIndex.index !== this.#carriageReturnIndex + 1 || patternIndex.carriage)
53+
) {
54+
lines.push(this.decodeText(this.buffer.slice(0, this.#carriageReturnIndex - 1)));
55+
this.buffer = this.buffer.slice(this.#carriageReturnIndex);
56+
this.#carriageReturnIndex = null;
57+
continue;
58+
}
4959

50-
if (lines.length === 1 && !trailingNewline) {
51-
this.buffer.push(lines[0]!);
52-
return [];
53-
}
60+
const endIndex =
61+
this.#carriageReturnIndex !== null ? patternIndex.preceding - 1 : patternIndex.preceding;
5462

55-
if (this.buffer.length > 0) {
56-
lines = [this.buffer.join('') + lines[0], ...lines.slice(1)];
57-
this.buffer = [];
58-
}
63+
const line = this.decodeText(this.buffer.slice(0, endIndex));
64+
lines.push(line);
5965

60-
if (!trailingNewline) {
61-
this.buffer = [lines.pop() || ''];
66+
this.buffer = this.buffer.slice(patternIndex.index);
67+
this.#carriageReturnIndex = null;
6268
}
6369

6470
return lines;
@@ -102,13 +108,38 @@ export class LineDecoder {
102108
}
103109

104110
flush(): string[] {
105-
if (!this.buffer.length && !this.trailingCR) {
111+
if (!this.buffer.length) {
106112
return [];
107113
}
114+
return this.decode('\n');
115+
}
116+
}
108117

109-
const lines = [this.buffer.join('')];
110-
this.buffer = [];
111-
this.trailingCR = false;
112-
return lines;
118+
/**
119+
* This function searches the buffer for the end patterns, (\r or \n)
120+
 * and returns an object with the index of the matched newline byte itself (`preceding`) and the
121+
* index after the newline char. `null` is returned if no new line is found.
122+
*
123+
* ```ts
124+
 * findNewlineIndex('abc\ndef') -> { preceding: 3, index: 4, carriage: false }
125+
* ```
126+
*/
127+
function findNewlineIndex(
128+
buffer: Uint8Array,
129+
startIndex: number | null,
130+
): { preceding: number; index: number; carriage: boolean } | null {
131+
const newline = 0x0a; // \n
132+
const carriage = 0x0d; // \r
133+
134+
for (let i = startIndex ?? 0; i < buffer.length; i++) {
135+
if (buffer[i] === newline) {
136+
return { preceding: i, index: i + 1, carriage: false };
137+
}
138+
139+
if (buffer[i] === carriage) {
140+
return { preceding: i, index: i + 1, carriage: true };
141+
}
113142
}
143+
144+
return null;
114145
}

Diff for: src/streaming.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -330,13 +330,17 @@ class SSEDecoder {
330330
}
331331

332332
/** This is an internal helper function that's just used for testing */
333-
export function _decodeChunks(chunks: string[]): string[] {
333+
export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
334334
const decoder = new LineDecoder();
335335
const lines: string[] = [];
336336
for (const chunk of chunks) {
337337
lines.push(...decoder.decode(chunk));
338338
}
339339

340+
if (flush) {
341+
lines.push(...decoder.flush());
342+
}
343+
340344
return lines;
341345
}
342346

Diff for: tests/streaming.test.ts

+52-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { Response } from 'node-fetch';
22
import { PassThrough } from 'stream';
33
import assert from 'assert';
44
import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';
5+
import { LineDecoder } from 'openai/internal/decoders/line';
56

67
describe('line decoder', () => {
78
test('basic', () => {
@@ -10,8 +11,8 @@ describe('line decoder', () => {
1011
});
1112

1213
test('basic with \\r', () => {
13-
// baz is not included because the line hasn't ended yet
1414
expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
15+
expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
1516
});
1617

1718
test('trailing new lines', () => {
@@ -29,6 +30,56 @@ describe('line decoder', () => {
2930
test('escaped new lines with \\r', () => {
3031
expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
3132
});
33+
34+
test('\\r & \\n split across multiple chunks', () => {
35+
expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
36+
});
37+
38+
test('single \\r', () => {
39+
expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
40+
});
41+
42+
test('double \\r', () => {
43+
expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
44+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
45+
// implementation detail that we don't yield the single \r line until a new \r or \n is encountered
46+
expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
47+
});
48+
49+
test('double \\r then \\r\\n', () => {
50+
expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
51+
expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
52+
});
53+
54+
test('double newline', () => {
55+
expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
56+
expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
57+
expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
58+
expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
59+
});
60+
61+
test('multi-byte characters across chunks', () => {
62+
const decoder = new LineDecoder();
63+
64+
// bytes taken from the string 'известни' and arbitrarily split
65+
// so that some multi-byte characters span multiple chunks
66+
expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
67+
expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
68+
expect(
69+
decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
70+
).toHaveLength(0);
71+
72+
const decoded = decoder.decode(new Uint8Array([0xa]));
73+
expect(decoded).toEqual(['известни']);
74+
});
75+
76+
test('flushing trailing newlines', () => {
77+
expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
78+
});
79+
80+
test('flushing empty buffer', () => {
81+
expect(decodeChunks([], { flush: true })).toEqual([]);
82+
});
3283
});
3384

3485
describe('streaming decoding', () => {

0 commit comments

Comments
 (0)