Skip to content

Commit aeac870

Browse files
committed
Defining an outer metadata struct as part of the extproc endpoint picking protocol
1 parent 2577f63 commit aeac870

File tree

7 files changed

+87
-69
lines changed

7 files changed

+87
-69
lines changed

cmd/epp/main.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,12 @@ var (
6767
targetEndpointKey = flag.String(
6868
"targetEndpointKey",
6969
runserver.DefaultTargetEndpointKey,
70-
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
70+
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
71+
targetEndpointOuterMetadataKey = flag.String(
72+
"TargetEndpointOuterMetadataKey",
73+
runserver.DefaultTargetEndpointOuterMetadataKey,
74+
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the"+
75+
"target endpoint. If not set, then an outer namespace struct should not be created.")
7176
poolName = flag.String(
7277
"poolName",
7378
runserver.DefaultPoolName,
@@ -146,6 +151,7 @@ func run() error {
146151
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
147152
serverRunner := &runserver.ExtProcServerRunner{
148153
GrpcPort: *grpcPort,
154+
TargetEndpointOuterMetadataKey: *targetEndpointOuterMetadataKey,
149155
TargetEndpointKey: *targetEndpointKey,
150156
PoolName: *poolName,
151157
PoolNamespace: *poolNamespace,

docs/proposals/003-endpoint-picker-protocol/README.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ The EPP MUST implement the Envoy
1212
[external processing service](https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor)protocol.
1313

1414
For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint, via
15-
adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. The EPP MUST not set two different values in the header and the response metadata.
15+
adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. The metadata entry for the picked endpoint MUST be wrapped with an outer key named `gateway-destination-endpoint.dynamic_forwarding.selected_endpoints`.
16+
17+
The EPP MUST not set two different values in the header and the response metadata.
1618
Setting different value leads to unpredictable behavior because proxies aren't guaranteed to support both paths, and so this protocol does not define what takes precedence.
1719

1820
## Model Server Protocol

pkg/epp/handlers/request.go

+24-9
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,29 @@ func (s *Server) HandleRequestBody(
137137
logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
138138
}
139139

140+
targetEndpointValue := &structpb.Struct{
141+
Fields: map[string]*structpb.Value{
142+
s.targetEndpointKey: {
143+
Kind: &structpb.Value_StringValue{
144+
StringValue: endpoint,
145+
},
146+
},
147+
},
148+
}
149+
dynamicMetadata := targetEndpointValue
150+
if s.targetEndpointOuterMetadataKey != "" {
151+
// If a namespace is defined, wrap the selected endpoint with that.
152+
dynamicMetadata = &structpb.Struct{
153+
Fields: map[string]*structpb.Value{
154+
s.targetEndpointOuterMetadataKey: {
155+
Kind: &structpb.Value_StructValue{
156+
StructValue: targetEndpointValue,
157+
},
158+
},
159+
},
160+
}
161+
}
162+
140163
resp := &extProcPb.ProcessingResponse{
141164
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
142165
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
@@ -155,15 +178,7 @@ func (s *Server) HandleRequestBody(
155178
},
156179
},
157180
},
158-
DynamicMetadata: &structpb.Struct{
159-
Fields: map[string]*structpb.Value{
160-
s.targetEndpointKey: {
161-
Kind: &structpb.Value_StringValue{
162-
StringValue: endpoint,
163-
},
164-
},
165-
},
166-
},
181+
DynamicMetadata: dynamicMetadata,
167182
}
168183
return resp, nil
169184
}

pkg/epp/handlers/server.go

+9-5
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ import (
3434
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3535
)
3636

37-
func NewServer(scheduler Scheduler, targetEndpointKey string, datastore datastore.Datastore) *Server {
37+
func NewServer(scheduler Scheduler, targetEndpointOuterMetadataKey, targetEndpointKey string, datastore datastore.Datastore) *Server {
3838
return &Server{
39-
scheduler: scheduler,
40-
targetEndpointKey: targetEndpointKey,
41-
datastore: datastore,
39+
scheduler: scheduler,
40+
targetEndpointOuterMetadataKey: targetEndpointOuterMetadataKey,
41+
targetEndpointKey: targetEndpointKey,
42+
datastore: datastore,
4243
}
4344
}
4445

@@ -49,7 +50,10 @@ type Server struct {
4950
// The key of the header to specify the target pod address. This value needs to match Envoy
5051
// configuration.
5152
targetEndpointKey string
52-
datastore datastore.Datastore
53+
// The key acting as the outer namespace struct in the metadata extproc response to communicate
54+
// back the picked endpoints.
55+
targetEndpointOuterMetadataKey string
56+
datastore datastore.Datastore
5357
}
5458

5559
type Scheduler interface {

pkg/epp/server/runserver.go

+11-8
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
// ExtProcServerRunner provides methods to manage an external process server.
4747
type ExtProcServerRunner struct {
4848
GrpcPort int
49+
TargetEndpointOuterMetadataKey string
4950
TargetEndpointKey string
5051
PoolName string
5152
PoolNamespace string
@@ -59,19 +60,21 @@ type ExtProcServerRunner struct {
5960

6061
// Default values for CLI flags in main
6162
const (
62-
DefaultGrpcPort = 9002 // default for --grpcPort
63-
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
64-
DefaultPoolName = "" // required but no default
65-
DefaultPoolNamespace = "default" // default for --poolNamespace
66-
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
67-
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
68-
DefaultSecureServing = true // default for --secureServing
63+
DefaultGrpcPort = 9002 // default for --grpcPort
64+
DefaultTargetEndpointOuterMetadataKey = "gateway-destination-endpoint.dynamic_forwarding.selected_endpoints" // default for --targetEndpointOuterMetadataKey
65+
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
66+
DefaultPoolName = "" // required but no default
67+
DefaultPoolNamespace = "default" // default for --poolNamespace
68+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
69+
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
70+
DefaultSecureServing = true // default for --secureServing
6971
)
7072

7173
func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
7274
return &ExtProcServerRunner{
7375
GrpcPort: DefaultGrpcPort,
7476
TargetEndpointKey: DefaultTargetEndpointKey,
77+
TargetEndpointOuterMetadataKey: DefaultTargetEndpointOuterMetadataKey,
7578
PoolName: DefaultPoolName,
7679
PoolNamespace: DefaultPoolNamespace,
7780
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
@@ -156,7 +159,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
156159
}
157160
extProcPb.RegisterExternalProcessorServer(
158161
srv,
159-
handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.TargetEndpointKey, r.Datastore),
162+
handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.TargetEndpointOuterMetadataKey, r.TargetEndpointKey, r.Datastore),
160163
)
161164

162165
// Forward to the gRPC runnable.

pkg/epp/test/utils.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func startExtProc(logger logr.Logger, port int, datastore datastore.Datastore) *
7979

8080
s := grpc.NewServer()
8181

82-
extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "target-pod", datastore))
82+
extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "", "target-pod", datastore))
8383

8484
logger.Info("gRPC server starting", "port", port)
8585
reflection.Register(s)

test/integration/hermetic_test.go

+32-44
Original file line numberDiff line numberDiff line change
@@ -111,17 +111,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
111111
},
112112
},
113113
},
114-
wantMetadata: &structpb.Struct{
115-
Fields: map[string]*structpb.Value{
116-
runserver.DefaultTargetEndpointKey: {
117-
Kind: &structpb.Value_StringValue{
118-
StringValue: "address-1:8000",
119-
},
120-
},
121-
},
122-
},
123-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
124-
wantErr: false,
114+
wantMetadata: makeMetadata("address-1:8000"),
115+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
116+
wantErr: false,
125117
},
126118
{
127119
name: "select active lora, low queue",
@@ -167,17 +159,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
167159
},
168160
},
169161
},
170-
wantMetadata: &structpb.Struct{
171-
Fields: map[string]*structpb.Value{
172-
runserver.DefaultTargetEndpointKey: {
173-
Kind: &structpb.Value_StringValue{
174-
StringValue: "address-1:8000",
175-
},
176-
},
177-
},
178-
},
179-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
180-
wantErr: false,
162+
wantMetadata: makeMetadata("address-1:8000"),
163+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
164+
wantErr: false,
181165
},
182166
{
183167
name: "select no lora despite active model, avoid excessive queue size",
@@ -224,17 +208,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
224208
},
225209
},
226210
},
227-
wantMetadata: &structpb.Struct{
228-
Fields: map[string]*structpb.Value{
229-
runserver.DefaultTargetEndpointKey: {
230-
Kind: &structpb.Value_StringValue{
231-
StringValue: "address-2:8000",
232-
},
233-
},
234-
},
235-
},
236-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
237-
wantErr: false,
211+
wantMetadata: makeMetadata("address-2:8000"),
212+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
213+
wantErr: false,
238214
},
239215
{
240216
name: "noncritical and all models past threshold, shed request",
@@ -323,17 +299,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
323299
},
324300
},
325301
},
326-
wantMetadata: &structpb.Struct{
327-
Fields: map[string]*structpb.Value{
328-
runserver.DefaultTargetEndpointKey: {
329-
Kind: &structpb.Value_StringValue{
330-
StringValue: "address-0:8000",
331-
},
332-
},
333-
},
334-
},
335-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
336-
wantErr: false,
302+
wantMetadata: makeMetadata("address-0:8000"),
303+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
304+
wantErr: false,
337305
},
338306
}
339307

@@ -555,3 +523,23 @@ func readDocuments(fp string) ([][]byte, error) {
555523
}
556524
return docs, nil
557525
}
526+
527+
func makeMetadata(endpoint string) *structpb.Struct {
528+
return &structpb.Struct{
529+
Fields: map[string]*structpb.Value{
530+
runserver.DefaultTargetEndpointOuterMetadataKey: {
531+
Kind: &structpb.Value_StructValue{
532+
StructValue: &structpb.Struct{
533+
Fields: map[string]*structpb.Value{
534+
runserver.DefaultTargetEndpointKey: {
535+
Kind: &structpb.Value_StringValue{
536+
StringValue: endpoint,
537+
},
538+
},
539+
},
540+
},
541+
},
542+
},
543+
},
544+
}
545+
}

0 commit comments

Comments
 (0)