
Commit c82795b

stainless-app[bot] authored and stainless-bot committed
fix: optimize sse chunk reading off-by-one error (#1339)
1 parent 212710d commit c82795b

4 files changed: +161 -127 lines changed
Diff for: src/internal/decoders/line.ts (+31)

@@ -143,3 +143,34 @@ function findNewlineIndex(
 
   return null;
 }
+
+export function findDoubleNewlineIndex(buffer: Uint8Array): number {
+  // This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
+  // and returns the index right after the first occurrence of any pattern,
+  // or -1 if none of the patterns are found.
+  const newline = 0x0a; // \n
+  const carriage = 0x0d; // \r
+
+  for (let i = 0; i < buffer.length - 1; i++) {
+    if (buffer[i] === newline && buffer[i + 1] === newline) {
+      // \n\n
+      return i + 2;
+    }
+    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
+      // \r\r
+      return i + 2;
+    }
+    if (
+      buffer[i] === carriage &&
+      buffer[i + 1] === newline &&
+      i + 3 < buffer.length &&
+      buffer[i + 2] === carriage &&
+      buffer[i + 3] === newline
+    ) {
+      // \r\n\r\n
+      return i + 4;
+    }
+  }
+
+  return -1;
+}
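
For context on the "off-by-one" in the commit title: the scanner deleted from src/streaming.ts below looped with i < buffer.length - 2, so a terminator sitting at the very end of the buffer was never examined; the version added here loops with i < buffer.length - 1 and catches it. A minimal sketch of that boundary case, using inputs taken from the new tests:

    import { findDoubleNewlineIndex } from 'openai/internal/decoders/line';

    // 'foo\n\n' encodes to 5 bytes; the terminating \n\n occupies indices 3 and 4.
    // A loop bounded by i < buffer.length - 2 stops at i = 2 and never sees it;
    // the new bound i < buffer.length - 1 checks i = 3 and returns 5, the index
    // just past the terminator.
    const buf = new TextEncoder().encode('foo\n\n');
    console.log(findDoubleNewlineIndex(buf)); // 5

    // An incomplete terminator still yields -1, as the new tests assert.
    console.log(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))); // -1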

Diff for: src/streaming.ts (+1 -47)

@@ -1,6 +1,6 @@
 import { ReadableStream, type Response } from './_shims/index';
 import { OpenAIError } from './error';
-import { LineDecoder } from './internal/decoders/line';
+import { findDoubleNewlineIndex, LineDecoder } from './internal/decoders/line';
 import { ReadableStreamToAsyncIterable } from './internal/stream-utils';
 
 import { APIError } from './error';
@@ -259,37 +259,6 @@ async function* iterSSEChunks(iterator: AsyncIterableIterator<Bytes>): AsyncGene
   }
 }
 
-function findDoubleNewlineIndex(buffer: Uint8Array): number {
-  // This function searches the buffer for the end patterns (\r\r, \n\n, \r\n\r\n)
-  // and returns the index right after the first occurrence of any pattern,
-  // or -1 if none of the patterns are found.
-  const newline = 0x0a; // \n
-  const carriage = 0x0d; // \r
-
-  for (let i = 0; i < buffer.length - 2; i++) {
-    if (buffer[i] === newline && buffer[i + 1] === newline) {
-      // \n\n
-      return i + 2;
-    }
-    if (buffer[i] === carriage && buffer[i + 1] === carriage) {
-      // \r\r
-      return i + 2;
-    }
-    if (
-      buffer[i] === carriage &&
-      buffer[i + 1] === newline &&
-      i + 3 < buffer.length &&
-      buffer[i + 2] === carriage &&
-      buffer[i + 3] === newline
-    ) {
-      // \r\n\r\n
-      return i + 4;
-    }
-  }
-
-  return -1;
-}
-
 class SSEDecoder {
   private data: string[];
   private event: string | null;
@@ -345,21 +314,6 @@ class SSEDecoder {
   }
 }
 
-/** This is an internal helper function that's just used for testing */
-export function _decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
-  const decoder = new LineDecoder();
-  const lines: string[] = [];
-  for (const chunk of chunks) {
-    lines.push(...decoder.decode(chunk));
-  }
-
-  if (flush) {
-    lines.push(...decoder.flush());
-  }
-
-  return lines;
-}
-
 function partition(str: string, delimiter: string): [string, string, string] {
   const index = str.indexOf(delimiter);
   if (index !== -1) {
Diff for: tests/internal/decoders/line.test.ts (+128)

@@ -0,0 +1,128 @@
+import { findDoubleNewlineIndex, LineDecoder } from 'openai/internal/decoders/line';
+
+function decodeChunks(chunks: string[], { flush }: { flush: boolean } = { flush: false }): string[] {
+  const decoder = new LineDecoder();
+  const lines: string[] = [];
+  for (const chunk of chunks) {
+    lines.push(...decoder.decode(chunk));
+  }
+
+  if (flush) {
+    lines.push(...decoder.flush());
+  }
+
+  return lines;
+}
+
+describe('line decoder', () => {
+  test('basic', () => {
+    // baz is not included because the line hasn't ended yet
+    expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
+  });
+
+  test('basic with \\r', () => {
+    expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
+    expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
+  });
+
+  test('trailing new lines', () => {
+    expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
+  });
+
+  test('trailing new lines with \\r', () => {
+    expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
+  });
+
+  test('escaped new lines', () => {
+    expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
+  });
+
+  test('escaped new lines with \\r', () => {
+    expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
+  });
+
+  test('\\r & \\n split across multiple chunks', () => {
+    expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
+  });
+
+  test('single \\r', () => {
+    expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
+  });
+
+  test('double \\r', () => {
+    expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
+    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    // implementation detail that we don't yield the single \r line until a new \r or \n is encountered
+    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
+  });
+
+  test('double \\r then \\r\\n', () => {
+    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
+    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
+  });
+
+  test('double newline', () => {
+    expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+    expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
+  });
+
+  test('multi-byte characters across chunks', () => {
+    const decoder = new LineDecoder();
+
+    // bytes taken from the string 'известни' and arbitrarily split
+    // so that some multi-byte characters span multiple chunks
+    expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
+    expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
+    expect(
+      decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
+    ).toHaveLength(0);
+
+    const decoded = decoder.decode(new Uint8Array([0xa]));
+    expect(decoded).toEqual(['известни']);
+  });
+
+  test('flushing trailing newlines', () => {
+    expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
+  });
+
+  test('flushing empty buffer', () => {
+    expect(decodeChunks([], { flush: true })).toEqual([]);
+  });
+});
+
+describe('findDoubleNewlineIndex', () => {
+  test('finds \\n\\n', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\nbar'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\nbar'))).toBe(2);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\n\n'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\n\n'))).toBe(2);
+  });
+
+  test('finds \\r\\r', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\rbar'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\rbar'))).toBe(2);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\r'))).toBe(5);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\r'))).toBe(2);
+  });
+
+  test('finds \\r\\n\\r\\n', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\nbar'))).toBe(7);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\nbar'))).toBe(4);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r\n'))).toBe(7);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('\r\n\r\n'))).toBe(4);
+  });
+
+  test('returns -1 when no double newline found', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\nbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\rbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\nbar'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode(''))).toBe(-1);
+  });
+
+  test('handles incomplete patterns', () => {
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n\r'))).toBe(-1);
+    expect(findDoubleNewlineIndex(new TextEncoder().encode('foo\r\n'))).toBe(-1);
+  });
+});
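
One note on the multi-byte test above: a decoder that naively decoded each chunk in isolation would emit replacement characters whenever a UTF-8 sequence is split across chunks. The platform TextDecoder's streaming mode illustrates the behaviour the test pins down; this is only an illustration of the requirement, not a claim about how LineDecoder is implemented:

    // 0xd0 0xb8 is the UTF-8 encoding of 'и'; fed one byte at a time,
    // a streaming decoder must hold the first byte until the second arrives.
    const decoder = new TextDecoder('utf-8');
    const first = decoder.decode(new Uint8Array([0xd0]), { stream: true }); // ''
    const second = decoder.decode(new Uint8Array([0xb8]), { stream: true }); // 'и'
    console.log(first + second); // 'и'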

Diff for: tests/streaming.test.ts (+1 -80)

@@ -1,86 +1,7 @@
 import { Response } from 'node-fetch';
 import { PassThrough } from 'stream';
 import assert from 'assert';
-import { _iterSSEMessages, _decodeChunks as decodeChunks } from 'openai/streaming';
-import { LineDecoder } from 'openai/internal/decoders/line';
-
-describe('line decoder', () => {
-  test('basic', () => {
-    // baz is not included because the line hasn't ended yet
-    expect(decodeChunks(['foo', ' bar\nbaz'])).toEqual(['foo bar']);
-  });
-
-  test('basic with \\r', () => {
-    expect(decodeChunks(['foo', ' bar\r\nbaz'])).toEqual(['foo bar']);
-    expect(decodeChunks(['foo', ' bar\r\nbaz'], { flush: true })).toEqual(['foo bar', 'baz']);
-  });
-
-  test('trailing new lines', () => {
-    expect(decodeChunks(['foo', ' bar', 'baz\n', 'thing\n'])).toEqual(['foo barbaz', 'thing']);
-  });
-
-  test('trailing new lines with \\r', () => {
-    expect(decodeChunks(['foo', ' bar', 'baz\r\n', 'thing\r\n'])).toEqual(['foo barbaz', 'thing']);
-  });
-
-  test('escaped new lines', () => {
-    expect(decodeChunks(['foo', ' bar\\nbaz\n'])).toEqual(['foo bar\\nbaz']);
-  });
-
-  test('escaped new lines with \\r', () => {
-    expect(decodeChunks(['foo', ' bar\\r\\nbaz\n'])).toEqual(['foo bar\\r\\nbaz']);
-  });
-
-  test('\\r & \\n split across multiple chunks', () => {
-    expect(decodeChunks(['foo\r', '\n', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
-  });
-
-  test('single \\r', () => {
-    expect(decodeChunks(['foo\r', 'bar'], { flush: true })).toEqual(['foo', 'bar']);
-  });
-
-  test('double \\r', () => {
-    expect(decodeChunks(['foo\r', 'bar\r'], { flush: true })).toEqual(['foo', 'bar']);
-    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    // implementation detail that we don't yield the single \r line until a new \r or \n is encountered
-    expect(decodeChunks(['foo\r', '\r', 'bar'], { flush: false })).toEqual(['foo']);
-  });
-
-  test('double \\r then \\r\\n', () => {
-    expect(decodeChunks(['foo\r', '\r', '\r', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
-    expect(decodeChunks(['foo\n', '\n', '\n', 'bar', '\n'])).toEqual(['foo', '', '', 'bar']);
-  });
-
-  test('double newline', () => {
-    expect(decodeChunks(['foo\n\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo', '\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-    expect(decodeChunks(['foo', '\n', '\n', 'bar'], { flush: true })).toEqual(['foo', '', 'bar']);
-  });
-
-  test('multi-byte characters across chunks', () => {
-    const decoder = new LineDecoder();
-
-    // bytes taken from the string 'известни' and arbitrarily split
-    // so that some multi-byte characters span multiple chunks
-    expect(decoder.decode(new Uint8Array([0xd0]))).toHaveLength(0);
-    expect(decoder.decode(new Uint8Array([0xb8, 0xd0, 0xb7, 0xd0]))).toHaveLength(0);
-    expect(
-      decoder.decode(new Uint8Array([0xb2, 0xd0, 0xb5, 0xd1, 0x81, 0xd1, 0x82, 0xd0, 0xbd, 0xd0, 0xb8])),
-    ).toHaveLength(0);
-
-    const decoded = decoder.decode(new Uint8Array([0xa]));
-    expect(decoded).toEqual(['известни']);
-  });
-
-  test('flushing trailing newlines', () => {
-    expect(decodeChunks(['foo\n', '\nbar'], { flush: true })).toEqual(['foo', '', 'bar']);
-  });
-
-  test('flushing empty buffer', () => {
-    expect(decodeChunks([], { flush: true })).toEqual([]);
-  });
-});
+import { _iterSSEMessages } from 'openai/streaming';
 
 describe('streaming decoding', () => {
   test('basic', async () => {