diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go
index 569fb1bd..b466a2ed 100644
--- a/pkg/ext-proc/backend/datastore.go
+++ b/pkg/ext-proc/backend/datastore.go
@@ -6,6 +6,7 @@ import (
 	"sync"
 
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/klog/v2"
 )
@@ -93,7 +94,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
 	for _, model := range model.Spec.TargetModels {
 		weights += *model.Weight
 	}
-	klog.V(3).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
+	klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
 	randomVal := r.Int31n(weights)
 	for _, model := range model.Spec.TargetModels {
 		if randomVal < *model.Weight {
diff --git a/pkg/ext-proc/backend/endpointslice_reconciler.go b/pkg/ext-proc/backend/endpointslice_reconciler.go
index 2539e721..55560b5f 100644
--- a/pkg/ext-proc/backend/endpointslice_reconciler.go
+++ b/pkg/ext-proc/backend/endpointslice_reconciler.go
@@ -5,6 +5,7 @@ import (
 	"strconv"
 
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	discoveryv1 "k8s.io/api/discovery/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/tools/record"
@@ -29,7 +30,7 @@ type EndpointSliceReconciler struct {
 }
 
 func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
-	klog.V(2).Info("Reconciling EndpointSlice ", req.NamespacedName)
+	klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName)
 
 	endpointSlice := &discoveryv1.EndpointSlice{}
 	if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
@@ -52,14 +53,14 @@ func (c *EndpointSliceReconciler) updateDatastore(
 	podMap := make(map[Pod]bool)
 
 	for _, endpoint := range slice.Endpoints {
-		klog.V(2).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
+		klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
 		if c.validPod(endpoint) {
 			pod := Pod{
 				Name:    endpoint.TargetRef.Name,
 				Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)),
 			}
 			podMap[pod] = true
-			klog.V(2).Infof("Storing pod %v", pod)
+			klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod)
 			c.Datastore.pods.Store(pod, true)
 		}
 	}
@@ -71,7 +72,7 @@ func (c *EndpointSliceReconciler) updateDatastore(
 			return false
 		}
 		if _, ok := podMap[pod]; !ok {
-			klog.V(2).Infof("Removing pod %v", pod)
+			klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod)
 			c.Datastore.pods.Delete(pod)
 		}
 		return true
@@ -83,7 +84,7 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
 	inferencePoolAvailable := func(object client.Object) bool {
 		_, err := c.Datastore.getInferencePool()
 		if err != nil {
-			klog.V(2).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
+			klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
 		}
 		return err == nil
 	}
@@ -99,7 +100,7 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		wantLabel := c.ServiceName
 		if gotLabel != wantLabel {
 			namesapcedName := endpointSlice.ObjectMeta.Namespace + "/" + endpointSlice.ObjectMeta.Name
-			klog.V(2).Infof("Skipping EndpointSlice %v because its service owner label %v doesn't match the pool service name %v", namesapcedName, gotLabel, wantLabel)
+			klog.V(logutil.DEFAULT).Infof("Skipping EndpointSlice %v because its service owner label %v doesn't match the pool service name %v", namesapcedName, gotLabel, wantLabel)
 		}
 		return gotLabel == wantLabel
 	}
diff --git a/pkg/ext-proc/backend/inferencemodel_reconciler.go b/pkg/ext-proc/backend/inferencemodel_reconciler.go
index d8882e32..c4aeb58e 100644
--- a/pkg/ext-proc/backend/inferencemodel_reconciler.go
+++ b/pkg/ext-proc/backend/inferencemodel_reconciler.go
@@ -4,6 +4,7 @@ import (
 	"context"
 
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/tools/record"
@@ -49,7 +50,7 @@ func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceM
 		c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
 		return
 	}
-	klog.V(2).Infof("Removing/Not adding inference model: %v", infModel.Spec.ModelName)
+	klog.V(logutil.DEFAULT).Infof("Removing/Not adding inference model: %v", infModel.Spec.ModelName)
 	// If we get here. The model is not relevant to this pool, remove.
 	c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
 }
diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index f37a3878..a97ca747 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -7,6 +7,7 @@ import (
 	"time"
 
 	"go.uber.org/multierr"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
 
@@ -79,13 +80,13 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 		for {
 			time.Sleep(refreshMetricsInterval)
 			if err := p.refreshMetricsOnce(); err != nil {
-				klog.V(4).Infof("Failed to refresh metrics: %v", err)
+				klog.V(logutil.DEBUG).Infof("Failed to refresh metrics: %v", err)
 			}
 		}
 	}()
 
 	// Periodically print out the pods and metrics for DEBUGGING.
-	if klog.V(4).Enabled() {
+	if klog.V(logutil.DEBUG).Enabled() {
 		go func() {
 			for {
 				time.Sleep(5 * time.Second)
@@ -134,12 +135,12 @@ func (p *Provider) refreshMetricsOnce() error {
 	defer func() {
 		d := time.Since(start)
 		// TODO: add a metric instead of logging
-		klog.V(4).Infof("Refreshed metrics in %v", d)
+		klog.V(logutil.DEBUG).Infof("Refreshed metrics in %v", d)
 	}()
 	var wg sync.WaitGroup
 	errCh := make(chan error)
 	processOnePod := func(key, value any) bool {
-		klog.V(4).Infof("Processing pod %v and metric %v", key, value)
+		klog.V(logutil.DEBUG).Infof("Processing pod %v and metric %v", key, value)
 		pod := key.(Pod)
 		existing := value.(*PodMetrics)
 		wg.Add(1)
@@ -151,7 +152,7 @@ func (p *Provider) refreshMetricsOnce() error {
 				return
 			}
 			p.UpdatePodMetrics(pod, updated)
-			klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
+			klog.V(logutil.DEBUG).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
 		}()
 		return true
 	}
diff --git a/pkg/ext-proc/backend/vllm/metrics.go b/pkg/ext-proc/backend/vllm/metrics.go
index 5fff4d8e..68196d11 100644
--- a/pkg/ext-proc/backend/vllm/metrics.go
+++ b/pkg/ext-proc/backend/vllm/metrics.go
@@ -13,6 +13,7 @@ import (
 	"github.com/prometheus/common/expfmt"
 	"go.uber.org/multierr"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
 
@@ -170,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
 			latest = m
 		}
 	}
-	klog.V(4).Infof("Got metric value %+v for metric %v", latest, metricName)
+	klog.V(logutil.DEBUG).Infof("Got metric value %+v for metric %v", latest, metricName)
 	return latest, nil
 }
diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go
index ac6d6357..2293f125 100644
--- a/pkg/ext-proc/handlers/request.go
+++ b/pkg/ext-proc/handlers/request.go
@@ -10,6 +10,7 @@ import (
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
 
@@ -17,7 +18,7 @@ import (
 // parameter.
 // Envoy sends the request body to ext proc before sending the request to the backend server.
 func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
-	klog.V(3).Infof("Handling request body")
+	klog.V(logutil.VERBOSE).Infof("Handling request body")
 
 	// Unmarshal request body (must be JSON).
 	v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
@@ -26,14 +27,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 		klog.Errorf("Error unmarshaling request body: %v", err)
 		return nil, fmt.Errorf("error unmarshaling request body: %v", err)
 	}
-	klog.V(3).Infof("Request body: %v", rb)
+	klog.V(logutil.VERBOSE).Infof("Request body: %v", rb)
 
 	// Resolve target models.
 	model, ok := rb["model"].(string)
 	if !ok {
 		return nil, errors.New("model not found in request")
 	}
-	klog.V(3).Infof("Model requested: %v", model)
+	klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
 	modelName := model
 
 	// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
@@ -54,7 +55,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 		ResolvedTargetModel: modelName,
 		Critical:            backend.IsCritical(modelObj),
 	}
-	klog.V(3).Infof("LLM Request: %+v", llmReq)
+	klog.V(logutil.VERBOSE).Infof("LLM Request: %+v", llmReq)
 
 	requestBody := v.RequestBody.Body
 	var err error
@@ -66,14 +67,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 			klog.Errorf("Error marshaling request body: %v", err)
 			return nil, fmt.Errorf("error marshaling request body: %v", err)
 		}
-		klog.V(3).Infof("Updated body: %v", string(requestBody))
+		klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody))
 	}
 
 	targetPod, err := s.scheduler.Schedule(llmReq)
 	if err != nil {
 		return nil, fmt.Errorf("failed to find target pod: %w", err)
 	}
-	klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
+	klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
 
 	reqCtx.Model = llmReq.Model
 	reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
@@ -99,7 +100,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 	}
 	// Print headers for debugging
 	for _, header := range headers {
-		klog.V(3).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
+		klog.V(logutil.VERBOSE).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
 	}
 
 	resp := &extProcPb.ProcessingResponse{
@@ -122,10 +123,10 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
 }
 
 func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
-	klog.V(3).Info("Handling request headers ...")
+	klog.V(logutil.VERBOSE).Info("Handling request headers ...")
 	r := req.Request
 	h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
-	klog.V(3).Infof("Headers: %+v\n", h)
+	klog.V(logutil.VERBOSE).Infof("Headers: %+v\n", h)
 
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_RequestHeaders{
diff --git a/pkg/ext-proc/handlers/response.go b/pkg/ext-proc/handlers/response.go
index 6f8293e3..3b8a9946 100644
--- a/pkg/ext-proc/handlers/response.go
+++ b/pkg/ext-proc/handlers/response.go
@@ -6,14 +6,15 @@ import (
 
 	configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
 
 // HandleResponseHeaders processes response headers from the backend model server.
 func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
-	klog.V(3).Info("Processing ResponseHeaders")
+	klog.V(logutil.VERBOSE).Info("Processing ResponseHeaders")
 	h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
-	klog.V(3).Infof("Headers before: %+v\n", h)
+	klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)
 
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseHeaders{
@@ -65,7 +66,7 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
 	}
 }*/
 func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
-	klog.V(3).Info("Processing HandleResponseBody")
+	klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
 	body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)
 
 	res := Response{}
@@ -80,7 +81,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce
 	// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
 	// will add the processing for streaming case.
 	reqCtx.ResponseComplete = true
-	klog.V(3).Infof("Response: %+v", res)
+	klog.V(logutil.VERBOSE).Infof("Response: %+v", res)
 
 	resp := &extProcPb.ProcessingResponse{
 		Response: &extProcPb.ProcessingResponse_ResponseBody{
diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go
index d152e12f..900d3c8d 100644
--- a/pkg/ext-proc/handlers/server.go
+++ b/pkg/ext-proc/handlers/server.go
@@ -12,6 +12,7 @@ import (
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
 
@@ -50,7 +51,7 @@ type ModelDataStore interface {
 }
 
 func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
-	klog.V(3).Info("Processing")
+	klog.V(logutil.VERBOSE).Info("Processing")
 	ctx := srv.Context()
 	// Create request context to share states during life time of an HTTP request.
 	// See https://github.com/envoyproxy/envoy/issues/17540.
@@ -70,7 +71,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 		if err != nil {
 			// This error occurs very frequently, though it doesn't seem to have any impact.
 			// TODO Figure out if we can remove this noise.
-			klog.V(3).Infof("cannot receive stream request: %v", err)
+			klog.V(logutil.VERBOSE).Infof("cannot receive stream request: %v", err)
 			return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
 		}
 
@@ -79,17 +80,17 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 		case *extProcPb.ProcessingRequest_RequestHeaders:
 			reqCtx.RequestReceivedTimestamp = time.Now()
 			resp = HandleRequestHeaders(reqCtx, req)
-			klog.V(3).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
+			klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
 		case *extProcPb.ProcessingRequest_RequestBody:
 			resp, err = s.HandleRequestBody(reqCtx, req)
 			if err == nil {
 				metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
 				metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
 			}
-			klog.V(3).Infof("Request context after HandleRequestBody: %+v", reqCtx)
+			klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestBody: %+v", reqCtx)
 		case *extProcPb.ProcessingRequest_ResponseHeaders:
 			resp, err = s.HandleResponseHeaders(reqCtx, req)
-			klog.V(3).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
+			klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
 		case *extProcPb.ProcessingRequest_ResponseBody:
 			resp, err = s.HandleResponseBody(reqCtx, req)
 			if err == nil && reqCtx.ResponseComplete {
@@ -99,7 +100,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 				metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
 				metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
 			}
-			klog.V(3).Infof("Request context after HandleResponseBody: %+v", reqCtx)
+			klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseBody: %+v", reqCtx)
 		default:
 			klog.Errorf("Unknown Request type %+v", v)
 			return status.Error(codes.Unknown, "unknown request type")
@@ -124,7 +125,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
 			}
 		}
 
-		klog.V(3).Infof("response: %v", resp)
+		klog.V(logutil.VERBOSE).Infof("response: %v", resp)
 		if err := srv.Send(resp); err != nil {
 			klog.Errorf("send error %v", err)
 			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
diff --git a/pkg/ext-proc/scheduling/filter.go b/pkg/ext-proc/scheduling/filter.go
index 09779d63..d431b076 100644
--- a/pkg/ext-proc/scheduling/filter.go
+++ b/pkg/ext-proc/scheduling/filter.go
@@ -5,6 +5,7 @@ import (
 	"math"
 
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
 
@@ -41,7 +42,7 @@ func (f *filter) Name() string {
 }
 
 func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
-	klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))
+	klog.V(logutil.VERBOSE).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))
 
 	filtered, err := f.filter(req, pods)
 
@@ -54,7 +55,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
 		if f.nextOnSuccess != nil {
 			next = f.nextOnSuccess
 		}
-		klog.V(3).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
+		klog.V(logutil.VERBOSE).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
 		// On success, pass the filtered result to the next filter.
 		return next.Filter(req, filtered)
 	} else {
@@ -65,7 +66,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
 		if f.nextOnFailure != nil {
 			next = f.nextOnFailure
 		}
-		klog.V(3).Infof("onFailure %q -> %q", f.name, next.Name())
+		klog.V(logutil.VERBOSE).Infof("onFailure %q -> %q", f.name, next.Name())
 		// On failure, pass the initial set of pods to the next filter.
 		return next.Filter(req, pods)
 	}
diff --git a/pkg/ext-proc/scheduling/scheduler.go b/pkg/ext-proc/scheduling/scheduler.go
index c6a91541..9fc3e663 100644
--- a/pkg/ext-proc/scheduling/scheduler.go
+++ b/pkg/ext-proc/scheduling/scheduler.go
@@ -8,6 +8,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
+	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
 	klog "k8s.io/klog/v2"
 )
 
@@ -111,13 +112,13 @@ type PodMetricsProvider interface {
 
 // Schedule finds the target pod based on metrics and the requested lora adapter.
 func (s *Scheduler) Schedule(req *LLMRequest) (targetPod backend.Pod, err error) {
-	klog.V(3).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics())
+	klog.V(logutil.VERBOSE).Infof("request: %v; metrics: %+v", req, s.podMetricsProvider.AllPodMetrics())
 	pods, err := s.filter.Filter(req, s.podMetricsProvider.AllPodMetrics())
 	if err != nil || len(pods) == 0 {
 		return backend.Pod{}, fmt.Errorf(
 			"failed to apply filter, resulted %v pods, this should never happen: %w", len(pods), err)
 	}
-	klog.V(3).Infof("Going to randomly select a pod from the candidates: %+v", pods)
+	klog.V(logutil.VERBOSE).Infof("Going to randomly select a pod from the candidates: %+v", pods)
 	i := rand.Intn(len(pods))
 	return pods[i].Pod, nil
 }
diff --git a/pkg/ext-proc/util/logging/logging_const.go b/pkg/ext-proc/util/logging/logging_const.go
new file mode 100644
index 00000000..57c44cf6
--- /dev/null
+++ b/pkg/ext-proc/util/logging/logging_const.go
@@ -0,0 +1,7 @@
+package logging
+
+const (
+	DEFAULT = 2
+	VERBOSE = 3
+	DEBUG   = 4
+)
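
Illustrative sketch (not part of the diff): the new logutil constants are untyped integer constants that feed directly into klog.V, so call sites written against them behave exactly like the numeric levels they replace. Assuming the ext-proc binary registers klog's flags in the usual way (e.g. via klog.InitFlags), running with -v=3 would emit DEFAULT and VERBOSE logs while suppressing DEBUG. The helper below (logPodCount) is hypothetical and exists only to show the pattern.

package example

import (
	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
	klog "k8s.io/klog/v2"
)

// logPodCount is a hypothetical call site: it names the verbosity level
// via the logutil constants instead of hard-coding 2, 3, or 4.
func logPodCount(n int) {
	klog.V(logutil.DEFAULT).Infof("Tracking %d pods", n)            // emitted at -v=2 and above
	klog.V(logutil.VERBOSE).Infof("Candidate pods: %d", n)          // emitted at -v=3 and above
	klog.V(logutil.DEBUG).Infof("Refreshed metrics for %d pods", n) // emitted at -v=4 and above
}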