diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go
index f9396acf..44ea6d6a 100644
--- a/pkg/epp/handlers/response.go
+++ b/pkg/epp/handlers/response.go
@@ -20,9 +20,11 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strings"
 
 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+	"github.com/go-logr/logr"
 	"sigs.k8s.io/controller-runtime/pkg/log"
 	errutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/error"
 	logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
@@ -67,11 +69,25 @@ func (s *Server) HandleResponseHeaders(
 	//	  }
 	// }
+	// Track both headers outside the loop so the early break below can
+	// trigger once each has been seen.
+	var statusFound, typeFound bool
 	for _, header := range h.ResponseHeaders.Headers.GetHeaders() {
 		if header.Key == "status" {
 			code := header.RawValue[0]
 			if string(code) != "200" {
 				reqCtx.ResponseStatusCode = errutil.ModelServerError
 			}
+			statusFound = true
 		}
+		if header.Key == "content-type" {
+			contentType := header.RawValue
+			// Streaming responses are delivered as server-sent events.
+			reqCtx.Streaming = strings.Contains(string(contentType), "text/event-stream")
+			typeFound = true
+		}
+
+		if statusFound && typeFound {
+			break
+		}
 	}
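The message this loop walks is the ext_proc `HttpHeaders` payload, in which header values arrive as `RawValue` bytes rather than strings. A minimal sketch of constructing one for the streaming case, reusing the `configPb`/`extProcPb` aliases imported above (`buildResponseHeaders` is a hypothetical helper, not part of this patch):

```go
// buildResponseHeaders assembles the ext_proc message that
// HandleResponseHeaders consumes. A content-type containing
// "text/event-stream" is what flips reqCtx.Streaming to true.
func buildResponseHeaders(status, contentType string) *extProcPb.ProcessingRequest_ResponseHeaders {
	return &extProcPb.ProcessingRequest_ResponseHeaders{
		ResponseHeaders: &extProcPb.HttpHeaders{
			Headers: &configPb.HeaderMap{
				Headers: []*configPb.HeaderValue{
					{Key: "status", RawValue: []byte(status)},
					{Key: "content-type", RawValue: []byte(contentType)},
				},
			},
		},
	}
}
```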
@@ -132,22 +148,19 @@ func (s *Server) HandleResponseBody(
 ) (*extProcPb.ProcessingResponse, error) {
 	logger := log.FromContext(ctx)
 	loggerVerbose := logger.V(logutil.VERBOSE)
-	loggerVerbose.Info("Processing HandleResponseBody")
 
 	body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
 
-	res := Response{}
-	if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
-		return nil, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+	if reqCtx.Streaming {
+		// Streaming chunks arrive frequently, so log at DEBUG to avoid log spam.
+		logger.V(logutil.DEBUG).Info("Processing HandleResponseBody")
+		if err := s.HandleStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
+			return nil, err
+		}
+	} else {
+		loggerVerbose.Info("Processing HandleResponseBody")
+		if err := s.HandleNonStreaming(ctx, reqCtx, body, loggerVerbose); err != nil {
+			return nil, err
+		}
 	}
-	reqCtx.Response = res
-	reqCtx.ResponseSize = len(body.ResponseBody.Body)
-	// ResponseComplete is to indicate the response is complete. In non-streaming
-	// case, it will be set to be true once the response is processed; in
-	// streaming case, it will be set to be true once the last chunk is processed.
-	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
-	// will add the processing for streaming case.
-	reqCtx.ResponseComplete = true
-	loggerVerbose.Info("Response generated", "response", res)
 
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseBody{
@@ -159,6 +172,76 @@ func (s *Server) HandleResponseBody(
 	return resp, nil
 }
 
+func (s *Server) HandleNonStreaming(
+	ctx context.Context,
+	reqCtx *RequestContext,
+	body *extProcPb.ProcessingRequest_ResponseBody,
+	loggerVerbose logr.Logger,
+) error {
+	res := Response{}
+	if err := json.Unmarshal(body.ResponseBody.Body, &res); err != nil {
+		return errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("unmarshaling response body: %v", err)}
+	}
+	reqCtx.Response = res
+	reqCtx.ResponseSize = len(body.ResponseBody.Body)
+	reqCtx.ResponseComplete = true
+	loggerVerbose.Info("Response generated", "response", res)
+	return nil
+}
+
+func (s *Server) HandleStreaming(
+	ctx context.Context,
+	reqCtx *RequestContext,
+	body *extProcPb.ProcessingRequest_ResponseBody,
+	loggerVerbose logr.Logger,
+) error {
+	respPrefix := "data: "
+	responseText := string(body.ResponseBody.Body)
+	// Example message if "stream_options": {"include_usage": "true"} is included in the request:
+	//
+	// data: {"id":"...","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],
+	// "usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}
+	//
+	// data: [DONE]
+	//
+	// Note that vLLM returns both entries in a single response message. We need to
+	// strip the "data: " prefix and skip the trailing "data: [DONE]" entry to
+	// extract the response data.
+	//
+	// If include_usage is not set in the request, "data: [DONE]" is returned in a
+	// separate message, which indicates the end of streaming.
+	if strings.Contains(responseText, "data: [DONE]") {
+		response := Response{}
+
+		lines := strings.Split(responseText, "\n")
+		for _, line := range lines {
+			if !strings.HasPrefix(line, respPrefix) {
+				continue
+			}
+			content := strings.TrimPrefix(line, respPrefix)
+			if content == "[DONE]" {
+				continue
+			}
+
+			byteSlice := []byte(content)
+			if err := json.Unmarshal(byteSlice, &response); err != nil {
+				loggerVerbose.Error(err, "unmarshaling response body")
+				continue
+			}
+		}
+		reqCtx.Response = response
+	}
+
+	if body.ResponseBody.EndOfStream {
+		loggerVerbose.Info("Streaming is completed")
+		reqCtx.ResponseComplete = true
+	} else {
+		reqCtx.ResponseSize += len(body.ResponseBody.Body)
+	}
+
+	return nil
+}
+
 type Response struct {
 	Usage Usage `json:"usage"`
 }
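To make the chunk handling above concrete, here is a small self-contained sketch (not part of the patch) that applies the same strip-and-unmarshal steps to the final usage chunk documented in the `HandleStreaming` comment:

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// The final message vLLM emits when include_usage is set: a usage entry
	// followed by the [DONE] sentinel in the same message.
	chunk := `data: {"id":"cmpl-1","object":"text_completion","created":1739400043,"model":"tweet-summary-0","choices":[],"usage":{"prompt_tokens":7,"total_tokens":17,"completion_tokens":10}}

data: [DONE]
`
	var res struct {
		Usage struct {
			PromptTokens     int `json:"prompt_tokens"`
			TotalTokens      int `json:"total_tokens"`
			CompletionTokens int `json:"completion_tokens"`
		} `json:"usage"`
	}
	for _, line := range strings.Split(chunk, "\n") {
		content := strings.TrimPrefix(line, "data: ")
		if content == line || content == "[DONE]" {
			continue // not a data line, or the end-of-stream sentinel
		}
		if err := json.Unmarshal([]byte(content), &res); err != nil {
			continue // skip malformed entries, as the handler does
		}
	}
	fmt.Printf("prompt=%d total=%d completion=%d\n",
		res.Usage.PromptTokens, res.Usage.TotalTokens, res.Usage.CompletionTokens)
}
```

Running it prints `prompt=7 total=17 completion=10`, which is what lands in `reqCtx.Response.Usage`.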
without usage", + req: &extProcPb.ProcessingRequest_ResponseBody{ + ResponseBody: &extProcPb.HttpBody{ + Body: []byte(streamingBodyWithoutUsage), + }, + }, + reqCtx: &RequestContext{ + Streaming: true, + }, + wantErr: false, + // In the middle of streaming response, so request context response is not set yet. + }, + { + name: "streaming request with usage", + req: &extProcPb.ProcessingRequest_ResponseBody{ + ResponseBody: &extProcPb.HttpBody{ + Body: []byte(streamingBodyWithUsage), + }, + }, + reqCtx: &RequestContext{ + Streaming: true, + }, + wantErr: false, + want: Response{ + Usage: Usage{ + PromptTokens: 7, + TotalTokens: 17, + CompletionTokens: 10, + }, + }, + }, } for _, test := range tests { t.Run(test.name, func(t *testing.T) { server := &Server{} - reqCtx := &RequestContext{} + reqCtx := test.reqCtx + if reqCtx == nil { + reqCtx = &RequestContext{} + } _, err := server.HandleResponseBody(ctx, reqCtx, &extProcPb.ProcessingRequest{Request: test.req}) if err != nil { if !test.wantErr { diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index bbdbe83e..93d099b9 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -124,7 +124,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens) metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens) } - loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx) + if reqCtx.Streaming { + logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx) + } else { + loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx) + } default: logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v) return status.Error(codes.Unknown, "unknown request type") @@ -138,7 +142,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { } } - loggerVerbose.Info("Response generated", "response", resp) + if !reqCtx.Streaming { + loggerVerbose.Info("Response generated", "response", resp) + } else { + logger.V(logutil.DEBUG).Info("Response generated", "response", resp) + } if err := srv.Send(resp); err != nil { logger.V(logutil.DEFAULT).Error(err, "Send failed") return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) @@ -213,4 +221,5 @@ type RequestContext struct { ResponseSize int ResponseComplete bool ResponseStatusCode string + Streaming bool } diff --git a/site-src/guides/metrics.md b/site-src/guides/metrics.md index f793734d..a904145d 100644 --- a/site-src/guides/metrics.md +++ b/site-src/guides/metrics.md @@ -4,14 +4,7 @@ This guide describes the current state of exposed metrics and how to scrape them ## Requirements -Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode. - -Currently there are two options: -- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics. - -- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). 
diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go
index bbdbe83e..93d099b9 100644
--- a/pkg/epp/handlers/server.go
+++ b/pkg/epp/handlers/server.go
@@ -124,7 +124,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 				metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
 				metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
 			}
-			loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+			if reqCtx.Streaming {
+				logger.V(logutil.DEBUG).Info("Request context after HandleResponseBody", "context", reqCtx)
+			} else {
+				loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
+			}
 		default:
 			logger.V(logutil.DEFAULT).Error(nil, "Unknown Request type", "request", v)
 			return status.Error(codes.Unknown, "unknown request type")
@@ -138,7 +142,11 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			}
 		}
 
-		loggerVerbose.Info("Response generated", "response", resp)
+		if reqCtx.Streaming {
+			logger.V(logutil.DEBUG).Info("Response generated", "response", resp)
+		} else {
+			loggerVerbose.Info("Response generated", "response", resp)
+		}
 		if err := srv.Send(resp); err != nil {
 			logger.V(logutil.DEFAULT).Error(err, "Send failed")
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
@@ -213,4 +221,5 @@ type RequestContext struct {
 	ResponseSize       int
 	ResponseComplete   bool
 	ResponseStatusCode string
+	Streaming          bool
 }
diff --git a/site-src/guides/metrics.md b/site-src/guides/metrics.md
index f793734d..a904145d 100644
--- a/site-src/guides/metrics.md
+++ b/site-src/guides/metrics.md
@@ -4,14 +4,7 @@ This guide describes the current state of exposed metrics and how to scrape them
 
 ## Requirements
 
-Response metrics are only supported in non-streaming mode, with the follow up [issue](https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178) to address streaming mode.
-
-Currently there are two options:
-- If requests don't use response streaming, then you can enable `Buffered` mode for response in `EnvoyExtensionPolicy`, this will buffer the response body at the proxy and forward it to the endpoint picker, which allows the endpoint picker to report response metrics.
-
-- If requests use response streaming, then it is not recommended to enable `Buffered` mode, the response body processing mode should be left empty in the `EnvoyExtensionPolicy` (default). In this case response bodies will not be forwarded to the endpoint picker, and therefore response metrics will not be reported.
-
-
+To enable response metrics, set the response body processing mode to `Buffered` or `Streamed` in the `EnvoyExtensionPolicy`:
 
 ```
 apiVersion: gateway.envoyproxy.io/v1alpha1
 kind: EnvoyExtensionPolicy
@@ -32,6 +25,19 @@ spec:
           body: Buffered
 ```
 
+If you want usage metrics for a vLLM model server streaming request, send the request with `include_usage` set in `stream_options`:
+
+```
+curl -i ${IP}:${PORT}/v1/completions -H 'Content-Type: application/json' -d '{
+"model": "tweet-summary",
+"prompt": "whats your fav movie?",
+"max_tokens": 10,
+"temperature": 0,
+"stream": true,
+"stream_options": {"include_usage": "true"}
+}'
+```
+
 ## Exposed metrics
 
 | **Metric name** | **Metric Type** |