Skip to content

Commit 5e9514a

Browse files
committed
Add code for Envoy extension that support body-to-header translation
1 parent 2577f63 commit 5e9514a

File tree

9 files changed

+704
-0
lines changed

9 files changed

+704
-0
lines changed

body-based-routing.Dockerfile

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Dockerfile has specific requirement to put this ARG at the beginning:
2+
# https://docs.docker.com/engine/reference/builder/#understand-how-arg-and-from-interact
3+
ARG BUILDER_IMAGE=golang:1.23
4+
ARG BASE_IMAGE=gcr.io/distroless/static:nonroot
5+
6+
## Multistage build
7+
FROM ${BUILDER_IMAGE} AS builder
8+
ENV CGO_ENABLED=0
9+
ENV GOOS=linux
10+
ENV GOARCH=amd64
11+
12+
# Dependencies
13+
WORKDIR /src
14+
COPY go.mod go.sum ./
15+
RUN go mod download
16+
17+
# Sources
18+
COPY cmd ./cmd
19+
COPY pkg ./pkg
20+
COPY internal ./internal
21+
WORKDIR /src/cmd/body-based-routing
22+
RUN go build -o /body-based-routing
23+
24+
## Multistage deploy
25+
FROM ${BASE_IMAGE}
26+
27+
WORKDIR /
28+
COPY --from=builder /body-based-routing /body-based-routing
29+
30+
ENTRYPOINT ["/body-based-routing"]

cmd/body-based-routing/health.go

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
22+
"github.com/go-logr/logr"
23+
"google.golang.org/grpc/codes"
24+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
25+
"google.golang.org/grpc/status"
26+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
27+
)
28+
29+
type healthServer struct {
30+
logger logr.Logger
31+
}
32+
33+
func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckRequest) (*healthPb.HealthCheckResponse, error) {
34+
s.logger.V(logutil.VERBOSE).Info("gRPC health check serving", "service", in.Service)
35+
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
36+
}
37+
38+
func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Health_WatchServer) error {
39+
return status.Error(codes.Unimplemented, "Watch is not implemented")
40+
}

cmd/body-based-routing/main.go

+137
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"flag"
21+
"os"
22+
23+
"github.com/go-logr/logr"
24+
uberzap "go.uber.org/zap"
25+
"go.uber.org/zap/zapcore"
26+
"google.golang.org/grpc"
27+
healthPb "google.golang.org/grpc/health/grpc_health_v1"
28+
ctrl "sigs.k8s.io/controller-runtime"
29+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
30+
"sigs.k8s.io/controller-runtime/pkg/manager"
31+
"sigs.k8s.io/gateway-api-inference-extension/internal/runnable"
32+
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/body-based-routing/server"
33+
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
34+
)
35+
36+
var (
37+
grpcPort = flag.Int(
38+
"grpcPort",
39+
runserver.DefaultGrpcPort,
40+
"The gRPC port used for communicating with Envoy proxy")
41+
grpcHealthPort = flag.Int(
42+
"grpcHealthPort",
43+
9003,
44+
"The port used for gRPC liveness and readiness probes")
45+
logVerbosity = flag.Int("v", logging.DEFAULT, "number for the log level verbosity")
46+
47+
setupLog = ctrl.Log.WithName("setup")
48+
)
49+
50+
func main() {
51+
if err := run(); err != nil {
52+
os.Exit(1)
53+
}
54+
}
55+
56+
func run() error {
57+
opts := zap.Options{Development: true}
58+
opts.BindFlags(flag.CommandLine)
59+
flag.Parse()
60+
initLogging(&opts)
61+
62+
// Print all flag values
63+
flags := make(map[string]any)
64+
flag.VisitAll(func(f *flag.Flag) {
65+
flags[f.Name] = f.Value
66+
})
67+
setupLog.Info("Flags processed", "flags", flags)
68+
69+
// Init runtime.
70+
cfg, err := ctrl.GetConfig()
71+
if err != nil {
72+
setupLog.Error(err, "Failed to get rest config")
73+
return err
74+
}
75+
76+
mgr, err := ctrl.NewManager(cfg, ctrl.Options{})
77+
if err != nil {
78+
setupLog.Error(err, "Failed to create manager", "config", cfg)
79+
return err
80+
}
81+
82+
ctx := ctrl.SetupSignalHandler()
83+
84+
// Setup runner.
85+
serverRunner := &runserver.ExtProcServerRunner{GrpcPort: *grpcPort}
86+
87+
// Register health server.
88+
if err := registerHealthServer(mgr, ctrl.Log.WithName("health"), *grpcHealthPort); err != nil {
89+
return err
90+
}
91+
92+
// Register ext-proc server.
93+
if err := mgr.Add(serverRunner.AsRunnable(ctrl.Log.WithName("ext-proc"))); err != nil {
94+
setupLog.Error(err, "Failed to register ext-proc gRPC server")
95+
return err
96+
}
97+
98+
// Start the manager. This blocks until a signal is received.
99+
setupLog.Info("Manager starting")
100+
if err := mgr.Start(ctx); err != nil {
101+
setupLog.Error(err, "Error starting manager")
102+
return err
103+
}
104+
setupLog.Info("Manager terminated")
105+
return nil
106+
}
107+
108+
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
109+
func registerHealthServer(mgr manager.Manager, logger logr.Logger, port int) error {
110+
srv := grpc.NewServer()
111+
healthPb.RegisterHealthServer(srv, &healthServer{
112+
logger: logger,
113+
})
114+
if err := mgr.Add(
115+
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
116+
setupLog.Error(err, "Failed to register health server")
117+
return err
118+
}
119+
return nil
120+
}
121+
122+
func initLogging(opts *zap.Options) {
123+
useV := true
124+
flag.Visit(func(f *flag.Flag) {
125+
if f.Name == "zap-log-level" {
126+
useV = false
127+
}
128+
})
129+
if useV {
130+
// See https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/log/zap#Options.Level
131+
lvl := -1 * (*logVerbosity)
132+
opts.Level = uberzap.NewAtomicLevelAt(zapcore.Level(int8(lvl)))
133+
}
134+
135+
logger := zap.New(zap.UseFlagOptions(opts), zap.RawZapOpts(uberzap.AddCaller()))
136+
ctrl.SetLogger(logger)
137+
}

pkg/body-based-routing/README.md

+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# Body-Based Routing
2+
This package provides an extension that can be deployed to write the `model`
3+
HTTP body parameter as a header (X-Gateway-Model-Name) so as to enable routing capabilities on the
4+
model name.
5+
6+
As per OpenAI spec, it is standard for the model name to be included in the
7+
body of the HTTP request. However, most implementations do not support routing
8+
based on the request body. This extension helps bridge that gap for clients.
9+
This extension works by parsing the request body. If it finds a `model` parameter in the
10+
request body, it will copy the value of that parameter into a request header.
11+
12+
This extension is intended to be paired with an `ext_proc` capable Gateway. There is not
13+
a standard way to represent this kind of extension in Gateway API yet, so we recommend
14+
referring to implementation-specific documentation for how to deploy this extension.
+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package handlers
18+
19+
import (
20+
"context"
21+
"encoding/json"
22+
"fmt"
23+
24+
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
25+
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
26+
"sigs.k8s.io/controller-runtime/pkg/log"
27+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
28+
)
29+
30+
// HandleRequestBody handles request bodies.
31+
func (s *Server) HandleRequestBody(ctx context.Context, body *eppb.HttpBody) (*eppb.ProcessingResponse, error) {
32+
logger := log.FromContext(ctx)
33+
34+
var data map[string]any
35+
if err := json.Unmarshal(body.GetBody(), &data); err != nil {
36+
return nil, err
37+
}
38+
39+
modelVal, ok := data["model"]
40+
if !ok {
41+
logger.V(logutil.DEFAULT).Info("Request body does not contain model parameter")
42+
return &eppb.ProcessingResponse{
43+
Response: &eppb.ProcessingResponse_RequestBody{
44+
RequestBody: &eppb.BodyResponse{},
45+
},
46+
}, nil
47+
}
48+
49+
modelStr, ok := modelVal.(string)
50+
if !ok {
51+
logger.V(logutil.DEFAULT).Info("Model parameter value is not a string")
52+
return &eppb.ProcessingResponse{
53+
Response: &eppb.ProcessingResponse_RequestBody{
54+
RequestBody: &eppb.BodyResponse{},
55+
},
56+
}, fmt.Errorf("the model parameter value %v is not a string", modelVal)
57+
}
58+
59+
return &eppb.ProcessingResponse{
60+
Response: &eppb.ProcessingResponse_RequestBody{
61+
RequestBody: &eppb.BodyResponse{
62+
Response: &eppb.CommonResponse{
63+
// Necessary so that the new headers are used in the routing decision.
64+
ClearRouteCache: true,
65+
HeaderMutation: &eppb.HeaderMutation{
66+
SetHeaders: []*basepb.HeaderValueOption{
67+
{
68+
Header: &basepb.HeaderValue{
69+
Key: "X-Gateway-Model-Name",
70+
RawValue: []byte(modelStr),
71+
},
72+
},
73+
},
74+
},
75+
},
76+
},
77+
},
78+
}, nil
79+
}
80+
81+
// HandleRequestHeaders handles request headers.
82+
func (s *Server) HandleRequestHeaders(headers *eppb.HttpHeaders) (*eppb.ProcessingResponse, error) {
83+
return &eppb.ProcessingResponse{
84+
Response: &eppb.ProcessingResponse_RequestHeaders{
85+
RequestHeaders: &eppb.HeadersResponse{},
86+
},
87+
}, nil
88+
}
89+
90+
// HandleRequestTrailers handles request trailers.
91+
func (s *Server) HandleRequestTrailers(trailers *eppb.HttpTrailers) (*eppb.ProcessingResponse, error) {
92+
return &eppb.ProcessingResponse{
93+
Response: &eppb.ProcessingResponse_RequestTrailers{
94+
RequestTrailers: &eppb.TrailersResponse{},
95+
},
96+
}, nil
97+
}

0 commit comments

Comments
 (0)