Skip to content

Commit 5384215

Browse files
committed
Cleanup logging in the request scheduling path
1 parent b7d35b6 commit 5384215

File tree

4 files changed

+35
-50
lines changed

4 files changed

+35
-50
lines changed

pkg/epp/backend/metrics/logger.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func StartMetricsLogger(ctx context.Context, datastore Datastore, refreshPrometh
7878
return time.Since(pm.GetMetrics().UpdateTime) > metricsValidityPeriod
7979
})
8080
s := fmt.Sprintf("Current Pods and metrics gathered. Fresh metrics: %+v, Stale metrics: %+v", podsWithFreshMetrics, podsWithStaleMetrics)
81-
logger.Info(s)
81+
logger.V(logutil.VERBOSE).Info(s)
8282
}
8383
}
8484
}()
@@ -89,7 +89,7 @@ func flushPrometheusMetricsOnce(logger logr.Logger, datastore Datastore) {
8989
pool, err := datastore.PoolGet()
9090
if err != nil {
9191
// No inference pool or not initialize.
92-
logger.V(logutil.VERBOSE).Info("pool is not initialized, skipping flushing metrics")
92+
logger.V(logutil.DEFAULT).Info("pool is not initialized, skipping flushing metrics")
9393
return
9494
}
9595

pkg/epp/handlers/response.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ func (s *Server) HandleStreaming(
202202
) error {
203203
responseText := string(body.ResponseBody.Body)
204204
if strings.Contains(responseText, streamingEndMsg) {
205-
parsedResp := ParseRespForUsage(ctx, responseText, loggerVerbose)
205+
parsedResp := ParseRespForUsage(ctx, responseText)
206206
reqCtx.Usage = parsedResp.Usage
207207
}
208208

@@ -230,7 +230,6 @@ func (s *Server) HandleStreaming(
230230
func ParseRespForUsage(
231231
ctx context.Context,
232232
responseText string,
233-
loggerVerbose logr.Logger,
234233
) Response {
235234
response := Response{}
236235

@@ -246,7 +245,8 @@ func ParseRespForUsage(
246245

247246
byteSlice := []byte(content)
248247
if err := json.Unmarshal(byteSlice, &response); err != nil {
249-
loggerVerbose.Error(err, "unmarshaling response body")
248+
logger := log.FromContext(ctx)
249+
logger.V(logutil.DEFAULT).Error(err, "unmarshaling response body")
250250
continue
251251
}
252252
}

pkg/epp/handlers/streamingserver.go

+28-43
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ type StreamingServer struct {
6565
func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
6666
ctx := srv.Context()
6767
logger := log.FromContext(ctx)
68-
loggerVerbose := logger.V(logutil.VERBOSE)
69-
loggerVerbose.Info("Processing")
68+
loggerTrace := logger.V(logutil.TRACE)
69+
loggerTrace.Info("Processing")
7070

7171
// Create request context to share states during life time of an HTTP request.
7272
// See https://github.com/envoyproxy/envoy/issues/17540.
@@ -103,21 +103,21 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
103103
if recvErr != nil {
104104
// This error occurs very frequently, though it doesn't seem to have any impact.
105105
// TODO Figure out if we can remove this noise.
106-
loggerVerbose.Error(err, "Cannot receive stream request")
106+
logger.V(logutil.DEFAULT).Error(err, "Cannot receive stream request")
107107
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
108108
}
109109

110110
switch v := req.Request.(type) {
111111
case *extProcPb.ProcessingRequest_RequestHeaders:
112112
err = s.HandleRequestHeaders(ctx, reqCtx, v)
113113
case *extProcPb.ProcessingRequest_RequestBody:
114-
loggerVerbose.Info("Incoming body chunk", "body", string(v.RequestBody.Body), "EoS", v.RequestBody.EndOfStream)
114+
loggerTrace.Info("Incoming body chunk", "EoS", v.RequestBody.EndOfStream)
115115
// In the stream case, we can receive multiple request bodies.
116116
body = append(body, v.RequestBody.Body...)
117117

118118
// Message is buffered, we can read and decode.
119119
if v.RequestBody.EndOfStream {
120-
loggerVerbose.Info("decoding")
120+
loggerTrace.Info("decoding")
121121
err = json.Unmarshal(body, &requestBody)
122122
if err != nil {
123123
logger.V(logutil.DEFAULT).Error(err, "Error unmarshaling request body")
@@ -133,22 +133,19 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
133133
metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
134134
metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
135135
}
136-
loggerVerbose.Info("Request context after HandleRequestBody", "context", reqCtx)
137136
}
138137
case *extProcPb.ProcessingRequest_RequestTrailers:
139138
// This is currently unused.
140139
case *extProcPb.ProcessingRequest_ResponseHeaders:
141-
loggerVerbose.Info("got response headers", "headers", v.ResponseHeaders.Headers.GetHeaders())
142140
for _, header := range v.ResponseHeaders.Headers.GetHeaders() {
143141
value := string(header.RawValue)
144142

145-
logger.V(logutil.TRACE).Info("header", "key", header.Key, "value", value)
143+
loggerTrace.Info("header", "key", header.Key, "value", value)
146144
if header.Key == "status" && value != "200" {
147145
reqCtx.ResponseStatusCode = errutil.ModelServerError
148146
} else if header.Key == "content-type" && strings.Contains(value, "text/event-stream") {
149147
reqCtx.modelServerStreaming = true
150-
loggerVerbose.Info("model server is streaming response")
151-
logger.Error(nil, "made it here")
148+
loggerTrace.Info("model server is streaming response")
152149
}
153150
}
154151
reqCtx.RequestState = ResponseRecieved
@@ -179,7 +176,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
179176
responseText := string(v.ResponseBody.Body)
180177
s.HandleResponseBodyModelStreaming(ctx, reqCtx, responseText)
181178
if v.ResponseBody.EndOfStream {
182-
loggerVerbose.Info("streaming is completed")
179+
loggerTrace.Info("stream completed")
183180

184181
reqCtx.ResponseCompleteTimestamp = time.Now()
185182
metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp)
@@ -207,6 +204,7 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
207204

208205
// Message is buffered, we can read and decode.
209206
if v.ResponseBody.EndOfStream {
207+
loggerTrace.Info("stream completed")
210208
// Don't send a 500 on a response error. Just let the message passthrough and log our error for debugging purposes.
211209
// We assume the body is valid JSON, err messages are not guaranteed to be json, and so capturing and sending a 500 obfuscates the response message.
212210
// using the standard 'err' var will send an immediate error response back to the caller.
@@ -226,7 +224,6 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
226224
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Usage.PromptTokens)
227225
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Usage.CompletionTokens)
228226
}
229-
loggerVerbose.Info("Request context after HandleResponseBody", "context", reqCtx)
230227
}
231228
}
232229
case *extProcPb.ProcessingRequest_ResponseTrailers:
@@ -246,27 +243,28 @@ func (s *StreamingServer) Process(srv extProcPb.ExternalProcessor_ProcessServer)
246243
}
247244
return nil
248245
}
249-
loggerVerbose.Info("checking", "request state", reqCtx.RequestState)
250-
if err := reqCtx.updateStateAndSendIfNeeded(srv, loggerVerbose); err != nil {
246+
loggerTrace.Info("checking", "request state", reqCtx.RequestState)
247+
if err := reqCtx.updateStateAndSendIfNeeded(srv, logger); err != nil {
251248
return err
252249
}
253250
}
254251
}
255252

256253
// updateStateAndSendIfNeeded checks state and can send mutiple responses in a single pass, but only if ordered properly.
257254
// Order of requests matter in FULL_DUPLEX_STREAMING. For both request and response, the order of response sent back MUST be: Header->Body->Trailer, with trailer being optional.
258-
func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, loggerVerbose logr.Logger) error {
255+
func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProcessor_ProcessServer, logger logr.Logger) error {
256+
loggerTrace := logger.V(logutil.TRACE)
259257
// No switch statement as we could send multiple responses in one pass.
260258
if r.RequestState == RequestReceived && r.reqHeaderResp != nil {
261-
loggerVerbose.Info("Request header response", "obj", r.reqHeaderResp)
259+
loggerTrace.Info("Sending request header response", "obj", r.reqHeaderResp)
262260
if err := srv.Send(r.reqHeaderResp); err != nil {
263-
loggerVerbose.Error(err, "error sending response")
261+
logger.V(logutil.DEFAULT).Error(err, "error sending response")
264262
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
265263
}
266264
r.RequestState = HeaderRequestResponseComplete
267265
}
268266
if r.RequestState == HeaderRequestResponseComplete && r.reqBodyResp != nil {
269-
loggerVerbose.Info("Request body response", "obj", r.reqBodyResp)
267+
loggerTrace.Info("Sending request body response")
270268
if err := srv.Send(r.reqBodyResp); err != nil {
271269
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
272270
}
@@ -281,14 +279,14 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
281279
}
282280
}
283281
if r.RequestState == ResponseRecieved && r.respHeaderResp != nil {
284-
loggerVerbose.Info("Response header response", "obj", r.respHeaderResp)
282+
loggerTrace.Info("Sending response header response", "obj", r.respHeaderResp)
285283
if err := srv.Send(r.respHeaderResp); err != nil {
286284
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
287285
}
288286
r.RequestState = HeaderResponseResponseComplete
289287
}
290288
if r.RequestState == HeaderResponseResponseComplete && r.respBodyResp != nil {
291-
loggerVerbose.Info("Response body response", "obj", r.respBodyResp)
289+
loggerTrace.Info("Sending response body response")
292290
if err := srv.Send(r.respBodyResp); err != nil {
293291
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
294292
}
@@ -298,7 +296,7 @@ func (r *RequestContext) updateStateAndSendIfNeeded(srv extProcPb.ExternalProces
298296
r.RequestState = BodyResponseResponsesComplete
299297
}
300298
// Dump the response so a new stream message can begin
301-
r.reqBodyResp = nil
299+
r.respBodyResp = nil
302300
}
303301
if r.RequestState == BodyResponseResponsesComplete && r.respTrailerResp != nil {
304302
// Trailers in requests are not guaranteed
@@ -318,15 +316,13 @@ func (s *StreamingServer) HandleRequestBody(
318316
) (*RequestContext, error) {
319317
var requestBodyBytes []byte
320318
logger := log.FromContext(ctx)
321-
loggerVerbose := logger.V(logutil.VERBOSE)
322-
loggerVerbose.Info("Handling request body")
323319

324320
// Resolve target models.
325321
model, ok := requestBodyMap["model"].(string)
326322
if !ok {
327323
return reqCtx, errutil.Error{Code: errutil.BadRequest, Msg: "model not found in request"}
328324
}
329-
loggerVerbose.Info("Model requested", "model", model)
325+
330326
modelName := model
331327

332328
// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
@@ -347,7 +343,7 @@ func (s *StreamingServer) HandleRequestBody(
347343
ResolvedTargetModel: modelName,
348344
Critical: datastore.IsCritical(modelObj),
349345
}
350-
loggerVerbose.Info("LLM request assembled", "request", llmReq)
346+
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical)
351347

352348
var err error
353349
// Update target models in the body.
@@ -360,7 +356,6 @@ func (s *StreamingServer) HandleRequestBody(
360356
logger.V(logutil.DEFAULT).Error(err, "Error marshaling request body")
361357
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
362358
}
363-
loggerVerbose.Info("Updated request body marshalled", "body", string(requestBodyBytes))
364359

365360
target, err := s.scheduler.Schedule(ctx, llmReq)
366361
if err != nil {
@@ -377,15 +372,16 @@ func (s *StreamingServer) HandleRequestBody(
377372
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
378373

379374
logger.V(logutil.DEFAULT).Info("Request handled",
380-
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod)
375+
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics",
376+
fmt.Sprintf("%+v", target))
381377

382378
reqCtx.Model = llmReq.Model
383379
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
384380
reqCtx.RequestSize = len(requestBodyBytes)
385381
reqCtx.TargetPod = targetPod.NamespacedName.String()
386382
reqCtx.TargetEndpoint = endpoint
387383

388-
s.populateRequestHeaderResponse(ctx, reqCtx, endpoint, len(requestBodyBytes))
384+
s.populateRequestHeaderResponse(reqCtx, endpoint, len(requestBodyBytes))
389385

390386
reqCtx.reqBodyResp = &extProcPb.ProcessingResponse{
391387
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
@@ -416,8 +412,6 @@ func (s *StreamingServer) HandleResponseBody(
416412
response map[string]interface{},
417413
) (*RequestContext, error) {
418414
logger := log.FromContext(ctx)
419-
loggerVerbose := logger.V(logutil.VERBOSE)
420-
loggerVerbose.Info("Processing HandleResponseBody")
421415
responseBytes, err := json.Marshal(response)
422416
if err != nil {
423417
logger.V(logutil.DEFAULT).Error(err, "error marshalling responseBody")
@@ -431,7 +425,7 @@ func (s *StreamingServer) HandleResponseBody(
431425
TotalTokens: int(usg["total_tokens"].(float64)),
432426
}
433427
reqCtx.Usage = usage
434-
loggerVerbose.Info("Response generated", "usage", reqCtx.Usage)
428+
logger.V(logutil.VERBOSE).Info("Response generated", "usage", reqCtx.Usage)
435429
}
436430
reqCtx.ResponseSize = len(responseBytes)
437431
// ResponseComplete is to indicate the response is complete. In non-streaming
@@ -469,12 +463,8 @@ func (s *StreamingServer) HandleResponseBodyModelStreaming(
469463
reqCtx *RequestContext,
470464
responseText string,
471465
) {
472-
logger := log.FromContext(ctx)
473-
loggerVerbose := logger.V(logutil.VERBOSE)
474-
loggerVerbose.Info("Processing HandleResponseBody")
475-
476466
if strings.Contains(responseText, streamingEndMsg) {
477-
resp := ParseRespForUsage(ctx, responseText, loggerVerbose)
467+
resp := ParseRespForUsage(ctx, responseText)
478468
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.PromptTokens)
479469
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, resp.Usage.CompletionTokens)
480470
}
@@ -495,13 +485,12 @@ func (s *StreamingServer) HandleRequestHeaders(ctx context.Context, reqCtx *Requ
495485
return err
496486
}
497487
endpoint := pod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
498-
s.populateRequestHeaderResponse(ctx, reqCtx, endpoint, 0)
488+
s.populateRequestHeaderResponse(reqCtx, endpoint, 0)
499489
}
500490
return nil
501491
}
502492

503-
func (s *StreamingServer) populateRequestHeaderResponse(ctx context.Context, reqCtx *RequestContext, endpoint string, requestBodyLength int) {
504-
logger := log.FromContext(ctx)
493+
func (s *StreamingServer) populateRequestHeaderResponse(reqCtx *RequestContext, endpoint string, requestBodyLength int) {
505494
headers := []*configPb.HeaderValueOption{
506495
{
507496
Header: &configPb.HeaderValue{
@@ -520,10 +509,6 @@ func (s *StreamingServer) populateRequestHeaderResponse(ctx context.Context, req
520509
},
521510
})
522511
}
523-
// Print headers for debugging
524-
for _, header := range headers {
525-
logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
526-
}
527512

528513
targetEndpointValue := &structpb.Struct{
529514
Fields: map[string]*structpb.Value{

pkg/epp/scheduling/scheduler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -125,13 +125,13 @@ func (s *Scheduler) Schedule(ctx context.Context, req *LLMRequest) (targetPod ba
125125
logger := log.FromContext(ctx).WithValues("request", req)
126126
podMetrics := s.datastore.PodGetAll()
127127

128-
logger.V(logutil.VERBOSE).Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", podMetrics))
128+
logger.V(logutil.DEBUG).Info(fmt.Sprintf("Scheduling a request. Metrics: %+v", podMetrics))
129129
pods, err := s.filter.Filter(logger, req, podMetrics)
130130
if err != nil || len(pods) == 0 {
131131
return nil, fmt.Errorf(
132132
"failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
133133
}
134-
logger.V(logutil.VERBOSE).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
134+
logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
135135
i := rand.Intn(len(pods))
136136
return pods[i], nil
137137
}

0 commit comments

Comments
 (0)