From fbe330d449765eb12efcecf0b8988450c0b88e2c Mon Sep 17 00:00:00 2001 From: ahg-g Date: Fri, 31 Jan 2025 22:48:41 +0000 Subject: [PATCH 1/4] Update default target-pod and inject it into response metadata --- pkg/ext-proc/handlers/request.go | 20 ++++++++++++++----- pkg/ext-proc/handlers/server.go | 14 ++++++------- pkg/ext-proc/main.go | 8 ++++---- pkg/ext-proc/server/runserver.go | 22 ++++++++++----------- pkg/manifests/gateway/patch_policy.yaml | 4 ++-- pkg/manifests/vllm/deployment.yaml | 4 ++-- test/integration/hermetic_test.go | 26 ++++++++++++++++++------- test/testdata/envoy.yaml | 2 +- 8 files changed, 61 insertions(+), 39 deletions(-) diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index 2293f125..9008aecc 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -8,6 +8,7 @@ import ( configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3" + "google.golang.org/protobuf/types/known/structpb" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling" logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging" @@ -70,23 +71,23 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody)) } - targetPod, err := s.scheduler.Schedule(llmReq) + targetEndpoint, err := s.scheduler.Schedule(llmReq) if err != nil { return nil, fmt.Errorf("failed to find target pod: %w", err) } - klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod) + klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetEndpoint) reqCtx.Model = llmReq.Model reqCtx.ResolvedTargetModel = 
llmReq.ResolvedTargetModel reqCtx.RequestSize = len(v.RequestBody.Body) - reqCtx.TargetPod = targetPod + reqCtx.TargetPod = targetEndpoint // Insert "target-pod" to instruct Envoy to route requests to the specified target pod. headers := []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: s.targetPodHeader, - RawValue: []byte(targetPod.Address), + Key: s.targetEndpointKey, + RawValue: []byte(targetEndpoint.Address), }, }, // We need to update the content length header if the body is mutated, see Envoy doc: @@ -118,6 +119,15 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces }, }, }, + DynamicMetadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + s.targetEndpointKey: { + Kind: &structpb.Value_StringValue{ + StringValue: targetEndpoint.Address, + }, + }, + }, + }, } return resp, nil } diff --git a/pkg/ext-proc/handlers/server.go b/pkg/ext-proc/handlers/server.go index 900d3c8d..172249b6 100644 --- a/pkg/ext-proc/handlers/server.go +++ b/pkg/ext-proc/handlers/server.go @@ -16,12 +16,12 @@ import ( klog "k8s.io/klog/v2" ) -func NewServer(pp PodProvider, scheduler Scheduler, targetPodHeader string, datastore ModelDataStore) *Server { +func NewServer(pp PodProvider, scheduler Scheduler, targetEndpointKey string, datastore ModelDataStore) *Server { return &Server{ - scheduler: scheduler, - podProvider: pp, - targetPodHeader: targetPodHeader, - datastore: datastore, + scheduler: scheduler, + podProvider: pp, + targetEndpointKey: targetEndpointKey, + datastore: datastore, } } @@ -32,8 +32,8 @@ type Server struct { podProvider PodProvider // The key of the header to specify the target pod address. This value needs to match Envoy // configuration. 
- targetPodHeader string - datastore ModelDataStore + targetEndpointKey string + datastore ModelDataStore } type Scheduler interface { diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go index 77ee5b2a..a783aa2c 100644 --- a/pkg/ext-proc/main.go +++ b/pkg/ext-proc/main.go @@ -41,9 +41,9 @@ var ( "The port used for gRPC liveness and readiness probes") metricsPort = flag.Int( "metricsPort", 9090, "The metrics port") - targetPodHeader = flag.String( - "targetPodHeader", - runserver.DefaultTargetPodHeader, + targetEndpointKey = flag.String( + "targetEndpointKey", + runserver.DefaultTargetEndpointKey, "Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.") poolName = flag.String( "poolName", @@ -103,7 +103,7 @@ func main() { serverRunner := &runserver.ExtProcServerRunner{ GrpcPort: *grpcPort, - TargetPodHeader: *targetPodHeader, + TargetEndpointKey: *targetEndpointKey, PoolName: *poolName, PoolNamespace: *poolNamespace, ServiceName: *serviceName, diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go index 94c6078c..1c9c1b2e 100644 --- a/pkg/ext-proc/server/runserver.go +++ b/pkg/ext-proc/server/runserver.go @@ -20,7 +20,7 @@ import ( // ExtProcServerRunner provides methods to manage an external process server. 
type ExtProcServerRunner struct { GrpcPort int - TargetPodHeader string + TargetEndpointKey string PoolName string PoolNamespace string ServiceName string @@ -35,20 +35,20 @@ type ExtProcServerRunner struct { // Default values for CLI flags in main const ( - DefaultGrpcPort = 9002 // default for --grpcPort - DefaultTargetPodHeader = "target-pod" // default for --targetPodHeader - DefaultPoolName = "" // required but no default - DefaultPoolNamespace = "default" // default for --poolNamespace - DefaultServiceName = "" // required but no default - DefaultZone = "" // default for --zone - DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval - DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval + DefaultGrpcPort = 9002 // default for --grpcPort + DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey + DefaultPoolName = "" // required but no default + DefaultPoolNamespace = "default" // default for --poolNamespace + DefaultServiceName = "" // required but no default + DefaultZone = "" // default for --zone + DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval + DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval ) func NewDefaultExtProcServerRunner() *ExtProcServerRunner { return &ExtProcServerRunner{ GrpcPort: DefaultGrpcPort, - TargetPodHeader: DefaultTargetPodHeader, + TargetEndpointKey: DefaultTargetEndpointKey, PoolName: DefaultPoolName, PoolNamespace: DefaultPoolNamespace, ServiceName: DefaultServiceName, @@ -130,7 +130,7 @@ func (r *ExtProcServerRunner) Start( // Register ext_proc handlers extProcPb.RegisterExternalProcessorServer( svr, - handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore), + handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetEndpointKey, r.Datastore), ) // Blocking and will return when shutdown is complete. 
diff --git a/pkg/manifests/gateway/patch_policy.yaml b/pkg/manifests/gateway/patch_policy.yaml index 00cb9857..4a556b44 100644 --- a/pkg/manifests/gateway/patch_policy.yaml +++ b/pkg/manifests/gateway/patch_policy.yaml @@ -25,7 +25,7 @@ spec: type: ORIGINAL_DST original_dst_lb_config: use_http_header: true - http_header_name: "target-pod" + http_header_name: "x-gateway-destination-endpoint" connect_timeout: 1000s lb_policy: CLUSTER_PROVIDED dns_lookup_family: V4_ONLY @@ -40,4 +40,4 @@ spec: operation: op: replace path: "/virtual_hosts/0/routes/0/route/cluster" - value: original_destination_cluster \ No newline at end of file + value: original_destination_cluster diff --git a/pkg/manifests/vllm/deployment.yaml b/pkg/manifests/vllm/deployment.yaml index 30f6f671..142b9b0a 100644 --- a/pkg/manifests/vllm/deployment.yaml +++ b/pkg/manifests/vllm/deployment.yaml @@ -39,7 +39,7 @@ spec: - "8000" - "--enable-lora" - "--max-loras" - - "4" + - "2" - "--max-cpu-loras" - "12" - "--lora-modules" @@ -132,4 +132,4 @@ spec: emptyDir: medium: Memory - name: adapters - emptyDir: {} \ No newline at end of file + emptyDir: {} diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index 44c5ae0a..95ad4908 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -21,6 +21,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/structpb" "inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1" "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend" runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server" @@ -111,7 +112,7 @@ func SKIPTestHandleRequestBody(t *testing.T) { wantHeaders: []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: runserver.DefaultTargetPodHeader, + Key: runserver.DefaultTargetEndpointKey, 
RawValue: []byte("address-1"), }, }, @@ -162,11 +163,12 @@ func SKIPTestHandleRequestBody(t *testing.T) { func TestKubeInferenceModelRequest(t *testing.T) { tests := []struct { - name string - req *extProcPb.ProcessingRequest - wantHeaders []*configPb.HeaderValueOption - wantBody []byte - wantErr bool + name string + req *extProcPb.ProcessingRequest + wantHeaders []*configPb.HeaderValueOption + wantMetadata *structpb.Struct + wantBody []byte + wantErr bool }{ { name: "success", @@ -176,7 +178,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantHeaders: []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: runserver.DefaultTargetPodHeader, + Key: runserver.DefaultTargetEndpointKey, RawValue: []byte("address-1"), }, }, @@ -187,6 +189,15 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, + wantMetadata: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + runserver.DefaultTargetEndpointKey: { + Kind: &structpb.Value_StringValue{ + StringValue: "address-1", + }, + }, + }, + }, wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"), wantErr: false, }, @@ -249,6 +260,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, + DynamicMetadata: test.wantMetadata, } res, err := sendRequest(t, client, test.req) diff --git a/test/testdata/envoy.yaml b/test/testdata/envoy.yaml index 45f9b6f8..700eb24c 100644 --- a/test/testdata/envoy.yaml +++ b/test/testdata/envoy.yaml @@ -158,7 +158,7 @@ data: max_requests: 40000 original_dst_lb_config: use_http_header: true - http_header_name: target-pod + http_header_name: x-gateway-destination-endpoint - name: ext_proc type: STRICT_DNS connect_timeout: 86400s From c86ea56b3b378d03b6c77e8a8d57282bc91b8993 Mon Sep 17 00:00:00 2001 From: ahg-g Date: Sat, 1 Feb 2025 00:25:59 +0000 Subject: [PATCH 2/4] Addressing comments round 1 --- pkg/ext-proc/handlers/request.go | 15 +++++++++------ pkg/manifests/vllm/deployment.yaml | 2 +- 2 files 
changed, 10 insertions(+), 7 deletions(-) diff --git a/pkg/ext-proc/handlers/request.go b/pkg/ext-proc/handlers/request.go index 9008aecc..d98f4602 100644 --- a/pkg/ext-proc/handlers/request.go +++ b/pkg/ext-proc/handlers/request.go @@ -71,23 +71,23 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces klog.V(logutil.VERBOSE).Infof("Updated body: %v", string(requestBody)) } - targetEndpoint, err := s.scheduler.Schedule(llmReq) + targetPod, err := s.scheduler.Schedule(llmReq) if err != nil { return nil, fmt.Errorf("failed to find target pod: %w", err) } - klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetEndpoint) + klog.V(logutil.VERBOSE).Infof("Selected target model %v in target pod: %v\n", llmReq.ResolvedTargetModel, targetPod) reqCtx.Model = llmReq.Model reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel reqCtx.RequestSize = len(v.RequestBody.Body) - reqCtx.TargetPod = targetEndpoint + reqCtx.TargetPod = targetPod - // Insert "target-pod" to instruct Envoy to route requests to the specified target pod. + // Insert target endpoint to instruct Envoy to route requests to the specified target pod. headers := []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ Key: s.targetEndpointKey, - RawValue: []byte(targetEndpoint.Address), + RawValue: []byte(targetPod.Address), }, }, // We need to update the content length header if the body is mutated, see Envoy doc: @@ -105,6 +105,9 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces } resp := &extProcPb.ProcessingResponse{ + // The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header + // and as an unstructured ext-proc response metadata key/value pair. This enables different integration + // options for gateway providers.
Response: &extProcPb.ProcessingResponse_RequestBody{ RequestBody: &extProcPb.BodyResponse{ Response: &extProcPb.CommonResponse{ @@ -123,7 +126,7 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces Fields: map[string]*structpb.Value{ s.targetEndpointKey: { Kind: &structpb.Value_StringValue{ - StringValue: targetEndpoint.Address, + StringValue: targetPod.Address, }, }, }, diff --git a/pkg/manifests/vllm/deployment.yaml b/pkg/manifests/vllm/deployment.yaml index 142b9b0a..4af0891d 100644 --- a/pkg/manifests/vllm/deployment.yaml +++ b/pkg/manifests/vllm/deployment.yaml @@ -39,7 +39,7 @@ spec: - "8000" - "--enable-lora" - "--max-loras" - - "2" + - "4" - "--max-cpu-loras" - "12" - "--lora-modules" From f381644250bc3964743bb5590ef584c45e29abc9 Mon Sep 17 00:00:00 2001 From: ahg-g Date: Mon, 3 Feb 2025 18:50:16 +0000 Subject: [PATCH 3/4] Update the endpoint picker proposal --- docs/proposals/003-endpoint-picker-protocol/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/proposals/003-endpoint-picker-protocol/README.md b/docs/proposals/003-endpoint-picker-protocol/README.md index 3ce38344..201e1981 100644 --- a/docs/proposals/003-endpoint-picker-protocol/README.md +++ b/docs/proposals/003-endpoint-picker-protocol/README.md @@ -12,7 +12,7 @@ The EPP MUST implement the Envoy [external processing service](https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor)protocol. For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint, via -adding the `target-pod` HTTP header in the request, or otherwise return an error. 
+adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. ## Model Server Protocol @@ -62,4 +62,4 @@ The model server MUST expose the following LoRA adapter metrics via the same Pro Requests will be queued if the model server has reached MaxActiveAdapter and canno load the requested adapter. Example: `"max_lora": "8"`. * `running_lora_adapters`: A comma separated list of adapters that are currently loaded in GPU - memory and ready to serve requests. Example: `"running_lora_adapters": "adapter1, adapter2"` \ No newline at end of file + memory and ready to serve requests. Example: `"running_lora_adapters": "adapter1, adapter2"` From 91d1d4f57afd5179a39c1e9cd1b6b7aeaa89e5b4 Mon Sep 17 00:00:00 2001 From: ahg-g Date: Mon, 3 Feb 2025 19:51:22 +0000 Subject: [PATCH 4/4] define the behavior when the two values differ --- docs/proposals/003-endpoint-picker-protocol/README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/proposals/003-endpoint-picker-protocol/README.md b/docs/proposals/003-endpoint-picker-protocol/README.md index 201e1981..8e96a630 100644 --- a/docs/proposals/003-endpoint-picker-protocol/README.md +++ b/docs/proposals/003-endpoint-picker-protocol/README.md @@ -12,7 +12,8 @@ The EPP MUST implement the Envoy [external processing service](https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor)protocol. 
For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint, via -adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. +adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. The EPP MUST NOT set two different values in the header and the response metadata. +Setting different values leads to unpredictable behavior because proxies aren't guaranteed to support both paths, and so this protocol does not define what takes precedence. ## Model Server Protocol