Skip to content

Commit e5a3458

Browse files
committed
Add code for Envoy extension that support body-to-header translation
1 parent 2577f63 commit e5a3458

File tree

9 files changed

+689
-0
lines changed

9 files changed

+689
-0
lines changed

body-based-routing.Dockerfile

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Dockerfile has specific requirement to put this ARG at the beginning:
2+
# https://docs.docker.com/engine/reference/builder/#understand-how-arg-and-from-interact
3+
ARG BUILDER_IMAGE=golang:1.23-alpine
4+
ARG BASE_IMAGE=gcr.io/distroless/base-debian10
5+
6+
## Multistage build
7+
FROM ${BUILDER_IMAGE} AS builder
8+
ENV CGO_ENABLED=0
9+
ENV GOOS=linux
10+
ENV GOARCH=amd64
11+
12+
# Dependencies
13+
WORKDIR /src
14+
COPY go.mod go.sum ./
15+
RUN go mod download
16+
17+
# Sources
18+
COPY cmd ./cmd
19+
COPY pkg ./pkg
20+
COPY internal ./internal
21+
WORKDIR /src/cmd/body-based-routing
22+
RUN go build -o /body-based-routing
23+
24+
## Multistage deploy
25+
FROM ${BASE_IMAGE}
26+
27+
WORKDIR /
28+
COPY --from=builder /body-based-routing /body-based-routing
29+
30+
ENTRYPOINT ["/body-based-routing"]

cmd/body-based-routing/health.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
22+
"github.com/go-logr/logr"
23+
"google.golang.org/grpc/codes"
24+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
25+
"google.golang.org/grpc/status"
26+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
27+
)
28+
29+
type healthServer struct {
30+
logger logr.Logger
31+
}
32+
33+
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
34+
s.logger.V(logutil.VERBOSE).Info("gRPC health check serving", "service", in.Service)
35+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
36+
}
37+
38+
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
39+
return status.Error(codes.Unimplemented, "Watch is not implemented")
40+
}

cmd/body-based-routing/main.go

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"flag"
21+
"os"
22+
23+
"github.com/go-logr/logr"
24+
uberzap "go.uber.org/zap"
25+
"google.golang.org/grpc"
26+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
27+
ctrl "sigs.k8s.io/controller-runtime"
28+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
29+
"sigs.k8s.io/controller-runtime/pkg/manager"
30+
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
31+
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/body-based-routing/server"
32+
)
33+
34+
var (
35+
grpcPort = flag.Int(
36+
"grpcPort",
37+
runserver.DefaultGrpcPort,
38+
"The gRPC port used for communicating with Envoy proxy")
39+
grpcHealthPort = flag.Int(
40+
"grpcHealthPort",
41+
9003,
42+
"The port used for gRPC liveness and readiness probes")
43+
44+
setupLog = ctrl.Log.WithName("setup")
45+
)
46+
47+
func main() {
48+
if err := run(); err != nil {
49+
os.Exit(1)
50+
}
51+
}
52+
53+
func run() error {
54+
opts := zap.Options{Development: true}
55+
opts.BindFlags(flag.CommandLine)
56+
flag.Parse()
57+
initLogging(&opts)
58+
59+
// Print all flag values
60+
flags := make(map[string]any)
61+
flag.VisitAll(func(f *flag.Flag) {
62+
flags[f.Name] = f.Value
63+
})
64+
setupLog.Info("Flags processed", "flags", flags)
65+
66+
// Init runtime.
67+
cfg, err := ctrl.GetConfig()
68+
if err != nil {
69+
setupLog.Error(err, "Failed to get rest config")
70+
return err
71+
}
72+
73+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{})
74+
if err != nil {
75+
setupLog.Error(err, "Failed to create manager", "config", cfg)
76+
return err
77+
}
78+
79+
ctx := ctrl.SetupSignalHandler()
80+
81+
// Setup runner.
82+
serverRunner := &runserver.ExtProcServerRunner{GrpcPort: *grpcPort}
83+
84+
// Register health server.
85+
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), *grpcHealthPort); err != nil {
86+
return err
87+
}
88+
89+
// Register ext-proc server.
90+
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
91+
setupLog.Error(err, "Failed to register ext-proc gRPC server")
92+
return err
93+
}
94+
95+
// Start the manager. This blocks until a signal is received.
96+
setupLog.Info("Manager starting")
97+
if err := mgr.Start(ctx); err != nil {
98+
setupLog.Error(err, "Error starting manager")
99+
return err
100+
}
101+
setupLog.Info("Manager terminated")
102+
return nil
103+
}
104+
105+
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
106+
func registerHealthServer(mgr manager.Manager, logger logr.Logger, port int) error {
107+
srv := grpc.NewServer()
108+
healthPb.RegisterHealthServer(srv, &healthServer{
109+
logger: logger,
110+
})
111+
if err := mgr.Add(
112+
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
113+
setupLog.Error(err, "Failed to register health server")
114+
return err
115+
}
116+
return nil
117+
}
118+
119+
func initLogging(opts *zap.Options) {
120+
logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller()))
121+
ctrl.SetLogger(logger)
122+
}

pkg/body-based-routing/README.md

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Body-Based Routing
2+
This package provides an extension that can be deployed to write the `model`
3+
HTTP body parameter as a header so as to enable routing capabilities on the
4+
model name.
5+
6+
As per OpenAI spec, it is standard for the model name to be included in the
7+
body of the HTTP request. However, most implementations do not support routing
8+
based on the request body. This extension helps bridge the gap for clients that
9+
This extension works by parsing the request body. If it finds a `model` parameter in the
10+
request body, it will copy the value of that parameter into a request header.
11+
12+
This extension is intended to be paired with an `ext_proc` capable Gateway. There is not
13+
a standard way to represent this kind of extension in Gateway API yet, so we recommend
14+
referring to implementation-specific documentation for how to deploy this extension.
+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package handlers
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
24+
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25+
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
26+
"sigs.k8s.io/controller-runtime/pkg/log"
27+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
28+
)
29+
30+
// HandleRequestBody handles request bodies.
31+
func (s *Server) HandleRequestBody(ctx context.Context, body *eppb.HttpBody) (*eppb.ProcessingResponse, error) {
32+
logger := log.FromContext(ctx)
33+
34+
var data map[string]any
35+
if err := json.Unmarshal(body.GetBody(), &data); err != nil {
36+
return nil, err
37+
}
38+
39+
modelVal, ok := data["model"]
40+
if !ok {
41+
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
42+
return &eppb.ProcessingResponse{
43+
Response: &eppb.ProcessingResponse_RequestBody{
44+
RequestBody: &eppb.BodyResponse{},
45+
},
46+
}, nil
47+
}
48+
49+
modelStr, ok := modelVal.(string)
50+
if !ok {
51+
logger.V(logutil.DEFAULT).Info("Model parameter value is not a string")
52+
return &eppb.ProcessingResponse{
53+
Response: &eppb.ProcessingResponse_RequestBody{
54+
RequestBody: &eppb.BodyResponse{},
55+
},
56+
}, fmt.Errorf("the model parameter value %v is not a string", modelVal)
57+
}
58+
59+
return &eppb.ProcessingResponse{
60+
Response: &eppb.ProcessingResponse_RequestBody{
61+
RequestBody: &eppb.BodyResponse{
62+
Response: &eppb.CommonResponse{
63+
// Necessary so that the new headers are used in the routing decision.
64+
ClearRouteCache: true,
65+
HeaderMutation: &eppb.HeaderMutation{
66+
SetHeaders: []*basepb.HeaderValueOption{
67+
{
68+
Header: &basepb.HeaderValue{
69+
Key: "model",
70+
Value: modelStr,
71+
},
72+
},
73+
},
74+
},
75+
},
76+
},
77+
},
78+
}, nil
79+
}
80+
81+
// HandleRequestHeaders handles request headers.
82+
func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) (*eppb.ProcessingResponse, error) {
83+
return &eppb.ProcessingResponse{
84+
Response: &eppb.ProcessingResponse_RequestHeaders{
85+
RequestHeaders: &eppb.HeadersResponse{},
86+
},
87+
}, nil
88+
}
89+
90+
// HandleRequestTrailers handles request trailers.
91+
func (s *Server) HandleRequestTrailers(trailers *eppb.HttpTrailers) (*eppb.ProcessingResponse, error) {
92+
return &eppb.ProcessingResponse{
93+
Response: &eppb.ProcessingResponse_RequestTrailers{
94+
RequestTrailers: &eppb.TrailersResponse{},
95+
},
96+
}, nil
97+
}

0 commit comments

Comments
 (0)