Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup logging in the request scheduling path #583

Merged
merged 1 commit into from
Mar 27, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/epp/backend/metrics/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
})
s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics)
logger.Info(s)
logger.V(logutil.VERBOSE).Info(s)
}
}
}()
Expand All @@ -89,7 +89,7 @@ func flushPrometheusMetricsOnce(logger logr.Logger, datastore Datastore) {
pool, err := datastore.PoolGet()
if err != nil {
// No inference pool or not initialized.
logger.V(logutil.VERBOSE).Info("pool is not initialized, skipping flushing metrics")
logger.V(logutil.DEFAULT).Info("pool is not initialized, skipping flushing metrics")
return
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/epp/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (s *Server) HandleStreaming(
) error {
responseText := string(body.ResponseBody.Body)
if strings.Contains(responseText, streamingEndMsg) {
parsedResp := ParseRespForUsage(ctx, responseText, loggerVerbose)
parsedResp := ParseRespForUsage(ctx, responseText)
reqCtx.Usage = parsedResp.Usage
}

Expand Down Expand Up @@ -230,7 +230,6 @@ func (s *Server) HandleStreaming(
func ParseRespForUsage(
ctx context.Context,
responseText string,
loggerVerbose logr.Logger,
) Response {
response := Response{}

Expand All @@ -246,7 +245,8 @@ func ParseRespForUsage(

byteSlice := []byte(content)
if err := json.Unmarshal(byteSlice, &response); err != nil {
loggerVerbose.Error(err, "unmarshaling response body")
logger := log.FromContext(ctx)
logger.V(logutil.DEFAULT).Error(err, "unmarshaling response body")
continue
}
}
Expand Down
71 changes: 28 additions & 43 deletions pkg/epp/handlers/streamingserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ type StreamingServer struct {
func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
ctx := srv.Context()
logger := log.FromContext(ctx)
loggerVerbose := logger.V(logutil.VERBOSE)
loggerVerbose.Info("Processing")
loggerTrace := logger.V(logutil.TRACE)
loggerTrace.Info("Processing")

// Create request context to share states during life time of an HTTP request.
// See https://github.com/envoyproxy/envoy/issues/17540.
Expand Down Expand Up @@ -103,21 +103,21 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
if recvErr != nil {
// This error occurs very frequently, though it doesn't seem to have any impact.
// TODO Figure out if we can remove this noise.
loggerVerbose.Error(err, "Cannot receive stream request")
logger.V(logutil.DEFAULT).Error(err, "Cannot receive stream request")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we bumping this up to be a tad more noisy so we drive towards resolution?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is what I am thinking, is this error expected to happen frequently? what triggers it?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think I support that.

I think @liu-cong originally made that comment. I remember sifting through the libraries we are calling and I ended up in the grpc codebase with inconclusive answers.

Either way we should get some visibility into this to resolve +1

return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
}

switch v := req.Request.(type) {
case *extProcPb.ProcessingRequest_RequestHeaders:
err = s.HandleRequestHeaders(ctx, reqCtx, v)
case *extProcPb.ProcessingRequest_RequestBody:
loggerVerbose.Info("Incoming body chunk", "body", string(v.RequestBody.Body), "EoS", v.RequestBody.EndOfStream)
loggerTrace.Info("Incoming body chunk", "EoS", v.RequestBody.EndOfStream)
// In the stream case, we can receive multiple request bodies.
body = append(body, v.RequestBody.Body...)

// Message is buffered, we can read and decode.
if v.RequestBody.EndOfStream {
loggerVerbose.Info("decoding")
loggerTrace.Info("decoding")
err = json.Unmarshal(body, &requestBody)
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
Expand All @@ -133,22 +133,19 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
}
loggerVerbose.Info("Request context after HandleRequestBody", "context", reqCtx)
}
case *extProcPb.ProcessingRequest_RequestTrailers:
// This is currently unused.
case *extProcPb.ProcessingRequest_ResponseHeaders:
loggerVerbose.Info("got response headers", "headers", v.ResponseHeaders.Headers.GetHeaders())
for _, header := range v.ResponseHeaders.Headers.GetHeaders() {
value := string(header.RawValue)

logger.V(logutil.TRACE).Info("header", "key", header.Key, "value", value)
loggerTrace.Info("header", "key", header.Key, "value", value)
if header.Key == "status" && value != "200" {
reqCtx.ResponseStatusCode = errutil.ModelServerError
} else if header.Key == "content-type" && strings.Contains(value, "text/event-stream") {
reqCtx.modelServerStreaming = true
loggerVerbose.Info("model server is streaming response")
logger.Error(nil, "made it here")
loggerTrace.Info("model server is streaming response")
}
}
reqCtx.RequestState = ResponseRecieved
Expand Down Expand Up @@ -179,7 +176,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
responseText := string(v.ResponseBody.Body)
s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText)
if v.ResponseBody.EndOfStream {
loggerVerbose.Info("streaming is completed")
loggerTrace.Info("stream completed")

reqCtx.ResponseCompleteTimestamp = time.Now()
metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
Expand Down Expand Up @@ -207,6 +204,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)

// Message is buffered, we can read and decode.
if v.ResponseBody.EndOfStream {
loggerTrace.Info("stream completed")
// Don't send a 500 on a response error. Just let the message passthrough and log our error for debugging purposes.
// We assume the body is valid JSON, err messages are not guaranteed to be json, and so capturing and sending a 500 obfuscates the response message.
// using the standard 'err' var will send an immediate error response back to the caller.
Expand All @@ -226,7 +224,6 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Usage.CompletionTokens)
}
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
}
}
case *extProcPb.ProcessingRequest_ResponseTrailers:
Expand All @@ -246,27 +243,28 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
}
return nil
}
loggerVerbose.Info("checking", "request state", reqCtx.RequestState)
if err := reqCtx.updateStateAndSendIfNeeded(srv, loggerVerbose); err != nil {
loggerTrace.Info("checking", "request state", reqCtx.RequestState)
if err := reqCtx.updateStateAndSendIfNeeded(srv, logger); err != nil {
return err
}
}
}

// updateStateAndSendIfNeeded checks state and can send multiple responses in a single pass, but only if ordered properly.
// Order of requests matter in FULL_DUPLEX_STREAMING. For both request and response, the order of response sent back MUST be: Header->Body->Trailer, with trailer being optional.
func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, loggerVerbose logr.Logger) error {
func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, logger logr.Logger) error {
loggerTrace := logger.V(logutil.TRACE)
// No switch statement as we could send multiple responses in one pass.
if r.RequestState == RequestReceived && r.reqHeaderResp != nil {
loggerVerbose.Info("Request header response", "obj", r.reqHeaderResp)
loggerTrace.Info("Sending request header response", "obj", r.reqHeaderResp)
if err := srv.Send(r.reqHeaderResp); err != nil {
loggerVerbose.Error(err, "error sending response")
logger.V(logutil.DEFAULT).Error(err, "error sending response")
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
r.RequestState = HeaderRequestResponseComplete
}
if r.RequestState == HeaderRequestResponseComplete && r.reqBodyResp != nil {
loggerVerbose.Info("Request body response", "obj", r.reqBodyResp)
loggerTrace.Info("Sending request body response")
if err := srv.Send(r.reqBodyResp); err != nil {
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
Expand All @@ -281,14 +279,14 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
}
}
if r.RequestState == ResponseRecieved && r.respHeaderResp != nil {
loggerVerbose.Info("Response header response", "obj", r.respHeaderResp)
loggerTrace.Info("Sending response header response", "obj", r.respHeaderResp)
if err := srv.Send(r.respHeaderResp); err != nil {
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
r.RequestState = HeaderResponseResponseComplete
}
if r.RequestState == HeaderResponseResponseComplete && r.respBodyResp != nil {
loggerVerbose.Info("Response body response", "obj", r.respBodyResp)
loggerTrace.Info("Sending response body response")
if err := srv.Send(r.respBodyResp); err != nil {
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
}
Expand All @@ -298,7 +296,7 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
r.RequestState = BodyResponseResponsesComplete
}
// Dump the response so a new stream message can begin
r.reqBodyResp = nil
r.respBodyResp = nil
}
if r.RequestState == BodyResponseResponsesComplete && r.respTrailerResp != nil {
// Trailers in requests are not guaranteed
Expand All @@ -318,15 +316,13 @@ func (s *StreamingServer) HandleRequestBody(
) (*RequestContext, error) {
var requestBodyBytes []byte
logger := log.FromContext(ctx)
loggerVerbose := logger.V(logutil.VERBOSE)
loggerVerbose.Info("Handling request body")

// Resolve target models.
model, ok := requestBodyMap["model"].(string)
if !ok {
return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
}
loggerVerbose.Info("Model requested", "model", model)

modelName := model

// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
Expand All @@ -347,7 +343,7 @@ func (s *StreamingServer) HandleRequestBody(
ResolvedTargetModel: modelName,
Critical: datastore.IsCritical(modelObj),
}
loggerVerbose.Info("LLM request assembled", "request", llmReq)
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical)

var err error
// Update target models in the body.
Expand All @@ -360,7 +356,6 @@ func (s *StreamingServer) HandleRequestBody(
logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body")
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
}
loggerVerbose.Info("Updated request body marshalled", "body", string(requestBodyBytes))

target, err := s.scheduler.Schedule(ctx, llmReq)
if err != nil {
Expand All @@ -377,15 +372,16 @@ func (s *StreamingServer) HandleRequestBody(
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))

logger.V(logutil.DEFAULT).Info("Request handled",
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod)
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics",
fmt.Sprintf("%+v", target))

reqCtx.Model = llmReq.Model
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
reqCtx.RequestSize = len(requestBodyBytes)
reqCtx.TargetPod = targetPod.NamespacedName.String()
reqCtx.TargetEndpoint = endpoint

s.populateRequestHeaderResponse(ctx, reqCtx, endpoint, len(requestBodyBytes))
s.populateRequestHeaderResponse(reqCtx, endpoint, len(requestBodyBytes))

reqCtx.reqBodyResp = &extProcPb.ProcessingResponse{
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
Expand Down Expand Up @@ -416,8 +412,6 @@ func (s *StreamingServer) HandleResponseBody(
response map[string]interface{},
) (*RequestContext, error) {
logger := log.FromContext(ctx)
loggerVerbose := logger.V(logutil.VERBOSE)
loggerVerbose.Info("Processing HandleResponseBody")
responseBytes, err := json.Marshal(response)
if err != nil {
logger.V(logutil.DEFAULT).Error(err, "error marshalling responseBody")
Expand All @@ -431,7 +425,7 @@ func (s *StreamingServer) HandleResponseBody(
TotalTokens: int(usg["total_tokens"].(float64)),
}
reqCtx.Usage = usage
loggerVerbose.Info("Response generated", "usage", reqCtx.Usage)
logger.V(logutil.VERBOSE).Info("Response generated", "usage", reqCtx.Usage)
}
reqCtx.ResponseSize = len(responseBytes)
// ResponseComplete is to indicate the response is complete. In non-streaming
Expand Down Expand Up @@ -469,12 +463,8 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming(
reqCtx *RequestContext,
responseText string,
) {
logger := log.FromContext(ctx)
loggerVerbose := logger.V(logutil.VERBOSE)
loggerVerbose.Info("Processing HandleResponseBody")

if strings.Contains(responseText, streamingEndMsg) {
resp := ParseRespForUsage(ctx, responseText, loggerVerbose)
resp := ParseRespForUsage(ctx, responseText)
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.CompletionTokens)
}
Expand All @@ -495,13 +485,12 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
return err
}
endpoint := pod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
s.populateRequestHeaderResponse(ctx, reqCtx, endpoint, 0)
s.populateRequestHeaderResponse(reqCtx, endpoint, 0)
}
return nil
}

func (s *StreamingServer) populateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext, endpoint string, requestBodyLength int) {
logger := log.FromContext(ctx)
func (s *StreamingServer) populateRequestHeaderResponse(reqCtx *RequestContext, endpoint string, requestBodyLength int) {
headers := []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Expand All @@ -520,10 +509,6 @@ func (s *StreamingServer) populateRequestHeaderResponse(ctx context.Context, req
},
})
}
// Print headers for debugging
for _, header := range headers {
logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
}

targetEndpointValue := &structpb.Struct{
Fields: map[string]*structpb.Value{
Expand Down
4 changes: 2 additions & 2 deletions pkg/epp/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod ba
logger := log.FromContext(ctx).WithValues("request", req)
podMetrics := s.datastore.PodGetAll()

logger.V(logutil.VERBOSE).Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", podMetrics))
logger.V(logutil.DEBUG).Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", podMetrics))
pods, err := s.filter.Filter(logger, req, podMetrics)
if err != nil || len(pods) == 0 {
return nil, fmt.Errorf(
"failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
}
logger.V(logutil.VERBOSE).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
i := rand.Intn(len(pods))
return pods[i], nil
}