Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Defining an outer metadata struct as part of the extproc endpoint picking protocol #377

Merged
merged 3 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions cmd/epp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,15 @@ var (
"The port used for gRPC liveness and readiness probes")
metricsPort = flag.Int(
"metricsPort", 9090, "The metrics port")
targetEndpointKey = flag.String(
"targetEndpointKey",
runserver.DefaultTargetEndpointKey,
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
destinationEndpointHintKey = flag.String(
"destinationEndpointHintKey",
runserver.DefaultDestinationEndpointHintKey,
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
destinationEndpointHintMetadataNamespace = flag.String(
"DestinationEndpointHintMetadataNamespace",
runserver.DefaultDestinationEndpointHintMetadataNamespace,
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
"target endpoint. If not set, then an outer namespace struct should not be created.")
poolName = flag.String(
"poolName",
runserver.DefaultPoolName,
Expand Down Expand Up @@ -145,16 +150,17 @@ func run() error {
datastore := datastore.NewDatastore()
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
TargetEndpointKey: *targetEndpointKey,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
RefreshMetricsInterval: *refreshMetricsInterval,
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
Datastore: datastore,
SecureServing: *secureServing,
CertPath: *certPath,
Provider: provider,
GrpcPort: *grpcPort,
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
DestinationEndpointHintKey: *destinationEndpointHintKey,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
RefreshMetricsInterval: *refreshMetricsInterval,
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
Datastore: datastore,
SecureServing: *secureServing,
CertPath: *certPath,
Provider: provider,
}
if err := serverRunner.SetupWithManager(mgr); err != nil {
setupLog.Error(err, "Failed to setup ext-proc server")
Expand Down
24 changes: 22 additions & 2 deletions docs/proposals/003-endpoint-picker-protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,28 @@ This is the protocol between the EPP and the proxy (e.g, Envoy).
The EPP MUST implement the Envoy
[external processing service](https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor)protocol.

For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint, via
adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. The EPP MUST not set two different values in the header and the response metadata.
For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint via:

1. Setting the `x-gateway-destination-endpoint` HTTP header to the selected endpoint in <ip:port> format.

2. Set an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response. The metadata entry for the picked endpoint MUST be wrapped with an outer key (which represents the metadata namespace) with a default of `envoy.lb`.

The final metadata necessary would look like:
```go
dynamicMetadata: {
"envoy.lb": {
"x-gateway-destination-endpoint": <ip:port>"
}
}
```

Note:
- If the EPP did not communicate the server endpoint via these two methods, it MUST return an error.
- The EPP MUST not set two different values in the header and the inner response metadata value.

### Why envoy.lb namespace as a default?
The `envoy.lb` namesapce is a predefined namespace used for subsetting. One common way to use the selected endpoint returned from the server, is [envoy subsets](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/subsets) where host metadata for subset load balancing must be placed under `envoy.lb`.

Setting different value leads to unpredictable behavior because proxies aren't guaranteed to support both paths, and so this protocol does not define what takes precedence.

## Model Server Protocol
Expand Down
35 changes: 25 additions & 10 deletions pkg/epp/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (s *Server) HandleRequestBody(
headers := []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: s.targetEndpointKey,
Key: s.destinationEndpointHintKey,
RawValue: []byte(endpoint),
},
},
Expand All @@ -137,6 +137,29 @@ func (s *Server) HandleRequestBody(
logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
}

targetEndpointValue := &structpb.Struct{
Fields: map[string]*structpb.Value{
s.destinationEndpointHintKey: {
Kind: &structpb.Value_StringValue{
StringValue: endpoint,
},
},
},
}
dynamicMetadata := targetEndpointValue
if s.destinationEndpointHintMetadataNamespace != "" {
// If a namespace is defined, wrap the selected endpoint with that.
dynamicMetadata = &structpb.Struct{
Fields: map[string]*structpb.Value{
s.destinationEndpointHintMetadataNamespace: {
Kind: &structpb.Value_StructValue{
StructValue: targetEndpointValue,
},
},
},
}
}

resp := &extProcPb.ProcessingResponse{
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
Expand All @@ -155,15 +178,7 @@ func (s *Server) HandleRequestBody(
},
},
},
DynamicMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
s.targetEndpointKey: {
Kind: &structpb.Value_StringValue{
StringValue: endpoint,
},
},
},
},
DynamicMetadata: dynamicMetadata,
}
return resp, nil
}
Expand Down
16 changes: 10 additions & 6 deletions pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import (
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

func NewServer(scheduler Scheduler, targetEndpointKey string, datastore datastore.Datastore) *Server {
func NewServer(scheduler Scheduler, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *Server {
return &Server{
scheduler: scheduler,
targetEndpointKey: targetEndpointKey,
datastore: datastore,
scheduler: scheduler,
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
destinationEndpointHintKey: destinationEndpointHintKey,
datastore: datastore,
}
}

Expand All @@ -48,8 +49,11 @@ type Server struct {
scheduler Scheduler
// The key of the header to specify the target pod address. This value needs to match Envoy
// configuration.
targetEndpointKey string
datastore datastore.Datastore
destinationEndpointHintKey string
// The key acting as the outer namespace struct in the metadata extproc response to communicate
// back the picked endpoints.
destinationEndpointHintMetadataNamespace string
datastore datastore.Datastore
}

type Scheduler interface {
Expand Down
53 changes: 28 additions & 25 deletions pkg/epp/server/runserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,38 +45,41 @@ import (

// ExtProcServerRunner provides methods to manage an external process server.
type ExtProcServerRunner struct {
GrpcPort int
TargetEndpointKey string
PoolName string
PoolNamespace string
RefreshMetricsInterval time.Duration
RefreshPrometheusMetricsInterval time.Duration
Datastore datastore.Datastore
Provider *backend.Provider
SecureServing bool
CertPath string
GrpcPort int
DestinationEndpointHintMetadataNamespace string
DestinationEndpointHintKey string
PoolName string
PoolNamespace string
RefreshMetricsInterval time.Duration
RefreshPrometheusMetricsInterval time.Duration
Datastore datastore.Datastore
Provider *backend.Provider
SecureServing bool
CertPath string
}

// Default values for CLI flags in main
const (
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
DefaultSecureServing = true // default for --secureServing
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace
DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destinationEndpointHintKey
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
DefaultSecureServing = true // default for --secureServing
)

func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
return &ExtProcServerRunner{
GrpcPort: DefaultGrpcPort,
TargetEndpointKey: DefaultTargetEndpointKey,
PoolName: DefaultPoolName,
PoolNamespace: DefaultPoolNamespace,
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
SecureServing: DefaultSecureServing,
GrpcPort: DefaultGrpcPort,
DestinationEndpointHintKey: DefaultDestinationEndpointHintKey,
DestinationEndpointHintMetadataNamespace: DefaultDestinationEndpointHintMetadataNamespace,
PoolName: DefaultPoolName,
PoolNamespace: DefaultPoolNamespace,
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
SecureServing: DefaultSecureServing,
// Datastore can be assigned later.
}
}
Expand Down Expand Up @@ -156,7 +159,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
}
extProcPb.RegisterExternalProcessorServer(
srv,
handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.TargetEndpointKey, r.Datastore),
handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.DestinationEndpointHintMetadataNamespace, r.DestinationEndpointHintKey, r.Datastore),
)

// Forward to the gRPC runnable.
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/test/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func startExtProc(logger logr.Logger, port int, datastore datastore.Datastore) *

s := grpc.NewServer()

extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "target-pod", datastore))
extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "", "target-pod", datastore))

logger.Info("gRPC server starting", "port", port)
reflection.Register(s)
Expand Down
84 changes: 36 additions & 48 deletions test/integration/hermetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
wantHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultTargetEndpointKey,
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-1:8000"),
},
},
Expand All @@ -111,17 +111,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
runserver.DefaultTargetEndpointKey: {
Kind: &structpb.Value_StringValue{
StringValue: "address-1:8000",
},
},
},
},
wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
wantErr: false,
wantMetadata: makeMetadata("address-1:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
wantErr: false,
},
{
name: "select active lora, low queue",
Expand Down Expand Up @@ -156,7 +148,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
wantHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultTargetEndpointKey,
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-1:8000"),
},
},
Expand All @@ -167,17 +159,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
runserver.DefaultTargetEndpointKey: {
Kind: &structpb.Value_StringValue{
StringValue: "address-1:8000",
},
},
},
},
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
wantErr: false,
wantMetadata: makeMetadata("address-1:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
wantErr: false,
},
{
name: "select no lora despite active model, avoid excessive queue size",
Expand Down Expand Up @@ -213,7 +197,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
wantHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultTargetEndpointKey,
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-2:8000"),
},
},
Expand All @@ -224,17 +208,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
runserver.DefaultTargetEndpointKey: {
Kind: &structpb.Value_StringValue{
StringValue: "address-2:8000",
},
},
},
},
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
wantErr: false,
wantMetadata: makeMetadata("address-2:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
wantErr: false,
},
{
name: "noncritical and all models past threshold, shed request",
Expand Down Expand Up @@ -312,7 +288,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
wantHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultTargetEndpointKey,
Key: runserver.DefaultDestinationEndpointHintKey,
RawValue: []byte("address-0:8000"),
},
},
Expand All @@ -323,17 +299,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
runserver.DefaultTargetEndpointKey: {
Kind: &structpb.Value_StringValue{
StringValue: "address-0:8000",
},
},
},
},
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
wantErr: false,
wantMetadata: makeMetadata("address-0:8000"),
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
wantErr: false,
},
}

Expand Down Expand Up @@ -555,3 +523,23 @@ func readDocuments(fp string) ([][]byte, error) {
}
return docs, nil
}

func makeMetadata(endpoint string) *structpb.Struct {
return &structpb.Struct{
Fields: map[string]*structpb.Value{
runserver.DefaultDestinationEndpointHintMetadataNamespace: {
Kind: &structpb.Value_StructValue{
StructValue: &structpb.Struct{
Fields: map[string]*structpb.Value{
runserver.DefaultDestinationEndpointHintKey: {
Kind: &structpb.Value_StringValue{
StringValue: endpoint,
},
},
},
},
},
},
},
}
}