-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathstreaming.go
98 lines (82 loc) · 2.18 KB
/
streaming.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package llm
import (
	"context"
	"errors"
	"io"
	"strings"
)
// StreamHandler receives the events emitted while an LLM response is being
// streamed: the start of the stream, each partial token, any tool-call
// requests, the final assembled message, and errors.
type StreamHandler interface {
	// OnStart is invoked exactly once, immediately before tokens begin to stream.
	OnStart()

	// OnToken is invoked with each new token (a partial piece of output) as
	// the model produces it.
	OnToken(token string)

	// OnToolCall is invoked when the model requests an external tool call.
	OnToolCall(toolCall ToolCall)

	// OnComplete is invoked with the final, complete message produced by the model.
	OnComplete(message OutputMessage)

	// OnError is invoked when an error occurs during streaming.
	OnError(err error)
}
// StreamChatCompletion opens a streaming chat completion for req on model and
// forwards the resulting events to handler.
//
// Event order:
//   - OnStart, once the stream has been opened successfully.
//   - OnToken, for every chunk choice carrying non-empty message content.
//   - OnToolCall, once per accumulated tool call, when a choice finishes with
//     FinishReasonToolCalls.
//   - OnComplete, with the assembled assistant message, when a choice reports
//     any finish reason other than FinishReasonNull, or when the stream ends
//     cleanly (EOF) without one.
//
// If opening the stream, reading from it, or ctx fails, the error is passed
// to handler.OnError and also returned. The stream is always closed before
// StreamChatCompletion returns.
func StreamChatCompletion(
	ctx context.Context,
	req ChatCompletionRequest,
	handler StreamHandler,
	model LLM,
) error {
	stream, err := model.CreateChatCompletionStream(ctx, req)
	if err != nil {
		handler.OnError(err)
		return err
	}
	// Best-effort cleanup: every event has already been delivered by the time
	// this runs, so a Close error has nowhere useful to go.
	defer func() { _ = stream.Close() }()

	handler.OnStart()

	var fullContent strings.Builder
	var toolCalls []ToolCall

	// complete delivers the message assembled so far to the handler.
	complete := func() {
		handler.OnComplete(OutputMessage{
			Role:      "assistant",
			Content:   fullContent.String(),
			ToolCalls: toolCalls,
		})
	}

	for {
		// Stop promptly on cancellation even if the stream implementation
		// does not observe ctx itself.
		if ctxErr := ctx.Err(); ctxErr != nil {
			handler.OnError(ctxErr)
			return ctxErr
		}

		chunk, recvErr := stream.Recv()
		if recvErr != nil {
			if isEOF(recvErr) {
				// The stream ended without an explicit finish reason. Still
				// deliver what was accumulated so a successful stream always
				// reaches OnComplete.
				complete()
				return nil
			}
			handler.OnError(recvErr)
			return recvErr
		}

		// NOTE(review): content from every choice is merged into a single
		// message; this presumably assumes one choice per request — confirm.
		for _, c := range chunk.Choices {
			if len(c.Message.Content) > 0 {
				handler.OnToken(c.Message.Content)
				fullContent.WriteString(c.Message.Content)
			}
			if len(c.Message.ToolCalls) > 0 {
				toolCalls = append(toolCalls, c.Message.ToolCalls...)
			}

			if c.FinishReason == FinishReasonToolCalls {
				// Notify the handler of each tool call, matching what is
				// reported in OutputMessage.ToolCalls. Ranging (rather than
				// indexing the last element) is also safe when the provider
				// signals tool_calls without having sent any.
				for _, tc := range toolCalls {
					handler.OnToolCall(tc)
				}
			}

			if c.FinishReason != FinishReasonNull {
				complete()
				return nil
			}
		}
	}
}
// isEOF reports whether err marks the normal end of a stream, that is, whether
// it is io.EOF or wraps io.EOF. errors.Is is used so that stream
// implementations which annotate EOF (e.g. fmt.Errorf("recv: %w", io.EOF))
// still end the stream cleanly instead of being reported as failures.
func isEOF(err error) bool {
	return errors.Is(err, io.EOF)
}