Skip to content

Update default target-pod and inject it into response metadata #270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/proposals/003-endpoint-picker-protocol/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ The EPP MUST implement the Envoy
[external processing service](https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor)protocol.

For each HTTP request, the EPP MUST communicate to the proxy the picked model server endpoint, via
adding the `target-pod` HTTP header in the request, or otherwise return an error.
adding the `x-gateway-destination-endpoint` HTTP header in the request and as an unstructured entry in the [dynamic_metadata](https://github.com/envoyproxy/go-control-plane/blob/c19bf63a811c90bf9e02f8e0dc1dcef94931ebb4/envoy/service/ext_proc/v3/external_processor.pb.go#L320) field of the ext-proc response, or otherwise return an error. The EPP MUST not set two different values in the header and the response metadata.
Setting different value leads to unpredictable behavior because proxies aren't guaranteed to support both paths, and so this protocol does not define what takes precedence.

## Model Server Protocol

Expand Down Expand Up @@ -62,4 +63,4 @@ The model server MUST expose the following LoRA adapter metrics via the same Pro
Requests will be queued if the model server has reached MaxActiveAdapter and canno load the
requested adapter. Example: `"max_lora": "8"`.
* `running_lora_adapters`: A comma separated list of adapters that are currently loaded in GPU
memory and ready to serve requests. Example: `"running_lora_adapters": "adapter1, adapter2"`
memory and ready to serve requests. Example: `"running_lora_adapters": "adapter1, adapter2"`
17 changes: 15 additions & 2 deletions pkg/ext-proc/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

configPb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/protobuf/types/known/structpb"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
Expand Down Expand Up @@ -81,11 +82,11 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
reqCtx.RequestSize = len(v.RequestBody.Body)
reqCtx.TargetPod = targetPod

// Insert "target-pod" to instruct Envoy to route requests to the specified target pod.
// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
headers := []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: s.targetPodHeader,
Key: s.targetEndpointKey,
RawValue: []byte(targetPod.Address),
},
},
Expand All @@ -104,6 +105,9 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
}

resp := &extProcPb.ProcessingResponse{
// The Endpoint Picker supports two approaches to communicating the target endpoint, as a request header
// and as an unstructure ext-proc response metadata key/value pair. This enables different integration
// options for gateway providers.
Response: &extProcPb.ProcessingResponse_RequestBody{
RequestBody: &extProcPb.BodyResponse{
Response: &extProcPb.CommonResponse{
Expand All @@ -118,6 +122,15 @@ func (s *Server) HandleRequestBody(reqCtx *RequestContext, req *extProcPb.Proces
},
},
},
DynamicMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This LGTM.

@yanavlasov PTAL as well.

s.targetEndpointKey: {
Kind: &structpb.Value_StringValue{
StringValue: targetPod.Address,
},
},
},
},
}
return resp, nil
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/ext-proc/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
klog "k8s.io/klog/v2"
)

func NewServer(pp PodProvider, scheduler Scheduler, targetPodHeader string, datastore ModelDataStore) *Server {
func NewServer(pp PodProvider, scheduler Scheduler, targetEndpointKey string, datastore ModelDataStore) *Server {
return &Server{
scheduler: scheduler,
podProvider: pp,
targetPodHeader: targetPodHeader,
datastore: datastore,
scheduler: scheduler,
podProvider: pp,
targetEndpointKey: targetEndpointKey,
datastore: datastore,
}
}

Expand All @@ -32,8 +32,8 @@ type Server struct {
podProvider PodProvider
// The key of the header to specify the target pod address. This value needs to match Envoy
// configuration.
targetPodHeader string
datastore ModelDataStore
targetEndpointKey string
datastore ModelDataStore
}

type Scheduler interface {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ext-proc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ var (
"The port used for gRPC liveness and readiness probes")
metricsPort = flag.Int(
"metricsPort", 9090, "The metrics port")
targetPodHeader = flag.String(
"targetPodHeader",
runserver.DefaultTargetPodHeader,
targetEndpointKey = flag.String(
"targetEndpointKey",
runserver.DefaultTargetEndpointKey,
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
poolName = flag.String(
"poolName",
Expand Down Expand Up @@ -103,7 +103,7 @@ func main() {

serverRunner := &runserver.ExtProcServerRunner{
GrpcPort: *grpcPort,
TargetPodHeader: *targetPodHeader,
TargetEndpointKey: *targetEndpointKey,
PoolName: *poolName,
PoolNamespace: *poolNamespace,
ServiceName: *serviceName,
Expand Down
22 changes: 11 additions & 11 deletions pkg/ext-proc/server/runserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// ExtProcServerRunner provides methods to manage an external process server.
type ExtProcServerRunner struct {
GrpcPort int
TargetPodHeader string
TargetEndpointKey string
PoolName string
PoolNamespace string
ServiceName string
Expand All @@ -35,20 +35,20 @@ type ExtProcServerRunner struct {

// Default values for CLI flags in main
const (
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultTargetPodHeader = "target-pod" // default for --targetPodHeader
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultServiceName = "" // required but no default
DefaultZone = "" // default for --zone
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
DefaultGrpcPort = 9002 // default for --grpcPort
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
DefaultPoolName = "" // required but no default
DefaultPoolNamespace = "default" // default for --poolNamespace
DefaultServiceName = "" // required but no default
DefaultZone = "" // default for --zone
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
)

func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
return &ExtProcServerRunner{
GrpcPort: DefaultGrpcPort,
TargetPodHeader: DefaultTargetPodHeader,
TargetEndpointKey: DefaultTargetEndpointKey,
PoolName: DefaultPoolName,
PoolNamespace: DefaultPoolNamespace,
ServiceName: DefaultServiceName,
Expand Down Expand Up @@ -130,7 +130,7 @@ func (r *ExtProcServerRunner) Start(
// Register ext_proc handlers
extProcPb.RegisterExternalProcessorServer(
svr,
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetEndpointKey, r.Datastore),
)

// Blocking and will return when shutdown is complete.
Expand Down
4 changes: 2 additions & 2 deletions pkg/manifests/gateway/patch_policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ spec:
type: ORIGINAL_DST
original_dst_lb_config:
use_http_header: true
http_header_name: "target-pod"
http_header_name: "x-gateway-destination-endpoint"
connect_timeout: 1000s
lb_policy: CLUSTER_PROVIDED
dns_lookup_family: V4_ONLY
Expand All @@ -40,4 +40,4 @@ spec:
operation:
op: replace
path: "/virtual_hosts/0/routes/0/route/cluster"
value: original_destination_cluster
value: original_destination_cluster
2 changes: 1 addition & 1 deletion pkg/manifests/vllm/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,4 @@ spec:
emptyDir:
medium: Memory
- name: adapters
emptyDir: {}
emptyDir: {}
26 changes: 19 additions & 7 deletions test/integration/hermetic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
Expand Down Expand Up @@ -111,7 +112,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
wantHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultTargetPodHeader,
Key: runserver.DefaultTargetEndpointKey,
RawValue: []byte("address-1"),
},
},
Expand Down Expand Up @@ -162,11 +163,12 @@ func SKIPTestHandleRequestBody(t *testing.T) {

func TestKubeInferenceModelRequest(t *testing.T) {
tests := []struct {
name string
req *extProcPb.ProcessingRequest
wantHeaders []*configPb.HeaderValueOption
wantBody []byte
wantErr bool
name string
req *extProcPb.ProcessingRequest
wantHeaders []*configPb.HeaderValueOption
wantMetadata *structpb.Struct
wantBody []byte
wantErr bool
}{
{
name: "success",
Expand All @@ -176,7 +178,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
wantHeaders: []*configPb.HeaderValueOption{
{
Header: &configPb.HeaderValue{
Key: runserver.DefaultTargetPodHeader,
Key: runserver.DefaultTargetEndpointKey,
RawValue: []byte("address-1"),
},
},
Expand All @@ -187,6 +189,15 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
wantMetadata: &structpb.Struct{
Fields: map[string]*structpb.Value{
runserver.DefaultTargetEndpointKey: {
Kind: &structpb.Value_StringValue{
StringValue: "address-1",
},
},
},
},
wantBody: []byte("{\"max_tokens\":100,\"model\":\"sql-lora-1fdg2\",\"prompt\":\"hello\",\"temperature\":0}"),
wantErr: false,
},
Expand Down Expand Up @@ -249,6 +260,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
},
},
},
DynamicMetadata: test.wantMetadata,
}
res, err := sendRequest(t, client, test.req)

Expand Down
2 changes: 1 addition & 1 deletion test/testdata/envoy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ data:
max_requests: 40000
original_dst_lb_config:
use_http_header: true
http_header_name: target-pod
http_header_name: x-gateway-destination-endpoint
- name: ext_proc
type: STRICT_DNS
connect_timeout: 86400s
Expand Down