Skip to content

Commit d74d327

Browse files
authored
bugfix: cannot parse content if one streaming body has multi chunks (#1606)
1 parent be27726 commit d74d327

File tree

3 files changed

+82
-81
lines changed

3 files changed

+82
-81
lines changed

plugins/wasm-go/extensions/ai-cache/util.go

+45-42
Original file line numberDiff line numberDiff line change
@@ -101,55 +101,58 @@ func processStreamLastChunk(ctx wrapper.HttpContext, c config.PluginConfig, chun
101101
}
102102

103103
func processSSEMessage(ctx wrapper.HttpContext, c config.PluginConfig, sseMessage string, log wrapper.Log) (string, error) {
104-
subMessages := strings.Split(sseMessage, "\n")
105-
var message string
106-
for _, msg := range subMessages {
107-
if strings.HasPrefix(msg, "data:") {
108-
message = msg
109-
break
104+
content := ""
105+
for _, chunk := range strings.Split(sseMessage, "\n\n") {
106+
log.Infof("chunk _ : %s", chunk)
107+
subMessages := strings.Split(chunk, "\n")
108+
var message string
109+
for _, msg := range subMessages {
110+
if strings.HasPrefix(msg, "data:") {
111+
message = msg
112+
break
113+
}
114+
}
115+
if len(message) < 6 {
116+
return content, fmt.Errorf("[processSSEMessage] invalid message: %s", message)
110117
}
111-
}
112-
if len(message) < 6 {
113-
return "", fmt.Errorf("[processSSEMessage] invalid message: %s", message)
114-
}
115118

116-
// skip the prefix "data:"
117-
bodyJson := message[5:]
119+
// skip the prefix "data:"
120+
bodyJson := message[5:]
118121

119-
if strings.TrimSpace(bodyJson) == "[DONE]" {
120-
return "", nil
121-
}
122+
if strings.TrimSpace(bodyJson) == "[DONE]" {
123+
return content, nil
124+
}
122125

123-
// Extract values from JSON fields
124-
responseBody := gjson.Get(bodyJson, c.CacheStreamValueFrom)
125-
toolCalls := gjson.Get(bodyJson, c.CacheToolCallsFrom)
126+
// Extract values from JSON fields
127+
responseBody := gjson.Get(bodyJson, c.CacheStreamValueFrom)
128+
toolCalls := gjson.Get(bodyJson, c.CacheToolCallsFrom)
126129

127-
if toolCalls.Exists() {
128-
// TODO: Temporarily store the tool_calls value in the context for processing
129-
ctx.SetContext(TOOL_CALLS_CONTEXT_KEY, toolCalls.String())
130-
}
131-
132-
// Check if the ResponseBody field exists
133-
if !responseBody.Exists() {
134-
if ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) != nil {
135-
log.Debugf("[processSSEMessage] unable to extract content from message; cache content is not nil: %s", message)
136-
return "", nil
130+
if toolCalls.Exists() {
131+
// TODO: Temporarily store the tool_calls value in the context for processing
132+
ctx.SetContext(TOOL_CALLS_CONTEXT_KEY, toolCalls.String())
137133
}
138-
return "", fmt.Errorf("[processSSEMessage] unable to extract content from message; cache content is nil: %s", message)
139-
} else {
140-
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
141134

142-
// If there is no content in the cache, initialize and set the content
143-
if tempContentI == nil {
144-
content := responseBody.String()
145-
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
146-
return content, nil
147-
}
135+
// Check if the ResponseBody field exists
136+
if !responseBody.Exists() {
137+
if ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY) != nil {
138+
log.Debugf("[processSSEMessage] unable to extract content from message; cache content is not nil: %s", message)
139+
return content, nil
140+
}
141+
return content, fmt.Errorf("[processSSEMessage] unable to extract content from message; cache content is nil: %s", message)
142+
} else {
143+
tempContentI := ctx.GetContext(CACHE_CONTENT_CONTEXT_KEY)
148144

149-
// Update the content in the cache
150-
appendMsg := responseBody.String()
151-
content := tempContentI.(string) + appendMsg
152-
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
153-
return content, nil
145+
// If there is no content in the cache, initialize and set the content
146+
if tempContentI == nil {
147+
content = responseBody.String()
148+
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
149+
} else {
150+
// Update the content in the cache
151+
appendMsg := responseBody.String()
152+
content = tempContentI.(string) + appendMsg
153+
ctx.SetContext(CACHE_CONTENT_CONTEXT_KEY, content)
154+
}
155+
}
154156
}
157+
return content, nil
155158
}

plugins/wasm-go/extensions/ai-history/go.sum

+2-4
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,13 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
33
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
44
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520 h1:IHDghbGQ2DTIXHBHxWfqCYQW1fKjyJ/I7W1pMyUDeEA=
55
github.com/higress-group/nottinygc v0.0.0-20231101025119-e93c4c2f8520/go.mod h1:Nz8ORLaFiLWotg6GeKlJMhv8cci8mM43uEnLA5t8iew=
6-
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f h1:ZIiIBRvIw62gA5MJhuwp1+2wWbqL9IGElQ499rUsYYg=
7-
github.com/higress-group/proxy-wasm-go-sdk v0.0.0-20240711023527-ba358c48772f/go.mod h1:hNFjhrLUIq+kJ9bOcs8QtiplSQ61GZXtd2xHKx4BYRo=
6+
github.com/higress-group/proxy-wasm-go-sdk v1.0.0 h1:BZRNf4R7jr9hwRivg/E29nkVaKEak5MWjBDhWjuHijU=
87
github.com/higress-group/proxy-wasm-go-sdk v1.0.0/go.mod h1:iiSyFbo+rAtbtGt/bsefv8GU57h9CCLYGJA74/tF5/0=
98
github.com/magefile/mage v1.14.0 h1:6QDX3g6z1YvJ4olPhT1wksUcSa/V0a1B+pJb73fBjyo=
109
github.com/magefile/mage v1.14.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=
1110
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
1211
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
13-
github.com/tidwall/gjson v1.14.3 h1:9jvXn7olKEHU1S9vwoMGliaT8jq1vJ7IH/n9zD9Dnlw=
14-
github.com/tidwall/gjson v1.14.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
12+
github.com/tidwall/gjson v1.17.3 h1:bwWLZU7icoKRG+C+0PNwIKC6FCJO/Q3p2pZvuP0jN94=
1513
github.com/tidwall/gjson v1.17.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
1614
github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
1715
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=

plugins/wasm-go/extensions/ai-history/main.go

+35-35
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,12 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte
194194
ctx.SetContext(StreamContextKey, struct{}{})
195195
}
196196
identityKey := ctx.GetStringContext(IdentityKey, "")
197+
question := TrimQuote(bodyJson.Get(config.QuestionFrom.RequestBody).String())
198+
if question == "" {
199+
log.Debug("parse question from request body failed")
200+
return types.ActionContinue
201+
}
202+
ctx.SetContext(QuestionContextKey, question)
197203
err := config.redisClient.Get(config.CacheKeyPrefix+identityKey, func(response resp.Value) {
198204
if err := response.Error(); err != nil {
199205
log.Errorf("redis get failed, err:%v", err)
@@ -230,13 +236,6 @@ func onHttpRequestBody(ctx wrapper.HttpContext, config PluginConfig, body []byte
230236
_ = proxywasm.SendHttpResponseWithDetail(200, "OK", [][2]string{{"content-type", "application/json; charset=utf-8"}}, res, -1)
231237
return
232238
}
233-
question := TrimQuote(bodyJson.Get(config.QuestionFrom.RequestBody).String())
234-
if question == "" {
235-
log.Debug("parse question from request body failed")
236-
_ = proxywasm.ResumeHttpRequest()
237-
return
238-
}
239-
ctx.SetContext(QuestionContextKey, question)
240239
fillHistoryCnt := getIntQueryParameter("fill_history_cnt", path, config.FillHistoryCnt) * 2
241240
currJson := bodyJson.Get("messages").String()
242241
var currMessage []ChatHistory
@@ -317,38 +316,39 @@ func getIntQueryParameter(name string, path string, defaultValue int) int {
317316
}
318317

319318
func processSSEMessage(ctx wrapper.HttpContext, config PluginConfig, sseMessage string, log wrapper.Log) string {
320-
subMessages := strings.Split(sseMessage, "\n")
321-
var message string
322-
for _, msg := range subMessages {
323-
if strings.HasPrefix(msg, "data:") {
324-
message = msg
325-
break
319+
content := ""
320+
for _, chunk := range strings.Split(sseMessage, "\n\n") {
321+
subMessages := strings.Split(chunk, "\n")
322+
var message string
323+
for _, msg := range subMessages {
324+
if strings.HasPrefix(msg, "data:") {
325+
message = msg
326+
break
327+
}
326328
}
327-
}
328-
if len(message) < 6 {
329-
log.Errorf("invalid message:%s", message)
330-
return ""
331-
}
332-
// skip the prefix "data:"
333-
bodyJson := message[5:]
334-
if gjson.Get(bodyJson, config.AnswerStreamValueFrom.ResponseBody).Exists() {
335-
tempContentI := ctx.GetContext(AnswerContentContextKey)
336-
if tempContentI == nil {
337-
content := TrimQuote(gjson.Get(bodyJson, config.AnswerStreamValueFrom.ResponseBody).Raw)
338-
ctx.SetContext(AnswerContentContextKey, content)
329+
if len(message) < 6 {
330+
log.Errorf("invalid message:%s", message)
339331
return content
340332
}
341-
append := TrimQuote(gjson.Get(bodyJson, config.AnswerStreamValueFrom.ResponseBody).Raw)
342-
content := tempContentI.(string) + append
343-
ctx.SetContext(AnswerContentContextKey, content)
344-
return content
345-
} else if gjson.Get(bodyJson, "choices.0.delta.content.tool_calls").Exists() {
346-
// TODO: compatible with other providers
347-
ctx.SetContext(ToolCallsContextKey, struct{}{})
348-
return ""
333+
// skip the prefix "data:"
334+
bodyJson := message[5:]
335+
if gjson.Get(bodyJson, config.AnswerStreamValueFrom.ResponseBody).Exists() {
336+
tempContentI := ctx.GetContext(AnswerContentContextKey)
337+
if tempContentI == nil {
338+
content = TrimQuote(gjson.Get(bodyJson, config.AnswerStreamValueFrom.ResponseBody).Raw)
339+
ctx.SetContext(AnswerContentContextKey, content)
340+
} else {
341+
append := TrimQuote(gjson.Get(bodyJson, config.AnswerStreamValueFrom.ResponseBody).Raw)
342+
content = tempContentI.(string) + append
343+
ctx.SetContext(AnswerContentContextKey, content)
344+
}
345+
} else if gjson.Get(bodyJson, "choices.0.delta.content.tool_calls").Exists() {
346+
// TODO: compatible with other providers
347+
ctx.SetContext(ToolCallsContextKey, struct{}{})
348+
}
349+
log.Debugf("unknown message:%s", bodyJson)
349350
}
350-
log.Debugf("unknown message:%s", bodyJson)
351-
return ""
351+
return content
352352
}
353353

354354
func onHttpResponseHeaders(ctx wrapper.HttpContext, config PluginConfig, log wrapper.Log) types.Action {

0 commit comments

Comments
 (0)