From 37d0b25b6e82cd381e5d1aa6e28f1a1311d02353 Mon Sep 17 00:00:00 2001 From: Robert Craigie Date: Thu, 22 May 2025 13:45:20 +0100 Subject: [PATCH 1/2] docs(readme): fix async example --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index f7e0eb6467..b523311f0e 100644 --- a/README.md +++ b/README.md @@ -180,7 +180,7 @@ async def main(): stream=True, ) - for event in stream: + async for event in stream: print(event) From f1ad1070c1d648e18021fc9adc6dc98f50805f70 Mon Sep 17 00:00:00 2001 From: Robert Craigie Date: Thu, 22 May 2025 13:55:13 +0100 Subject: [PATCH 2/2] fix(streaming): invert logic for assistant stream parsing --- src/openai/_streaming.py | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/openai/_streaming.py b/src/openai/_streaming.py index f5621f92a7..4f57845399 100644 --- a/src/openai/_streaming.py +++ b/src/openai/_streaming.py @@ -59,7 +59,7 @@ def __stream__(self) -> Iterator[_T]: if sse.data.startswith("[DONE]"): break - if sse.event is None or sse.event.startswith("response.") or sse.event.startswith("transcript."): + if sse.event == "error": data = sse.json() if is_mapping(data) and data.get("error"): message = None @@ -75,12 +75,13 @@ def __stream__(self) -> Iterator[_T]: body=data["error"], ) - yield process_data(data=data, cast_to=cast_to, response=response) + if sse.event and sse.event.startswith('thread.'): + # the assistants API uses a different event shape structure + yield process_data(data={"data": sse.json(), "event": sse.event}, cast_to=cast_to, response=response) else: data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): + if is_mapping(data) and data.get("error"): message = None error = data.get("error") if is_mapping(error): @@ -94,7 +95,7 @@ def __stream__(self) -> Iterator[_T]: body=data["error"], ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) + yield process_data(data=data, cast_to=cast_to, response=response) # Ensure the entire stream is consumed for _sse in iterator: @@ -161,7 +162,7 @@ async def __stream__(self) -> AsyncIterator[_T]: if sse.data.startswith("[DONE]"): break - if sse.event is None or sse.event.startswith("response.") or sse.event.startswith("transcript."): + if sse.event == "error": data = sse.json() if is_mapping(data) and data.get("error"): message = None @@ -177,12 +178,13 @@ async def __stream__(self) -> AsyncIterator[_T]: body=data["error"], ) - yield process_data(data=data, cast_to=cast_to, response=response) + if sse.event and sse.event.startswith('thread.'): + # the assistants API uses a different event shape structure + yield process_data(data={"data": sse.json(), "event": sse.event}, cast_to=cast_to, response=response) else: data = sse.json() - - if sse.event == "error" and is_mapping(data) and data.get("error"): + if is_mapping(data) and data.get("error"): message = None error = data.get("error") if is_mapping(error): @@ -196,7 +198,7 @@ async def __stream__(self) -> AsyncIterator[_T]: body=data["error"], ) - yield process_data(data={"data": data, "event": sse.event}, cast_to=cast_to, response=response) + yield process_data(data=data, cast_to=cast_to, response=response) # Ensure the entire stream is consumed async for _sse in iterator: