Skip to content

Commit 8863824

Browse files
fix: optimize sse chunk reading off-by-one error (#1339)
1 parent 272f8f3 commit 8863824

File tree

4 files changed

+161
-127
lines changed

4 files changed

+161
-127
lines changed

Diff for: src/internal/decoders/line.ts

+31
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,34 @@ function findNewlineIndex(
143143

144144
return null;
145145
}
146+
147+
/**
 * Scans `buffer` for the first "double newline" end pattern: `\n\n`, `\r\r` or `\r\n\r\n`.
 *
 * @param buffer - Raw bytes to search.
 * @returns The index immediately after the first matching pattern, or -1 when no
 *          pattern occurs in the buffer.
 */
export function findDoubleNewlineIndex(buffer: Uint8Array): number {
  const lf = 0x0a; // \n
  const cr = 0x0d; // \r

  // Last position at which a two-byte pattern can still start.
  const lastPairStart = buffer.length - 2;

  let pos = 0;
  while (pos <= lastPairStart) {
    const first = buffer[pos];
    const second = buffer[pos + 1];

    // \n\n or \r\r: the same terminator byte twice in a row.
    if (first === second && (first === lf || first === cr)) {
      return pos + 2;
    }

    // \r\n\r\n: reads past the end of the buffer yield `undefined`, which never
    // equals a byte value, so a truncated pattern is rejected without a bounds check.
    if (first === cr && second === lf && buffer[pos + 2] === cr && buffer[pos + 3] === lf) {
      return pos + 4;
    }

    pos += 1;
  }

  return -1;
}

Diff for: src/streaming.ts

+1-47
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { ReadableStream, type Response } from './_shims/index';
22
import { OpenAIError } from './error';
3-
import { LineDecoder } from './internal/decoders/line';
3+
import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line';
44
import { ReadableStreamToAsyncIterable } from './internal/stream-utils';
55

66
import { APIError } from './error';
@@ -243,37 +243,6 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
243243
}
244244
}
245245

246-
/**
 * Searches `buffer` for the end patterns `\r\r`, `\n\n` and `\r\n\r\n`.
 *
 * @param buffer - Raw bytes to search.
 * @returns The index right after the first occurrence of any pattern, or -1 if
 *          none of the patterns are found.
 */
function findDoubleNewlineIndex(buffer: Uint8Array): number {
  const newline = 0x0a; // \n
  const carriage = 0x0d; // \r

  // Visit every adjacent byte pair, including the final one. A bound of
  // `buffer.length - 2` would stop one pair early and miss a `\n\n` or `\r\r`
  // located at the very end of the buffer.
  for (let i = 0; i < buffer.length - 1; i++) {
    if (buffer[i] === newline && buffer[i + 1] === newline) {
      // \n\n
      return i + 2;
    }
    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
      // \r\r
      return i + 2;
    }
    if (
      buffer[i] === carriage &&
      buffer[i + 1] === newline &&
      i + 3 < buffer.length && // the four-byte pattern must fit inside the buffer
      buffer[i + 2] === carriage &&
      buffer[i + 3] === newline
    ) {
      // \r\n\r\n
      return i + 4;
    }
  }

  return -1;
}
276-
277246
class SSEDecoder {
278247
private data: string[];
279248
private event: string | null;
@@ -329,21 +298,6 @@ class SSEDecoder {
329298
}
330299
}
331300

332-
/**
 * Internal helper that exists only for tests.
 *
 * Feeds each chunk, in order, through a fresh `LineDecoder` and collects the
 * lines it emits. When `flush` is true, the lines returned by `decoder.flush()`
 * are appended after the last chunk.
 */
export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
  const decoder = new LineDecoder();
  const collected: string[] = [];

  chunks.forEach((chunk) => {
    for (const line of decoder.decode(chunk)) {
      collected.push(line);
    }
  });

  if (!flush) {
    return collected;
  }

  for (const line of decoder.flush()) {
    collected.push(line);
  }
  return collected;
}
346-
347301
function partition(str: string, delimiter: string): [string, string, string] {
348302
const index = str.indexOf(delimiter);
349303
if (index !== -1) {

Diff for: tests/internal/decoders/line.test.ts

+128
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import { findDoubleNewlineIndex, LineDecoder } from 'openai/internal/decoders/line';
2+
3+
/**
 * Test helper: feeds each chunk, in order, through a fresh `LineDecoder` and
 * collects the lines it emits. When `flush` is true, the lines returned by
 * `decoder.flush()` are appended after the last chunk.
 */
function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
  const decoder = new LineDecoder();
  const collected: string[] = [];

  chunks.forEach((chunk) => {
    for (const line of decoder.decode(chunk)) {
      collected.push(line);
    }
  });

  if (!flush) {
    return collected;
  }

  for (const line of decoder.flush()) {
    collected.push(line);
  }
  return collected;
}
16+
17+
// Drives LineDecoder through the decodeChunks helper: chunks are fed in order and the
// emitted lines are compared, with and without a final flush.
describe('line decoder', () => {
  const withFlush = { flush: true };
  const withoutFlush = { flush: false };

  test('basic', () => {
    // 'baz' is left out because its line has not ended yet
    const lines = decodeChunks(['foo', ' bar\nbaz']);
    expect(lines).toEqual(['foo bar']);
  });

  test('basic with \\r', () => {
    const chunks = ['foo', ' bar\r\nbaz'];
    expect(decodeChunks(chunks)).toEqual(['foo bar']);
    expect(decodeChunks(chunks, withFlush)).toEqual(['foo bar', 'baz']);
  });

  test('trailing new lines', () => {
    const lines = decodeChunks(['foo', ' bar', 'baz\n', 'thing\n']);
    expect(lines).toEqual(['foo barbaz', 'thing']);
  });

  test('trailing new lines with \\r', () => {
    const lines = decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n']);
    expect(lines).toEqual(['foo barbaz', 'thing']);
  });

  test('escaped new lines', () => {
    const lines = decodeChunks(['foo', ' bar\\nbaz\n']);
    expect(lines).toEqual(['foo bar\\nbaz']);
  });

  test('escaped new lines with \\r', () => {
    const lines = decodeChunks(['foo', ' bar\\r\\nbaz\n']);
    expect(lines).toEqual(['foo bar\\r\\nbaz']);
  });

  test('\\r & \\n split across multiple chunks', () => {
    const lines = decodeChunks(['foo\r', '\n', 'bar'], withFlush);
    expect(lines).toEqual(['foo', 'bar']);
  });

  test('single \\r', () => {
    const lines = decodeChunks(['foo\r', 'bar'], withFlush);
    expect(lines).toEqual(['foo', 'bar']);
  });

  test('double \\r', () => {
    expect(decodeChunks(['foo\r', 'bar\r'], withFlush)).toEqual(['foo', 'bar']);

    const splitCarriageReturns = ['foo\r', '\r', 'bar'];
    expect(decodeChunks(splitCarriageReturns, withFlush)).toEqual(['foo', '', 'bar']);
    // implementation detail: the single \r line is not yielded until a new \r or \n is encountered
    expect(decodeChunks(splitCarriageReturns, withoutFlush)).toEqual(['foo']);
  });

  test('double \\r then \\r\\n', () => {
    const expected = ['foo', '', '', 'bar'];
    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(expected);
    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(expected);
  });

  test('double newline', () => {
    // The same input split at different points must always decode to the same lines.
    const chunkings = [
      ['foo\n\nbar'],
      ['foo', '\n', '\nbar'],
      ['foo\n', '\n', 'bar'],
      ['foo', '\n', '\n', 'bar'],
    ];
    for (const chunks of chunkings) {
      expect(decodeChunks(chunks, withFlush)).toEqual(['foo', '', 'bar']);
    }
  });

  test('multi-byte characters across chunks', () => {
    const decoder = new LineDecoder();

    // bytes taken from the string 'известни' and arbitrarily split
    // so that some multi-byte characters span multiple chunks
    const partialChunks = [
      new Uint8Array([0xd0]),
      new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]),
      new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8]),
    ];
    for (const bytes of partialChunks) {
      expect(decoder.decode(bytes)).toHaveLength(0);
    }

    const decoded = decoder.decode(new Uint8Array([0xa]));
    expect(decoded).toEqual(['известни']);
  });

  test('flushing trailing newlines', () => {
    const lines = decodeChunks(['foo\n', '\nbar'], withFlush);
    expect(lines).toEqual(['foo', '', 'bar']);
  });

  test('flushing empty buffer', () => {
    const lines = decodeChunks([], withFlush);
    expect(lines).toEqual([]);
  });
});
94+
95+
// Checks findDoubleNewlineIndex against UTF-8 encoded strings: each supported end
// pattern at the start, middle and end of the buffer, plus inputs with no match.
describe('findDoubleNewlineIndex', () => {
  const encoder = new TextEncoder();
  const indexIn = (text: string): number => findDoubleNewlineIndex(encoder.encode(text));

  test('finds \\n\\n', () => {
    expect(indexIn('foo\n\nbar')).toBe(5);
    expect(indexIn('\n\nbar')).toBe(2);
    expect(indexIn('foo\n\n')).toBe(5);
    expect(indexIn('\n\n')).toBe(2);
  });

  test('finds \\r\\r', () => {
    expect(indexIn('foo\r\rbar')).toBe(5);
    expect(indexIn('\r\rbar')).toBe(2);
    expect(indexIn('foo\r\r')).toBe(5);
    expect(indexIn('\r\r')).toBe(2);
  });

  test('finds \\r\\n\\r\\n', () => {
    expect(indexIn('foo\r\n\r\nbar')).toBe(7);
    expect(indexIn('\r\n\r\nbar')).toBe(4);
    expect(indexIn('foo\r\n\r\n')).toBe(7);
    expect(indexIn('\r\n\r\n')).toBe(4);
  });

  test('returns -1 when no double newline found', () => {
    for (const text of ['foo\nbar', 'foo\rbar', 'foo\r\nbar', '']) {
      expect(indexIn(text)).toBe(-1);
    }
  });

  test('handles incomplete patterns', () => {
    // A \r\n\r\n pattern cut short at the end of the buffer must not match.
    for (const text of ['foo\r\n\r', 'foo\r\n']) {
      expect(indexIn(text)).toBe(-1);
    }
  });
});

Diff for: tests/streaming.test.ts

+1-80
Original file line numberDiff line numberDiff line change
@@ -1,86 +1,7 @@
11
import { Response } from 'node-fetch';
22
import { PassThrough } from 'stream';
33
import assert from 'assert';
4-
import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';
5-
import { LineDecoder } from 'openai/internal/decoders/line';
6-
7-
// Drives LineDecoder through the decodeChunks helper: chunks are fed in order and the
// emitted lines are compared, with and without a final flush.
describe('line decoder', () => {
  const withFlush = { flush: true };
  const withoutFlush = { flush: false };

  test('basic', () => {
    // 'baz' is left out because its line has not ended yet
    const lines = decodeChunks(['foo', ' bar\nbaz']);
    expect(lines).toEqual(['foo bar']);
  });

  test('basic with \\r', () => {
    const chunks = ['foo', ' bar\r\nbaz'];
    expect(decodeChunks(chunks)).toEqual(['foo bar']);
    expect(decodeChunks(chunks, withFlush)).toEqual(['foo bar', 'baz']);
  });

  test('trailing new lines', () => {
    const lines = decodeChunks(['foo', ' bar', 'baz\n', 'thing\n']);
    expect(lines).toEqual(['foo barbaz', 'thing']);
  });

  test('trailing new lines with \\r', () => {
    const lines = decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n']);
    expect(lines).toEqual(['foo barbaz', 'thing']);
  });

  test('escaped new lines', () => {
    const lines = decodeChunks(['foo', ' bar\\nbaz\n']);
    expect(lines).toEqual(['foo bar\\nbaz']);
  });

  test('escaped new lines with \\r', () => {
    const lines = decodeChunks(['foo', ' bar\\r\\nbaz\n']);
    expect(lines).toEqual(['foo bar\\r\\nbaz']);
  });

  test('\\r & \\n split across multiple chunks', () => {
    const lines = decodeChunks(['foo\r', '\n', 'bar'], withFlush);
    expect(lines).toEqual(['foo', 'bar']);
  });

  test('single \\r', () => {
    const lines = decodeChunks(['foo\r', 'bar'], withFlush);
    expect(lines).toEqual(['foo', 'bar']);
  });

  test('double \\r', () => {
    expect(decodeChunks(['foo\r', 'bar\r'], withFlush)).toEqual(['foo', 'bar']);

    const splitCarriageReturns = ['foo\r', '\r', 'bar'];
    expect(decodeChunks(splitCarriageReturns, withFlush)).toEqual(['foo', '', 'bar']);
    // implementation detail: the single \r line is not yielded until a new \r or \n is encountered
    expect(decodeChunks(splitCarriageReturns, withoutFlush)).toEqual(['foo']);
  });

  test('double \\r then \\r\\n', () => {
    const expected = ['foo', '', '', 'bar'];
    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(expected);
    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(expected);
  });

  test('double newline', () => {
    // The same input split at different points must always decode to the same lines.
    const chunkings = [
      ['foo\n\nbar'],
      ['foo', '\n', '\nbar'],
      ['foo\n', '\n', 'bar'],
      ['foo', '\n', '\n', 'bar'],
    ];
    for (const chunks of chunkings) {
      expect(decodeChunks(chunks, withFlush)).toEqual(['foo', '', 'bar']);
    }
  });

  test('multi-byte characters across chunks', () => {
    const decoder = new LineDecoder();

    // bytes taken from the string 'известни' and arbitrarily split
    // so that some multi-byte characters span multiple chunks
    const partialChunks = [
      new Uint8Array([0xd0]),
      new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]),
      new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8]),
    ];
    for (const bytes of partialChunks) {
      expect(decoder.decode(bytes)).toHaveLength(0);
    }

    const decoded = decoder.decode(new Uint8Array([0xa]));
    expect(decoded).toEqual(['известни']);
  });

  test('flushing trailing newlines', () => {
    const lines = decodeChunks(['foo\n', '\nbar'], withFlush);
    expect(lines).toEqual(['foo', '', 'bar']);
  });

  test('flushing empty buffer', () => {
    const lines = decodeChunks([], withFlush);
    expect(lines).toEqual([]);
  });
});
4+
import { _iterSSEMessages } from 'openai/streaming';
845

856
describe('streaming decoding', () => {
867
test('basic', async () => {

0 commit comments

Comments
 (0)