diff --git a/cmd/epp/main.go b/cmd/epp/main.go index a189984b..1f76cfab 100644 --- a/cmd/epp/main.go +++ b/cmd/epp/main.go @@ -64,10 +64,15 @@ var ( "The port used for gRPC liveness and readiness probes") metricsPort = flag.Int( "metricsPort", 9090, "The metrics port") - targetEndpointKey = flag.String( - "targetEndpointKey", - runserver.DefaultTargetEndpointKey, - "Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.") + destinationEndpointHintKey = flag.String( + "destinationEndpointHintKey", + runserver.DefaultDestinationEndpointHintKey, + "Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.") + destinationEndpointHintMetadataNamespace = flag.String( + "DestinationEndpointHintMetadataNamespace", + runserver.DefaultDestinationEndpointHintMetadataNamespace, + "The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+ + "target endpoint. 
If not set, then an outer namespace struct should not be created.") poolName = flag.String( "poolName", runserver.DefaultPoolName, @@ -145,16 +150,17 @@ func run() error { datastore := datastore.NewDatastore() provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore) serverRunner := &runserver.ExtProcServerRunner{ - GrpcPort: *grpcPort, - TargetEndpointKey: *targetEndpointKey, - PoolName: *poolName, - PoolNamespace: *poolNamespace, - RefreshMetricsInterval: *refreshMetricsInterval, - RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval, - Datastore: datastore, - SecureServing: *secureServing, - CertPath: *certPath, - Provider: provider, + GrpcPort: *grpcPort, + DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace, + DestinationEndpointHintKey: *destinationEndpointHintKey, + PoolName: *poolName, + PoolNamespace: *poolNamespace, + RefreshMetricsInterval: *refreshMetricsInterval, + RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval, + Datastore: datastore, + SecureServing: *secureServing, + CertPath: *certPath, + Provider: provider, } if err := serverRunner.SetupWithManager(mgr); err != nil { setupLog.Error(err, "Failed to setup ext-proc server") diff --git a/docs/proposals/003-endpoint-picker-protocol/README.md b/docs/proposals/003-endpoint-picker-protocol/README.md index 6876135d..418c0f3c 100644 --- a/docs/proposals/003-endpoint-picker-protocol/README.md +++ b/docs/proposals/003-endpoint-picker-protocol/README.md @@ -11,8 +11,28 @@ This is the protocol between the EPP and the proxy (e.g, Envoy). The EPP MUST implement the Envoy [external processing service](https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor)protocol. 
-For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint, via -adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. The EPP MUST not set two different values in the header and the response metadata. +For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint via: + +1. Setting the `x-gateway-destination-endpoint` HTTP header to the selected endpoint in `<ip:port>` format. + +2. Setting an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response. The metadata entry for the picked endpoint MUST be wrapped with an outer key (which represents the metadata namespace) with a default of `envoy.lb`. + +The final metadata necessary would look like: +```go +dynamicMetadata: { + "envoy.lb": { + "x-gateway-destination-endpoint": "<ip:port>" + } +} +``` + +Note: +- If the EPP does not communicate the server endpoint via these two methods, it MUST return an error. +- The EPP MUST NOT set two different values in the header and the inner response metadata value. + +### Why envoy.lb namespace as a default? +The `envoy.lb` namespace is a predefined namespace used for subsetting. One common way to use the selected endpoint returned from the server is [envoy subsets](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/subsets), where host metadata for subset load balancing must be placed under `envoy.lb`. 
+ Setting different value leads to unpredictable behavior because proxies aren't guaranteed to support both paths, and so this protocol does not define what takes precedence. ## Model Server Protocol diff --git a/pkg/epp/handlers/request.go b/pkg/epp/handlers/request.go index b9ffd0b0..c6cfdda2 100644 --- a/pkg/epp/handlers/request.go +++ b/pkg/epp/handlers/request.go @@ -119,7 +119,7 @@ func (s *Server) HandleRequestBody( headers := []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: s.targetEndpointKey, + Key: s.destinationEndpointHintKey, RawValue: []byte(endpoint), }, }, @@ -137,6 +137,29 @@ func (s *Server) HandleRequestBody( logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue) } + targetEndpointValue := &structpb.Struct{ + Fields: map[string]*structpb.Value{ + s.destinationEndpointHintKey: { + Kind: &structpb.Value_StringValue{ + StringValue: endpoint, + }, + }, + }, + } + dynamicMetadata := targetEndpointValue + if s.destinationEndpointHintMetadataNamespace != "" { + // If a namespace is defined, wrap the selected endpoint with that. + dynamicMetadata = &structpb.Struct{ + Fields: map[string]*structpb.Value{ + s.destinationEndpointHintMetadataNamespace: { + Kind: &structpb.Value_StructValue{ + StructValue: targetEndpointValue, + }, + }, + }, + } + } + resp := &extProcPb.ProcessingResponse{ // The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header // and as an unstructure ext-proc response metadata key/value pair. 
This enables different integration @@ -155,15 +178,7 @@ func (s *Server) HandleRequestBody( }, }, }, - DynamicMetadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - s.targetEndpointKey: { - Kind: &structpb.Value_StringValue{ - StringValue: endpoint, - }, - }, - }, - }, + DynamicMetadata: dynamicMetadata, } return resp, nil } diff --git a/pkg/epp/handlers/server.go b/pkg/epp/handlers/server.go index 2c61118c..9105e8b1 100644 --- a/pkg/epp/handlers/server.go +++ b/pkg/epp/handlers/server.go @@ -34,11 +34,12 @@ import ( logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging" ) -func NewServer(scheduler Scheduler, targetEndpointKey string, datastore datastore.Datastore) *Server { +func NewServer(scheduler Scheduler, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *Server { return &Server{ - scheduler: scheduler, - targetEndpointKey: targetEndpointKey, - datastore: datastore, + scheduler: scheduler, + destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace, + destinationEndpointHintKey: destinationEndpointHintKey, + datastore: datastore, } } @@ -48,8 +49,11 @@ type Server struct { scheduler Scheduler // The key of the header to specify the target pod address. This value needs to match Envoy // configuration. - targetEndpointKey string - datastore datastore.Datastore + destinationEndpointHintKey string + // The key acting as the outer namespace struct in the metadata extproc response to communicate + // back the picked endpoints. + destinationEndpointHintMetadataNamespace string + datastore datastore.Datastore } type Scheduler interface { diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 92b7be7f..6e6b68b1 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -45,38 +45,41 @@ import ( // ExtProcServerRunner provides methods to manage an external process server. 
type ExtProcServerRunner struct { - GrpcPort int - TargetEndpointKey string - PoolName string - PoolNamespace string - RefreshMetricsInterval time.Duration - RefreshPrometheusMetricsInterval time.Duration - Datastore datastore.Datastore - Provider *backend.Provider - SecureServing bool - CertPath string + GrpcPort int + DestinationEndpointHintMetadataNamespace string + DestinationEndpointHintKey string + PoolName string + PoolNamespace string + RefreshMetricsInterval time.Duration + RefreshPrometheusMetricsInterval time.Duration + Datastore datastore.Datastore + Provider *backend.Provider + SecureServing bool + CertPath string } // Default values for CLI flags in main const ( - DefaultGrpcPort = 9002 // default for --grpcPort - DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey - DefaultPoolName = "" // required but no default - DefaultPoolNamespace = "default" // default for --poolNamespace - DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval - DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval - DefaultSecureServing = true // default for --secureServing + DefaultGrpcPort = 9002 // default for --grpcPort + DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace + DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destinationEndpointHintKey + DefaultPoolName = "" // required but no default + DefaultPoolNamespace = "default" // default for --poolNamespace + DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval + DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval + DefaultSecureServing = true // default for --secureServing ) func NewDefaultExtProcServerRunner() *ExtProcServerRunner { return &ExtProcServerRunner{ - GrpcPort: 
DefaultGrpcPort, - TargetEndpointKey: DefaultTargetEndpointKey, - PoolName: DefaultPoolName, - PoolNamespace: DefaultPoolNamespace, - RefreshMetricsInterval: DefaultRefreshMetricsInterval, - RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, - SecureServing: DefaultSecureServing, + GrpcPort: DefaultGrpcPort, + DestinationEndpointHintKey: DefaultDestinationEndpointHintKey, + DestinationEndpointHintMetadataNamespace: DefaultDestinationEndpointHintMetadataNamespace, + PoolName: DefaultPoolName, + PoolNamespace: DefaultPoolNamespace, + RefreshMetricsInterval: DefaultRefreshMetricsInterval, + RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval, + SecureServing: DefaultSecureServing, // Datastore can be assigned later. } } @@ -156,7 +159,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable { } extProcPb.RegisterExternalProcessorServer( srv, - handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.TargetEndpointKey, r.Datastore), + handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.DestinationEndpointHintMetadataNamespace, r.DestinationEndpointHintKey, r.Datastore), ) // Forward to the gRPC runnable. 
diff --git a/pkg/epp/test/utils.go b/pkg/epp/test/utils.go index f82084d9..c44d7147 100644 --- a/pkg/epp/test/utils.go +++ b/pkg/epp/test/utils.go @@ -79,7 +79,7 @@ func startExtProc(logger logr.Logger, port int, datastore datastore.Datastore) * s := grpc.NewServer() - extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "target-pod", datastore)) + extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "", "target-pod", datastore)) logger.Info("gRPC server starting", "port", port) reflection.Register(s) diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go index eb2ca40e..91bc71c6 100644 --- a/test/integration/hermetic_test.go +++ b/test/integration/hermetic_test.go @@ -100,7 +100,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantHeaders: []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: runserver.DefaultTargetEndpointKey, + Key: runserver.DefaultDestinationEndpointHintKey, RawValue: []byte("address-1:8000"), }, }, @@ -111,17 +111,9 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, - wantMetadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - runserver.DefaultTargetEndpointKey: { - Kind: &structpb.Value_StringValue{ - StringValue: "address-1:8000", - }, - }, - }, - }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"), - wantErr: false, + wantMetadata: makeMetadata("address-1:8000"), + wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"), + wantErr: false, }, { name: "select active lora, low queue", @@ -156,7 +148,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantHeaders: []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: runserver.DefaultTargetEndpointKey, + Key: runserver.DefaultDestinationEndpointHintKey, RawValue: []byte("address-1:8000"), }, 
}, @@ -167,17 +159,9 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, - wantMetadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - runserver.DefaultTargetEndpointKey: { - Kind: &structpb.Value_StringValue{ - StringValue: "address-1:8000", - }, - }, - }, - }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"), - wantErr: false, + wantMetadata: makeMetadata("address-1:8000"), + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"), + wantErr: false, }, { name: "select no lora despite active model, avoid excessive queue size", @@ -213,7 +197,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantHeaders: []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: runserver.DefaultTargetEndpointKey, + Key: runserver.DefaultDestinationEndpointHintKey, RawValue: []byte("address-2:8000"), }, }, @@ -224,17 +208,9 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, - wantMetadata: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - runserver.DefaultTargetEndpointKey: { - Kind: &structpb.Value_StringValue{ - StringValue: "address-2:8000", - }, - }, - }, - }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"), - wantErr: false, + wantMetadata: makeMetadata("address-2:8000"), + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"), + wantErr: false, }, { name: "noncritical and all models past threshold, shed request", @@ -312,7 +288,7 @@ func TestKubeInferenceModelRequest(t *testing.T) { wantHeaders: []*configPb.HeaderValueOption{ { Header: &configPb.HeaderValue{ - Key: runserver.DefaultTargetEndpointKey, + Key: runserver.DefaultDestinationEndpointHintKey, RawValue: []byte("address-0:8000"), }, }, @@ -323,17 +299,9 @@ func TestKubeInferenceModelRequest(t *testing.T) { }, }, }, - wantMetadata: 
&structpb.Struct{ - Fields: map[string]*structpb.Value{ - runserver.DefaultTargetEndpointKey: { - Kind: &structpb.Value_StringValue{ - StringValue: "address-0:8000", - }, - }, - }, - }, - wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"), - wantErr: false, + wantMetadata: makeMetadata("address-0:8000"), + wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"), + wantErr: false, }, } @@ -555,3 +523,23 @@ func readDocuments(fp string) ([][]byte, error) { } return docs, nil } + +func makeMetadata(endpoint string) *structpb.Struct { + return &structpb.Struct{ + Fields: map[string]*structpb.Value{ + runserver.DefaultDestinationEndpointHintMetadataNamespace: { + Kind: &structpb.Value_StructValue{ + StructValue: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + runserver.DefaultDestinationEndpointHintKey: { + Kind: &structpb.Value_StringValue{ + StringValue: endpoint, + }, + }, + }, + }, + }, + }, + }, + } +}