Skip to content

Commit 2913da4

Browse files
ahg-gLiorLieberman
andauthored
Defining an outer metadata struct as part of the extproc endpoint picking protocol (#377)
* Defining an outer metadata struct as part of the extproc endpoint picking protocol * Apply suggestions from code review Update the protocol doc based on the suggested edits Co-authored-by: Lior Lieberman <[email protected]> * Updated the flag names --------- Co-authored-by: Lior Lieberman <[email protected]>
1 parent a78c768 commit 2913da4

File tree

7 files changed

+142
-106
lines changed

7 files changed

+142
-106
lines changed

cmd/epp/main.go

+20-14
Original file line numberDiff line numberDiff line change
@@ -64,10 +64,15 @@ var (
6464
"The port used for gRPC liveness and readiness probes")
6565
metricsPort = flag.Int(
6666
"metricsPort", 9090, "The metrics port")
67-
targetEndpointKey = flag.String(
68-
"targetEndpointKey",
69-
runserver.DefaultTargetEndpointKey,
70-
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
67+
destinationEndpointHintKey = flag.String(
68+
"destinationEndpointHintKey",
69+
runserver.DefaultDestinationEndpointHintKey,
70+
"Header and response metadata key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
71+
// destinationEndpointHintMetadataNamespace names the outer struct that wraps the picked endpoint in
// the ext-proc response metadata. The flag name is lowerCamelCase like its sibling flags (Go flag
// names are case-sensitive). An empty value disables the wrapping.
destinationEndpointHintMetadataNamespace = flag.String(
72+
"destinationEndpointHintMetadataNamespace",
73+
runserver.DefaultDestinationEndpointHintMetadataNamespace,
74+
"The key for the outer namespace struct in the metadata field of the extproc response that is used to wrap the "+
75+
"target endpoint. If not set, then an outer namespace struct should not be created.")
7176
poolName = flag.String(
7277
"poolName",
7378
runserver.DefaultPoolName,
@@ -145,16 +150,17 @@ func run() error {
145150
datastore := datastore.NewDatastore()
146151
provider := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
147152
serverRunner := &runserver.ExtProcServerRunner{
148-
GrpcPort: *grpcPort,
149-
TargetEndpointKey: *targetEndpointKey,
150-
PoolName: *poolName,
151-
PoolNamespace: *poolNamespace,
152-
RefreshMetricsInterval: *refreshMetricsInterval,
153-
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
154-
Datastore: datastore,
155-
SecureServing: *secureServing,
156-
CertPath: *certPath,
157-
Provider: provider,
153+
GrpcPort: *grpcPort,
154+
DestinationEndpointHintMetadataNamespace: *destinationEndpointHintMetadataNamespace,
155+
DestinationEndpointHintKey: *destinationEndpointHintKey,
156+
PoolName: *poolName,
157+
PoolNamespace: *poolNamespace,
158+
RefreshMetricsInterval: *refreshMetricsInterval,
159+
RefreshPrometheusMetricsInterval: *refreshPrometheusMetricsInterval,
160+
Datastore: datastore,
161+
SecureServing: *secureServing,
162+
CertPath: *certPath,
163+
Provider: provider,
158164
}
159165
if err := serverRunner.SetupWithManager(mgr); err != nil {
160166
setupLog.Error(err, "Failed to setup ext-proc server")

docs/proposals/003-endpoint-picker-protocol/README.md

+22-2
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,28 @@ This is the protocol between the EPP and the proxy (e.g, Envoy).
1111
The EPP MUST implement the Envoy
1212
[external processing service](https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor) protocol.
1313

14-
For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint, via
15-
adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. The EPP MUST not set two different values in the header and the response metadata.
14+
For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint via:
15+
16+
1. Setting the `x-gateway-destination-endpoint` HTTP header to the selected endpoint in <ip:port> format.
17+
18+
2. Setting an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response. The metadata entry for the picked endpoint MUST be wrapped with an outer key (which represents the metadata namespace) with a default of `envoy.lb`.
19+
20+
The final metadata necessary would look like:
21+
```go
22+
dynamicMetadata: {
23+
"envoy.lb": {
24+
"x-gateway-destination-endpoint": "<ip:port>"
25+
}
26+
}
27+
```
28+
29+
Note:
30+
- If the EPP did not communicate the server endpoint via these two methods, it MUST return an error.
31+
- The EPP MUST NOT set two different values in the header and the inner response metadata value.
32+
33+
### Why envoy.lb namespace as a default?
34+
The `envoy.lb` namespace is a predefined namespace used for subsetting. One common way to use the selected endpoint returned from the server is [envoy subsets](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/subsets), where host metadata for subset load balancing must be placed under `envoy.lb`.
35+
1636
Setting different values leads to unpredictable behavior because proxies aren't guaranteed to support both paths, and so this protocol does not define what takes precedence.
1737
1838
## Model Server Protocol

pkg/epp/handlers/request.go

+25-10
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ func (s *Server) HandleRequestBody(
119119
headers := []*configPb.HeaderValueOption{
120120
{
121121
Header: &configPb.HeaderValue{
122-
Key: s.targetEndpointKey,
122+
Key: s.destinationEndpointHintKey,
123123
RawValue: []byte(endpoint),
124124
},
125125
},
@@ -137,6 +137,29 @@ func (s *Server) HandleRequestBody(
137137
logger.V(logutil.DEBUG).Info("Request body header", "key", header.Header.Key, "value", header.Header.RawValue)
138138
}
139139

140+
targetEndpointValue := &structpb.Struct{
141+
Fields: map[string]*structpb.Value{
142+
s.destinationEndpointHintKey: {
143+
Kind: &structpb.Value_StringValue{
144+
StringValue: endpoint,
145+
},
146+
},
147+
},
148+
}
149+
dynamicMetadata := targetEndpointValue
150+
if s.destinationEndpointHintMetadataNamespace != "" {
151+
// If a namespace is defined, wrap the selected endpoint with that.
152+
dynamicMetadata = &structpb.Struct{
153+
Fields: map[string]*structpb.Value{
154+
s.destinationEndpointHintMetadataNamespace: {
155+
Kind: &structpb.Value_StructValue{
156+
StructValue: targetEndpointValue,
157+
},
158+
},
159+
},
160+
}
161+
}
162+
140163
resp := &extProcPb.ProcessingResponse{
141164
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
142165
// and as an unstructured ext-proc response metadata key/value pair. This enables different integration
@@ -155,15 +178,7 @@ func (s *Server) HandleRequestBody(
155178
},
156179
},
157180
},
158-
DynamicMetadata: &structpb.Struct{
159-
Fields: map[string]*structpb.Value{
160-
s.targetEndpointKey: {
161-
Kind: &structpb.Value_StringValue{
162-
StringValue: endpoint,
163-
},
164-
},
165-
},
166-
},
181+
DynamicMetadata: dynamicMetadata,
167182
}
168183
return resp, nil
169184
}

pkg/epp/handlers/server.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ import (
3434
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
3535
)
3636

37-
func NewServer(scheduler Scheduler, targetEndpointKey string, datastore datastore.Datastore) *Server {
37+
func NewServer(scheduler Scheduler, destinationEndpointHintMetadataNamespace, destinationEndpointHintKey string, datastore datastore.Datastore) *Server {
3838
return &Server{
39-
scheduler: scheduler,
40-
targetEndpointKey: targetEndpointKey,
41-
datastore: datastore,
39+
scheduler: scheduler,
40+
destinationEndpointHintMetadataNamespace: destinationEndpointHintMetadataNamespace,
41+
destinationEndpointHintKey: destinationEndpointHintKey,
42+
datastore: datastore,
4243
}
4344
}
4445

@@ -48,8 +49,11 @@ type Server struct {
4849
scheduler Scheduler
4950
// The key of the header to specify the target pod address. This value needs to match Envoy
5051
// configuration.
51-
targetEndpointKey string
52-
datastore datastore.Datastore
52+
destinationEndpointHintKey string
53+
// The key acting as the outer namespace struct in the metadata extproc response to communicate
54+
// back the picked endpoints.
55+
destinationEndpointHintMetadataNamespace string
56+
datastore datastore.Datastore
5357
}
5458

5559
type Scheduler interface {

pkg/epp/server/runserver.go

+28-25
Original file line numberDiff line numberDiff line change
@@ -45,38 +45,41 @@ import (
4545

4646
// ExtProcServerRunner provides methods to manage an external process server.
4747
type ExtProcServerRunner struct {
48-
GrpcPort int
49-
TargetEndpointKey string
50-
PoolName string
51-
PoolNamespace string
52-
RefreshMetricsInterval time.Duration
53-
RefreshPrometheusMetricsInterval time.Duration
54-
Datastore datastore.Datastore
55-
Provider *backend.Provider
56-
SecureServing bool
57-
CertPath string
48+
GrpcPort int
49+
DestinationEndpointHintMetadataNamespace string
50+
DestinationEndpointHintKey string
51+
PoolName string
52+
PoolNamespace string
53+
RefreshMetricsInterval time.Duration
54+
RefreshPrometheusMetricsInterval time.Duration
55+
Datastore datastore.Datastore
56+
Provider *backend.Provider
57+
SecureServing bool
58+
CertPath string
5859
}
5960

6061
// Default values for CLI flags in main
6162
const (
62-
DefaultGrpcPort = 9002 // default for --grpcPort
63-
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
64-
DefaultPoolName = "" // required but no default
65-
DefaultPoolNamespace = "default" // default for --poolNamespace
66-
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
67-
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
68-
DefaultSecureServing = true // default for --secureServing
63+
DefaultGrpcPort = 9002 // default for --grpcPort
64+
DefaultDestinationEndpointHintMetadataNamespace = "envoy.lb" // default for --destinationEndpointHintMetadataNamespace
65+
DefaultDestinationEndpointHintKey = "x-gateway-destination-endpoint" // default for --destinationEndpointHintKey
66+
DefaultPoolName = "" // required but no default
67+
DefaultPoolNamespace = "default" // default for --poolNamespace
68+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
69+
DefaultRefreshPrometheusMetricsInterval = 5 * time.Second // default for --refreshPrometheusMetricsInterval
70+
DefaultSecureServing = true // default for --secureServing
6971
)
7072

7173
func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
7274
return &ExtProcServerRunner{
73-
GrpcPort: DefaultGrpcPort,
74-
TargetEndpointKey: DefaultTargetEndpointKey,
75-
PoolName: DefaultPoolName,
76-
PoolNamespace: DefaultPoolNamespace,
77-
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
78-
RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
79-
SecureServing: DefaultSecureServing,
75+
GrpcPort: DefaultGrpcPort,
76+
DestinationEndpointHintKey: DefaultDestinationEndpointHintKey,
77+
DestinationEndpointHintMetadataNamespace: DefaultDestinationEndpointHintMetadataNamespace,
78+
PoolName: DefaultPoolName,
79+
PoolNamespace: DefaultPoolNamespace,
80+
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
81+
RefreshPrometheusMetricsInterval: DefaultRefreshPrometheusMetricsInterval,
82+
SecureServing: DefaultSecureServing,
8083
// Datastore can be assigned later.
8184
}
8285
}
@@ -156,7 +159,7 @@ func (r *ExtProcServerRunner) AsRunnable(logger logr.Logger) manager.Runnable {
156159
}
157160
extProcPb.RegisterExternalProcessorServer(
158161
srv,
159-
handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.TargetEndpointKey, r.Datastore),
162+
handlers.NewServer(scheduling.NewScheduler(r.Datastore), r.DestinationEndpointHintMetadataNamespace, r.DestinationEndpointHintKey, r.Datastore),
160163
)
161164

162165
// Forward to the gRPC runnable.

pkg/epp/test/utils.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func startExtProc(logger logr.Logger, port int, datastore datastore.Datastore) *
7979

8080
s := grpc.NewServer()
8181

82-
extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "target-pod", datastore))
82+
extProcPb.RegisterExternalProcessorServer(s, handlers.NewServer(scheduling.NewScheduler(datastore), "", "target-pod", datastore))
8383

8484
logger.Info("gRPC server starting", "port", port)
8585
reflection.Register(s)

test/integration/hermetic_test.go

+36-48
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
100100
wantHeaders: []*configPb.HeaderValueOption{
101101
{
102102
Header: &configPb.HeaderValue{
103-
Key: runserver.DefaultTargetEndpointKey,
103+
Key: runserver.DefaultDestinationEndpointHintKey,
104104
RawValue: []byte("address-1:8000"),
105105
},
106106
},
@@ -111,17 +111,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
111111
},
112112
},
113113
},
114-
wantMetadata: &structpb.Struct{
115-
Fields: map[string]*structpb.Value{
116-
runserver.DefaultTargetEndpointKey: {
117-
Kind: &structpb.Value_StringValue{
118-
StringValue: "address-1:8000",
119-
},
120-
},
121-
},
122-
},
123-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
124-
wantErr: false,
114+
wantMetadata: makeMetadata("address-1:8000"),
115+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"my-model-12345\",\"prompt\":\"test1\",\"temperature\":0}"),
116+
wantErr: false,
125117
},
126118
{
127119
name: "select active lora, low queue",
@@ -156,7 +148,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
156148
wantHeaders: []*configPb.HeaderValueOption{
157149
{
158150
Header: &configPb.HeaderValue{
159-
Key: runserver.DefaultTargetEndpointKey,
151+
Key: runserver.DefaultDestinationEndpointHintKey,
160152
RawValue: []byte("address-1:8000"),
161153
},
162154
},
@@ -167,17 +159,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
167159
},
168160
},
169161
},
170-
wantMetadata: &structpb.Struct{
171-
Fields: map[string]*structpb.Value{
172-
runserver.DefaultTargetEndpointKey: {
173-
Kind: &structpb.Value_StringValue{
174-
StringValue: "address-1:8000",
175-
},
176-
},
177-
},
178-
},
179-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
180-
wantErr: false,
162+
wantMetadata: makeMetadata("address-1:8000"),
163+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test2\",\"temperature\":0}"),
164+
wantErr: false,
181165
},
182166
{
183167
name: "select no lora despite active model, avoid excessive queue size",
@@ -213,7 +197,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
213197
wantHeaders: []*configPb.HeaderValueOption{
214198
{
215199
Header: &configPb.HeaderValue{
216-
Key: runserver.DefaultTargetEndpointKey,
200+
Key: runserver.DefaultDestinationEndpointHintKey,
217201
RawValue: []byte("address-2:8000"),
218202
},
219203
},
@@ -224,17 +208,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
224208
},
225209
},
226210
},
227-
wantMetadata: &structpb.Struct{
228-
Fields: map[string]*structpb.Value{
229-
runserver.DefaultTargetEndpointKey: {
230-
Kind: &structpb.Value_StringValue{
231-
StringValue: "address-2:8000",
232-
},
233-
},
234-
},
235-
},
236-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
237-
wantErr: false,
211+
wantMetadata: makeMetadata("address-2:8000"),
212+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"test3\",\"temperature\":0}"),
213+
wantErr: false,
238214
},
239215
{
240216
name: "noncritical and all models past threshold, shed request",
@@ -312,7 +288,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
312288
wantHeaders: []*configPb.HeaderValueOption{
313289
{
314290
Header: &configPb.HeaderValue{
315-
Key: runserver.DefaultTargetEndpointKey,
291+
Key: runserver.DefaultDestinationEndpointHintKey,
316292
RawValue: []byte("address-0:8000"),
317293
},
318294
},
@@ -323,17 +299,9 @@ func TestKubeInferenceModelRequest(t *testing.T) {
323299
},
324300
},
325301
},
326-
wantMetadata: &structpb.Struct{
327-
Fields: map[string]*structpb.Value{
328-
runserver.DefaultTargetEndpointKey: {
329-
Kind: &structpb.Value_StringValue{
330-
StringValue: "address-0:8000",
331-
},
332-
},
333-
},
334-
},
335-
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
336-
wantErr: false,
302+
wantMetadata: makeMetadata("address-0:8000"),
303+
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg3\",\"prompt\":\"test5\",\"temperature\":0}"),
304+
wantErr: false,
337305
},
338306
}
339307

@@ -555,3 +523,23 @@ func readDocuments(fp string) ([][]byte, error) {
555523
}
556524
return docs, nil
557525
}
526+
527+
// makeMetadata returns the dynamic metadata struct the test cases expect for endpoint: a single
// outer field keyed by runserver.DefaultDestinationEndpointHintMetadataNamespace whose struct value
// holds one field, keyed by runserver.DefaultDestinationEndpointHintKey, with endpoint as its
// string value.
func makeMetadata(endpoint string) *structpb.Struct {
528+
return &structpb.Struct{
529+
Fields: map[string]*structpb.Value{
530+
runserver.DefaultDestinationEndpointHintMetadataNamespace: {
531+
Kind: &structpb.Value_StructValue{
532+
StructValue: &structpb.Struct{
533+
Fields: map[string]*structpb.Value{
534+
runserver.DefaultDestinationEndpointHintKey: {
535+
Kind: &structpb.Value_StringValue{
536+
StringValue: endpoint,
537+
},
538+
},
539+
},
540+
},
541+
},
542+
},
543+
},
544+
}
545+
}

0 commit comments

Comments
 (0)