Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding logging const and updating usage #236

Merged
merged 1 commit into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/ext-proc/backend/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -93,7 +94,7 @@ func RandomWeightedDraw(model *v1alpha1.InferenceModel, seed int64) string {
for _, model := range model.Spec.TargetModels {
weights += *model.Weight
}
klog.V(3).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
klog.V(logutil.VERBOSE).Infof("Weights for Model(%v) total to: %v", model.Name, weights)
randomVal := r.Int31n(weights)
for _, model := range model.Spec.TargetModels {
if randomVal < *model.Weight {
Expand Down
13 changes: 7 additions & 6 deletions pkg/ext-proc/backend/endpointslice_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"strconv"

"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
Expand All @@ -29,7 +30,7 @@ type EndpointSliceReconciler struct {
}

func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(2).Info("Reconciling EndpointSlice ", req.NamespacedName)
klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName)

endpointSlice := &discoveryv1.EndpointSlice{}
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
Expand All @@ -52,14 +53,14 @@ func (c *EndpointSliceReconciler) updateDatastore(
podMap := make(map[Pod]bool)

for _, endpoint := range slice.Endpoints {
klog.V(2).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
if c.validPod(endpoint) {
pod := Pod{
Name: endpoint.TargetRef.Name,
Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)),
}
podMap[pod] = true
klog.V(2).Infof("Storing pod %v", pod)
klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod)
c.Datastore.pods.Store(pod, true)
}
}
Expand All @@ -71,7 +72,7 @@ func (c *EndpointSliceReconciler) updateDatastore(
return false
}
if _, ok := podMap[pod]; !ok {
klog.V(2).Infof("Removing pod %v", pod)
klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod)
c.Datastore.pods.Delete(pod)
}
return true
Expand All @@ -83,7 +84,7 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
inferencePoolAvailable := func(object client.Object) bool {
_, err := c.Datastore.getInferencePool()
if err != nil {
klog.V(2).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
}
return err == nil
}
Expand All @@ -99,7 +100,7 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
wantLabel := c.ServiceName
if gotLabel != wantLabel {
namesapcedName := endpointSlice.ObjectMeta.Namespace + "/" + endpointSlice.ObjectMeta.Name
klog.V(2).Infof("Skipping EndpointSlice %v because its service owner label %v doesn't match the pool service name %v", namesapcedName, gotLabel, wantLabel)
klog.V(logutil.DEFAULT).Infof("Skipping EndpointSlice %v because its service owner label %v doesn't match the pool service name %v", namesapcedName, gotLabel, wantLabel)
}
return gotLabel == wantLabel
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ext-proc/backend/inferencemodel_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -49,7 +50,7 @@ func (c *InferenceModelReconciler) updateDatastore(infModel *v1alpha1.InferenceM
c.Datastore.InferenceModels.Store(infModel.Spec.ModelName, infModel)
return
}
klog.V(2).Infof("Removing/Not adding inference model: %v", infModel.Spec.ModelName)
klog.V(logutil.DEFAULT).Infof("Removing/Not adding inference model: %v", infModel.Spec.ModelName)
// If we get here, the model is not relevant to this pool, so remove it.
c.Datastore.InferenceModels.Delete(infModel.Spec.ModelName)
}
11 changes: 6 additions & 5 deletions pkg/ext-proc/backend/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"go.uber.org/multierr"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)

Expand Down Expand Up @@ -79,13 +80,13 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
for {
time.Sleep(refreshMetricsInterval)
if err := p.refreshMetricsOnce(); err != nil {
klog.V(4).Infof("Failed to refresh metrics: %v", err)
klog.V(logutil.DEBUG).Infof("Failed to refresh metrics: %v", err)
}
}
}()

// Periodically print out the pods and metrics for DEBUGGING.
if klog.V(4).Enabled() {
if klog.V(logutil.DEBUG).Enabled() {
go func() {
for {
time.Sleep(5 * time.Second)
Expand Down Expand Up @@ -134,12 +135,12 @@ func (p *Provider) refreshMetricsOnce() error {
defer func() {
d := time.Since(start)
// TODO: add a metric instead of logging
klog.V(4).Infof("Refreshed metrics in %v", d)
klog.V(logutil.DEBUG).Infof("Refreshed metrics in %v", d)
}()
var wg sync.WaitGroup
errCh := make(chan error)
processOnePod := func(key, value any) bool {
klog.V(4).Infof("Processing pod %v and metric %v", key, value)
klog.V(logutil.DEBUG).Infof("Processing pod %v and metric %v", key, value)
pod := key.(Pod)
existing := value.(*PodMetrics)
wg.Add(1)
Expand All @@ -151,7 +152,7 @@ func (p *Provider) refreshMetricsOnce() error {
return
}
p.UpdatePodMetrics(pod, updated)
klog.V(4).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
klog.V(logutil.DEBUG).Infof("Updated metrics for pod %s: %v", pod, updated.Metrics)
}()
return true
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ext-proc/backend/vllm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/prometheus/common/expfmt"
"go.uber.org/multierr"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)

Expand Down Expand Up @@ -170,6 +171,6 @@ func getLatestMetric(metricFamilies map[string]*dto.MetricFamily, metricName str
latest = m
}
}
klog.V(4).Infof("Got metric value %+v for metric %v", latest, metricName)
klog.V(logutil.DEBUG).Infof("Got metric value %+v for metric %v", latest, metricName)
return latest, nil
}
19 changes: 10 additions & 9 deletions pkg/ext-proc/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ import (
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)

// HandleRequestBody handles body of the request to the backend server, such as parsing the "model"
// parameter.
// Envoy sends the request body to ext proc before sending the request to the backend server.
func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(3).Infof("Handling request body")
klog.V(logutil.VERBOSE).Infof("Handling request body")

// Unmarshal request body (must be JSON).
v := req.Request.(*extProcPb.ProcessingRequest_RequestBody)
Expand All @@ -26,14 +27,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
klog.Errorf("Error unmarshaling request body: %v", err)
return nil, fmt.Errorf("error unmarshaling request body: %v", err)
}
klog.V(3).Infof("Request body: %v", rb)
klog.V(logutil.VERBOSE).Infof("Request body: %v", rb)

// Resolve target models.
model, ok := rb["model"].(string)
if !ok {
return nil, errors.New("model not found in request")
}
klog.V(3).Infof("Model requested: %v", model)
klog.V(logutil.VERBOSE).Infof("Model requested: %v", model)
modelName := model

// NOTE: The nil checking for the modelObject means that we DO allow passthrough currently.
Expand All @@ -54,7 +55,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
ResolvedTargetModel: modelName,
Critical: backend.IsCritical(modelObj),
}
klog.V(3).Infof("LLM Request: %+v", llmReq)
klog.V(logutil.VERBOSE).Infof("LLM Request: %+v", llmReq)

requestBody := v.RequestBody.Body
var err error
Expand All @@ -66,14 +67,14 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
klog.Errorf("Error marshaling request body: %v", err)
return nil, fmt.Errorf("error marshaling request body: %v", err)
}
klog.V(3).Infof("Updated body: %v", string(requestBody))
klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody))
}

targetPod, err := s.scheduler.Schedule(llmReq)
if err != nil {
return nil, fmt.Errorf("failed to find target pod: %w", err)
}
klog.V(3).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)
klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod)

reqCtx.Model = llmReq.Model
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
Expand All @@ -99,7 +100,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}
// Print headers for debugging
for _, header := range headers {
klog.V(3).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
klog.V(logutil.VERBOSE).Infof("[request_body] Header Key: %s, Header Value: %s\n", header.Header.Key, header.Header.RawValue)
}

resp := &extProcPb.ProcessingResponse{
Expand All @@ -122,10 +123,10 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}

func HandleRequestHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) *extProcPb.ProcessingResponse {
klog.V(3).Info("Handling request headers ...")
klog.V(logutil.VERBOSE).Info("Handling request headers ...")
r := req.Request
h := r.(*extProcPb.ProcessingRequest_RequestHeaders)
klog.V(3).Infof("Headers: %+v\n", h)
klog.V(logutil.VERBOSE).Infof("Headers: %+v\n", h)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_RequestHeaders{
Expand Down
9 changes: 5 additions & 4 deletions pkg/ext-proc/handlers/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ import (

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)

// HandleResponseHeaders processes response headers from the backend model server.
func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(3).Info("Processing ResponseHeaders")
klog.V(logutil.VERBOSE).Info("Processing ResponseHeaders")
h := req.Request.(*extProcPb.ProcessingRequest_ResponseHeaders)
klog.V(3).Infof("Headers before: %+v\n", h)
klog.V(logutil.VERBOSE).Infof("Headers before: %+v\n", h)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseHeaders{
Expand Down Expand Up @@ -65,7 +66,7 @@ func (s *Server) HandleResponseHeaders(reqCtx *RequestContext, req *extProcPb.Pr
}
}*/
func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {
klog.V(3).Info("Processing HandleResponseBody")
klog.V(logutil.VERBOSE).Info("Processing HandleResponseBody")
body := req.Request.(*extProcPb.ProcessingRequest_ResponseBody)

res := Response{}
Expand All @@ -80,7 +81,7 @@ func (s *Server) HandleResponseBody(reqCtx *RequestContext, req *extProcPb.Proce
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/178)
// will add the processing for streaming case.
reqCtx.ResponseComplete = true
klog.V(3).Infof("Response: %+v", res)
klog.V(logutil.VERBOSE).Infof("Response: %+v", res)

resp := &extProcPb.ProcessingResponse{
Response: &extProcPb.ProcessingResponse_ResponseBody{
Expand Down
15 changes: 8 additions & 7 deletions pkg/ext-proc/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)

Expand Down Expand Up @@ -50,7 +51,7 @@ type ModelDataStore interface {
}

func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
klog.V(3).Info("Processing")
klog.V(logutil.VERBOSE).Info("Processing")
ctx := srv.Context()
// Create request context to share states during life time of an HTTP request.
// See https://github.com/envoyproxy/envoy/issues/17540.
Expand All @@ -70,7 +71,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
if err != nil {
// This error occurs very frequently, though it doesn't seem to have any impact.
// TODO Figure out if we can remove this noise.
klog.V(3).Infof("cannot receive stream request: %v", err)
klog.V(logutil.VERBOSE).Infof("cannot receive stream request: %v", err)
return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
}

Expand All @@ -79,17 +80,17 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
case *extProcPb.ProcessingRequest_RequestHeaders:
reqCtx.RequestReceivedTimestamp = time.Now()
resp = HandleRequestHeaders(reqCtx, req)
klog.V(3).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestHeaders: %+v", reqCtx)
case *extProcPb.ProcessingRequest_RequestBody:
resp, err = s.HandleRequestBody(reqCtx, req)
if err == nil {
metrics.RecordRequestCounter(reqCtx.Model, reqCtx.ResolvedTargetModel)
metrics.RecordRequestSizes(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.RequestSize)
}
klog.V(3).Infof("Request context after HandleRequestBody: %+v", reqCtx)
klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestBody: %+v", reqCtx)
case *extProcPb.ProcessingRequest_ResponseHeaders:
resp, err = s.HandleResponseHeaders(reqCtx, req)
klog.V(3).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseHeaders: %+v", reqCtx)
case *extProcPb.ProcessingRequest_ResponseBody:
resp, err = s.HandleResponseBody(reqCtx, req)
if err == nil && reqCtx.ResponseComplete {
Expand All @@ -99,7 +100,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
metrics.RecordInputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.PromptTokens)
metrics.RecordOutputTokens(reqCtx.Model, reqCtx.ResolvedTargetModel, reqCtx.Response.Usage.CompletionTokens)
}
klog.V(3).Infof("Request context after HandleResponseBody: %+v", reqCtx)
klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseBody: %+v", reqCtx)
default:
klog.Errorf("Unknown Request type %+v", v)
return status.Error(codes.Unknown, "unknown request type")
Expand All @@ -124,7 +125,7 @@ func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
}
}

klog.V(3).Infof("response: %v", resp)
klog.V(logutil.VERBOSE).Infof("response: %v", resp)
if err := srv.Send(resp); err != nil {
klog.Errorf("send error %v", err)
return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", err)
Expand Down
7 changes: 4 additions & 3 deletions pkg/ext-proc/scheduling/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math"

"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)

Expand Down Expand Up @@ -41,7 +42,7 @@ func (f *filter) Name() string {
}

func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend.PodMetrics, error) {
klog.V(3).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))
klog.V(logutil.VERBOSE).Infof("Running filter %q on request %v with %v pods", f.name, req, len(pods))

filtered, err := f.filter(req, pods)

Expand All @@ -54,7 +55,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
if f.nextOnSuccess != nil {
next = f.nextOnSuccess
}
klog.V(3).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
klog.V(logutil.VERBOSE).Infof("onSuccess %q -> %q, filtered: %v", f.name, next.Name(), len(filtered))
// On success, pass the filtered result to the next filter.
return next.Filter(req, filtered)
} else {
Expand All @@ -65,7 +66,7 @@ func (f *filter) Filter(req *LLMRequest, pods []*backend.PodMetrics) ([]*backend
if f.nextOnFailure != nil {
next = f.nextOnFailure
}
klog.V(3).Infof("onFailure %q -> %q", f.name, next.Name())
klog.V(logutil.VERBOSE).Infof("onFailure %q -> %q", f.name, next.Name())
// On failure, pass the initial set of pods to the next filter.
return next.Filter(req, pods)
}
Expand Down
Loading