diff --git a/pkg/epp/backend/metrics/logger.go b/pkg/epp/backend/metrics/logger.go index 74735755..8c73d488 100644 --- a/pkg/epp/backend/metrics/logger.go +++ b/pkg/epp/backend/metrics/logger.go @@ -78,7 +78,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod }) s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics) - logger.Info(s) + logger.V(logutil.VERBOSE).Info(s) } } }() @@ -89,7 +89,7 @@ func flushPrometheusMetricsOnce(logger logr.Logger, datastore Datastore) { pool, err := datastore.PoolGet() if err != nil { // No inference pool or not initialize. - logger.V(logutil.VERBOSE).Info("pool is not initialized, skipping flushing metrics") + logger.V(logutil.DEFAULT).Info("pool is not initialized, skipping flushing metrics") return } diff --git a/pkg/epp/handlers/response.go b/pkg/epp/handlers/response.go index 79ad7a6a..cf64f4a4 100644 --- a/pkg/epp/handlers/response.go +++ b/pkg/epp/handlers/response.go @@ -202,7 +202,7 @@ func (s *Server) HandleStreaming( ) error { responseText := string(body.ResponseBody.Body) if strings.Contains(responseText, streamingEndMsg) { - parsedResp := ParseRespForUsage(ctx, responseText, loggerVerbose) + parsedResp := ParseRespForUsage(ctx, responseText) reqCtx.Usage = parsedResp.Usage } @@ -230,7 +230,6 @@ func (s *Server) HandleStreaming( func ParseRespForUsage( ctx context.Context, responseText string, - loggerVerbose logr.Logger, ) Response { response := Response{} @@ -246,7 +245,8 @@ func ParseRespForUsage( byteSlice := []byte(content) if err := json.Unmarshal(byteSlice, &response); err != nil { - loggerVerbose.Error(err, "unmarshaling response body") + logger := log.FromContext(ctx) + logger.V(logutil.DEFAULT).Error(err, "unmarshaling response body") continue } } diff --git a/pkg/epp/handlers/streamingserver.go b/pkg/epp/handlers/streamingserver.go index 64f9c03b..d704578a 100644 --- a/pkg/epp/handlers/streamingserver.go +++ b/pkg/epp/handlers/streamingserver.go @@ -65,8 +65,8 @@ type StreamingServer struct { func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error { ctx := srv.Context() logger := log.FromContext(ctx) - loggerVerbose := logger.V(logutil.VERBOSE) - loggerVerbose.Info("Processing") + loggerTrace := logger.V(logutil.TRACE) + loggerTrace.Info("Processing") // Create request context to share states during life time of an HTTP request. // See https://github.com/envoyproxy/envoy/issues/17540. @@ -103,7 +103,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) if recvErr != nil { // This error occurs very frequently, though it doesn't seem to have any impact. // TODO Figure out if we can remove this noise. - loggerVerbose.Error(err, "Cannot receive stream request") + logger.V(logutil.DEFAULT).Error(err, "Cannot receive stream request") return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err) } @@ -111,13 +111,13 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) case *extProcPb.ProcessingRequest_RequestHeaders: err = s.HandleRequestHeaders(ctx, reqCtx, v) case *extProcPb.ProcessingRequest_RequestBody: - loggerVerbose.Info("Incoming body chunk", "body", string(v.RequestBody.Body), "EoS", v.RequestBody.EndOfStream) + loggerTrace.Info("Incoming body chunk", "EoS", v.RequestBody.EndOfStream) // In the stream case, we can receive multiple request bodies. body = append(body, v.RequestBody.Body...) // Message is buffered, we can read and decode. if v.RequestBody.EndOfStream { - loggerVerbose.Info("decoding") + loggerTrace.Info("decoding") err = json.Unmarshal(body, &requestBody) if err != nil { logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body") @@ -133,22 +133,19 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel) metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize) } - loggerVerbose.Info("Request context after HandleRequestBody", "context", reqCtx) } case *extProcPb.ProcessingRequest_RequestTrailers: // This is currently unused. case *extProcPb.ProcessingRequest_ResponseHeaders: - loggerVerbose.Info("got response headers", "headers", v.ResponseHeaders.Headers.GetHeaders()) for _, header := range v.ResponseHeaders.Headers.GetHeaders() { value := string(header.RawValue) - logger.V(logutil.TRACE).Info("header", "key", header.Key, "value", value) + loggerTrace.Info("header", "key", header.Key, "value", value) if header.Key == "status" && value != "200" { reqCtx.ResponseStatusCode = errutil.ModelServerError } else if header.Key == "content-type" && strings.Contains(value, "text/event-stream") { reqCtx.modelServerStreaming = true - loggerVerbose.Info("model server is streaming response") - logger.Error(nil, "made it here") + loggerTrace.Info("model server is streaming response") } } reqCtx.RequestState = ResponseRecieved @@ -179,7 +176,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) responseText := string(v.ResponseBody.Body) s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText) if v.ResponseBody.EndOfStream { - loggerVerbose.Info("streaming is completed") + loggerTrace.Info("stream completed") reqCtx.ResponseCompleteTimestamp = time.Now() metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp) @@ -207,6 +204,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) // Message is buffered, we can read and decode. if v.ResponseBody.EndOfStream { + loggerTrace.Info("stream completed") // Don't send a 500 on a response error. Just let the message passthrough and log our error for debugging purposes. // We assume the body is valid JSON, err messages are not guaranteed to be json, and so capturing and sending a 500 obfuscates the response message. // using the standard 'err' var will send an immediate error response back to the caller. @@ -226,7 +224,6 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Usage.PromptTokens) metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Usage.CompletionTokens) } - loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx) } } case *extProcPb.ProcessingRequest_ResponseTrailers: @@ -246,8 +243,8 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) } return nil } - loggerVerbose.Info("checking", "request state", reqCtx.RequestState) - if err := reqCtx.updateStateAndSendIfNeeded(srv, loggerVerbose); err != nil { + loggerTrace.Info("checking", "request state", reqCtx.RequestState) + if err := reqCtx.updateStateAndSendIfNeeded(srv, logger); err != nil { return err } } @@ -255,18 +252,19 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) // updateStateAndSendIfNeeded checks state and can send mutiple responses in a single pass, but only if ordered properly. // Order of requests matter in FULL_DUPLEX_STREAMING. For both request and response, the order of response sent back MUST be: Header->Body->Trailer, with trailer being optional. -func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, loggerVerbose logr.Logger) error { +func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, logger logr.Logger) error { + loggerTrace := logger.V(logutil.TRACE) // No switch statement as we could send multiple responses in one pass. if r.RequestState == RequestReceived && r.reqHeaderResp != nil { - loggerVerbose.Info("Request header response", "obj", r.reqHeaderResp) + loggerTrace.Info("Sending request header response", "obj", r.reqHeaderResp) if err := srv.Send(r.reqHeaderResp); err != nil { - loggerVerbose.Error(err, "error sending response") + logger.V(logutil.DEFAULT).Error(err, "error sending response") return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } r.RequestState = HeaderRequestResponseComplete } if r.RequestState == HeaderRequestResponseComplete && r.reqBodyResp != nil { - loggerVerbose.Info("Request body response", "obj", r.reqBodyResp) + loggerTrace.Info("Sending request body response") if err := srv.Send(r.reqBodyResp); err != nil { return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } @@ -281,14 +279,14 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces } } if r.RequestState == ResponseRecieved && r.respHeaderResp != nil { - loggerVerbose.Info("Response header response", "obj", r.respHeaderResp) + loggerTrace.Info("Sending response header response", "obj", r.respHeaderResp) if err := srv.Send(r.respHeaderResp); err != nil { return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } r.RequestState = HeaderResponseResponseComplete } if r.RequestState == HeaderResponseResponseComplete && r.respBodyResp != nil { - loggerVerbose.Info("Response body response", "obj", r.respBodyResp) + loggerTrace.Info("Sending response body response") if err := srv.Send(r.respBodyResp); err != nil { return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err) } @@ -298,7 +296,7 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces r.RequestState = BodyResponseResponsesComplete } // Dump the response so a new stream message can begin - r.reqBodyResp = nil + r.respBodyResp = nil } if r.RequestState == BodyResponseResponsesComplete && r.respTrailerResp != nil { // Trailers in requests are not guaranteed @@ -318,15 +316,13 @@ func (s *StreamingServer) HandleRequestBody( ) (*RequestContext, error) { var requestBodyBytes []byte logger := log.FromContext(ctx) - loggerVerbose := logger.V(logutil.VERBOSE) - loggerVerbose.Info("Handling request body") // Resolve target models. model, ok := requestBodyMap["model"].(string) if !ok { return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"} } - loggerVerbose.Info("Model requested", "model", model) + modelName := model // NOTE: The nil checking for the modelObject means that we DO allow passthrough currently. @@ -347,7 +343,7 @@ func (s *StreamingServer) HandleRequestBody( ResolvedTargetModel: modelName, Critical: datastore.IsCritical(modelObj), } - loggerVerbose.Info("LLM request assembled", "request", llmReq) + logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical) var err error // Update target models in the body. @@ -360,7 +356,6 @@ func (s *StreamingServer) HandleRequestBody( logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body") return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)} } - loggerVerbose.Info("Updated request body marshalled", "body", string(requestBodyBytes)) target, err := s.scheduler.Schedule(ctx, llmReq) if err != nil { @@ -377,7 +372,8 @@ func (s *StreamingServer) HandleRequestBody( endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber)) logger.V(logutil.DEFAULT).Info("Request handled", - "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod) + "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics", + fmt.Sprintf("%+v", target)) reqCtx.Model = llmReq.Model reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel @@ -385,7 +381,7 @@ func (s *StreamingServer) HandleRequestBody( reqCtx.TargetPod = targetPod.NamespacedName.String() reqCtx.TargetEndpoint = endpoint - s.populateRequestHeaderResponse(ctx, reqCtx, endpoint, len(requestBodyBytes)) + s.populateRequestHeaderResponse(reqCtx, endpoint, len(requestBodyBytes)) reqCtx.reqBodyResp = &extProcPb.ProcessingResponse{ // The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header @@ -416,8 +412,6 @@ func (s *StreamingServer) HandleResponseBody( response map[string]interface{}, ) (*RequestContext, error) { logger := log.FromContext(ctx) - loggerVerbose := logger.V(logutil.VERBOSE) - loggerVerbose.Info("Processing HandleResponseBody") responseBytes, err := json.Marshal(response) if err != nil { logger.V(logutil.DEFAULT).Error(err, "error marshalling responseBody") @@ -431,7 +425,7 @@ func (s *StreamingServer) HandleResponseBody( TotalTokens: int(usg["total_tokens"].(float64)), } reqCtx.Usage = usage - loggerVerbose.Info("Response generated", "usage", reqCtx.Usage) + logger.V(logutil.VERBOSE).Info("Response generated", "usage", reqCtx.Usage) } reqCtx.ResponseSize = len(responseBytes) // ResponseComplete is to indicate the response is complete. In non-streaming @@ -469,12 +463,8 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming( reqCtx *RequestContext, responseText string, ) { - logger := log.FromContext(ctx) - loggerVerbose := logger.V(logutil.VERBOSE) - loggerVerbose.Info("Processing HandleResponseBody") - if strings.Contains(responseText, streamingEndMsg) { - resp := ParseRespForUsage(ctx, responseText, loggerVerbose) + resp := ParseRespForUsage(ctx, responseText) metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.PromptTokens) metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.CompletionTokens) } @@ -495,13 +485,12 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ return err } endpoint := pod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber)) - s.populateRequestHeaderResponse(ctx, reqCtx, endpoint, 0) + s.populateRequestHeaderResponse(reqCtx, endpoint, 0) } return nil } -func (s *StreamingServer) populateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext, endpoint string, requestBodyLength int) { - logger := log.FromContext(ctx) +func (s *StreamingServer) populateRequestHeaderResponse(reqCtx *RequestContext, endpoint string, requestBodyLength int) { headers := []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ @@ -520,10 +509,6 @@ func (s *StreamingServer) populateRequestHeaderResponse(ctx context.Context, req }, }) } - // Print headers for debugging - for _, header := range headers { - logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue) - } targetEndpointValue := &structpb.Struct{ Fields: map[string]*structpb.Value{ diff --git a/pkg/epp/scheduling/scheduler.go b/pkg/epp/scheduling/scheduler.go index c861996a..63d829a1 100644 --- a/pkg/epp/scheduling/scheduler.go +++ b/pkg/epp/scheduling/scheduler.go @@ -125,13 +125,13 @@ func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod ba logger := log.FromContext(ctx).WithValues("request", req) podMetrics := s.datastore.PodGetAll() - logger.V(logutil.VERBOSE).Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", podMetrics)) + logger.V(logutil.DEBUG).Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", podMetrics)) pods, err := s.filter.Filter(logger, req, podMetrics) if err != nil || len(pods) == 0 { return nil, fmt.Errorf( "failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err) } - logger.V(logutil.VERBOSE).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods)) + logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods)) i := rand.Intn(len(pods)) return pods[i], nil }