|
1 | 1 | package handlers
|
2 | 2 |
|
3 | 3 | import (
|
| 4 | + "context" |
4 | 5 | "io"
|
5 | 6 | "time"
|
6 | 7 |
|
7 | 8 | extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
|
8 | 9 | envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
|
9 |
| - "github.com/go-logr/logr" |
10 | 10 | "google.golang.org/grpc/codes"
|
11 | 11 | "google.golang.org/grpc/status"
|
12 | 12 | "sigs.k8s.io/controller-runtime/pkg/log"
|
@@ -38,7 +38,7 @@ type Server struct {
|
38 | 38 | }
|
39 | 39 |
|
40 | 40 | type Scheduler interface {
|
41 |
| - Schedule(logger logr.Logger, b *scheduling.LLMRequest) (targetPod backend.Pod, err error) |
| 41 | + Schedule(ctx context.Context, b *scheduling.LLMRequest) (targetPod backend.Pod, err error) |
42 | 42 | }
|
43 | 43 |
|
44 | 44 | // PodProvider is an interface that provides the set of pods in the backend and information such as metrics.
|
@@ -83,23 +83,23 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
|
83 | 83 | switch v := req.Request.(type) {
|
84 | 84 | case *extProcPb.ProcessingRequest_RequestHeaders:
|
85 | 85 | reqCtx.RequestReceivedTimestamp = time.Now()
|
86 |
| - resp = HandleRequestHeaders(logger, reqCtx, req) |
| 86 | + resp = HandleRequestHeaders(ctx, reqCtx, req) |
87 | 87 | loggerVerbose.Info("Request context after HandleRequestHeaders", "context", reqCtx)
|
88 | 88 | case *extProcPb.ProcessingRequest_RequestBody:
|
89 |
| - resp, err = s.HandleRequestBody(logger, reqCtx, req) |
| 89 | + resp, err = s.HandleRequestBody(ctx, reqCtx, req) |
90 | 90 | if err == nil {
|
91 | 91 | metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
|
92 | 92 | metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
|
93 | 93 | }
|
94 | 94 | loggerVerbose.Info("Request context after HandleRequestBody", "context", reqCtx)
|
95 | 95 | case *extProcPb.ProcessingRequest_ResponseHeaders:
|
96 |
| - resp, err = s.HandleResponseHeaders(logger, reqCtx, req) |
| 96 | + resp, err = s.HandleResponseHeaders(ctx, reqCtx, req) |
97 | 97 | loggerVerbose.Info("Request context after HandleResponseHeaders", "context", reqCtx)
|
98 | 98 | case *extProcPb.ProcessingRequest_ResponseBody:
|
99 |
| - resp, err = s.HandleResponseBody(logger, reqCtx, req) |
| 99 | + resp, err = s.HandleResponseBody(ctx, reqCtx, req) |
100 | 100 | if err == nil && reqCtx.ResponseComplete {
|
101 | 101 | reqCtx.ResponseCompleteTimestamp = time.Now()
|
102 |
| - metrics.RecordRequestLatencies(logger, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp) |
| 102 | + metrics.RecordRequestLatencies(ctx, reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestReceivedTimestamp, reqCtx.ResponseCompleteTimestamp) |
103 | 103 | metrics.RecordResponseSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.ResponseSize)
|
104 | 104 | metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
|
105 | 105 | metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
|
|
0 commit comments