Commit c64e2ba

revise vercel streaming

1 parent 5ae6b57 commit c64e2ba
1 file changed: +72 -56 lines changed

templates/types/streaming/fastapi/app/api/routers/vercel_response.py

Lines changed: 72 additions & 56 deletions
@@ -45,51 +45,13 @@ async def content_generator(
         chat_data: ChatData,
         background_tasks: BackgroundTasks,
     ):
-        # Yield the events from the event handler
-        async def _event_generator():
-            async for event in event_handler.async_event_gen():
-                event_response = event.to_response()
-                if event_response is not None:
-                    yield cls.convert_data(event_response)
-
-        # Yield the text response
-        async def _chat_response_generator():
-            # Wait for the response from the chat engine
-            result = await response
-
-            # Once we got a source node, start a background task to download the files (if needed)
-            cls.process_response_nodes(result.source_nodes, background_tasks)
-
-            # Yield the source nodes
-            yield cls.convert_data(
-                {
-                    "type": "sources",
-                    "data": {
-                        "nodes": [
-                            SourceNodes.from_source_node(node).model_dump()
-                            for node in result.source_nodes
-                        ]
-                    },
-                }
-            )
-
-            final_response = ""
-            async for token in result.async_response_gen():
-                final_response += token
-                yield cls.convert_text(token)
-
-            # Generate next questions if next question prompt is configured
-            question_data = await cls._generate_next_questions(
-                chat_data.messages, final_response
-            )
-            if question_data:
-                yield cls.convert_data(question_data)
-
-            # the text_generator is the leading stream, once it's finished, also finish the event stream
-            event_handler.is_done = True
+        chat_response_generator = cls._chat_response_generator(
+            response, background_tasks, event_handler, chat_data
+        )
+        event_generator = cls._event_generator(event_handler)
 
         # Merge the chat response generator and the event generator
-        combine = stream.merge(_chat_response_generator(), _event_generator())
+        combine = stream.merge(chat_response_generator, event_generator)
         is_stream_started = False
         async with combine.stream() as streamer:
             async for output in streamer:
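
For context, the merge step above relies on aiostream's stream.merge, which interleaves items from several async generators as they become available. Here is a minimal, self-contained sketch of that pattern; the tokens and events generators are hypothetical stand-ins for the chat response and event generators in this file:

    import asyncio

    from aiostream import stream


    async def tokens():
        # Stand-in for the chat response generator.
        for t in ["Hello", " ", "world"]:
            await asyncio.sleep(0.01)
            yield f"text:{t}"


    async def events():
        # Stand-in for the event generator.
        for e in ["retrieve", "synthesize"]:
            await asyncio.sleep(0.015)
            yield f"event:{e}"


    async def main():
        # stream.merge yields from both sources as items arrive, which is
        # how token chunks and callback events end up in one response stream.
        combine = stream.merge(tokens(), events())
        async with combine.stream() as streamer:
            async for output in streamer:
                print(output)


    asyncio.run(main())
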
@@ -103,17 +65,60 @@ async def _chat_response_generator():
                 if await request.is_disconnected():
                     break
 
-    @staticmethod
-    async def _generate_next_questions(chat_history: List[Message], response: str):
-        questions = await NextQuestionSuggestion.suggest_next_questions(
-            chat_history, response
-        )
-        if questions:
-            return {
-                "type": "suggested_questions",
-                "data": questions,
+    @classmethod
+    async def _event_generator(cls, event_handler: EventCallbackHandler):
+        """
+        Yield the events from the event handler
+        """
+        async for event in event_handler.async_event_gen():
+            event_response = event.to_response()
+            if event_response is not None:
+                yield cls.convert_data(event_response)
+
+    @classmethod
+    async def _chat_response_generator(
+        cls,
+        response: Awaitable[StreamingAgentChatResponse],
+        background_tasks: BackgroundTasks,
+        event_handler: EventCallbackHandler,
+        chat_data: ChatData,
+    ):
+        """
+        Yield the text response and source nodes from the chat engine
+        """
+        # Wait for the response from the chat engine
+        result = await response
+
+        # Once we got a source node, start a background task to download the files (if needed)
+        cls._process_response_nodes(result.source_nodes, background_tasks)
+
+        # Yield the source nodes
+        yield cls.convert_data(
+            {
+                "type": "sources",
+                "data": {
+                    "nodes": [
+                        SourceNodes.from_source_node(node).model_dump()
+                        for node in result.source_nodes
+                    ]
+                },
             }
-        return None
+        )
+
+        final_response = ""
+        async for token in result.async_response_gen():
+            final_response += token
+            yield cls.convert_text(token)
+
+        # Generate next questions if next question prompt is configured
+        question_data = await cls._generate_next_questions(
+            chat_data.messages, final_response
+        )
+        if question_data:
+            yield cls.convert_data(question_data)
+
+        # the text_generator is the leading stream, once it's finished, also finish the event stream
+        event_handler.is_done = True
 
     @classmethod
     def convert_text(cls, token: str):
@@ -126,9 +131,8 @@ def convert_data(cls, data: dict):
         data_str = json.dumps(data)
         return f"{cls.DATA_PREFIX}[{data_str}]\n"
 
-    @classmethod
-    def process_response_nodes(
-        cls,
+    @staticmethod
+    def _process_response_nodes(
         source_nodes: List[NodeWithScore],
         background_tasks: BackgroundTasks,
     ):
@@ -144,3 +148,15 @@ def process_response_nodes(
                 "LlamaCloud is not configured. Skipping post processing of nodes"
             )
             pass
+
+    @staticmethod
+    async def _generate_next_questions(chat_history: List[Message], response: str):
+        questions = await NextQuestionSuggestion.suggest_next_questions(
+            chat_history, response
+        )
+        if questions:
+            return {
+                "type": "suggested_questions",
+                "data": questions,
+            }
+        return None
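
As a side note, convert_text and convert_data frame each chunk for the Vercel AI SDK data stream protocol. A minimal sketch of that framing is below; the prefix values are an assumption (TEXT_PREFIX and DATA_PREFIX are defined elsewhere in this file and should be checked against the actual constants), and the convert_text body here is a guess, while the convert_data body matches the diff context above:

    import json

    TEXT_PREFIX = "0:"  # assumed value; text part prefix in the Vercel data stream protocol
    DATA_PREFIX = "8:"  # assumed value; annotation/data part prefix


    def convert_text(token: str) -> str:
        # JSON-encode the token so quotes and newlines survive the wire format.
        token = json.dumps(token)
        return f"{TEXT_PREFIX}{token}\n"


    def convert_data(data: dict) -> str:
        data_str = json.dumps(data)
        return f"{DATA_PREFIX}[{data_str}]\n"


    print(convert_text("Hello"), end="")                      # 0:"Hello"
    print(convert_data({"type": "sources", "data": {}}), end="")  # 8:[{"type": "sources", "data": {}}]
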
