Skip to content

Commit 72a56e8

Browse files
committed
Add code for Envoy extension that support body-to-header translation
1 parent 2577f63 commit 72a56e8

File tree

2 files changed

+173
-0
lines changed

2 files changed

+173
-0
lines changed

pkg/body-based-routing/main.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
// Package main is the entry point for the server.
2+
package main
3+
4+
import (
5+
"fmt"
6+
"net"
7+
8+
"google3/base/go/log"
9+
"sigs.k8s.io/gateway-api-inference-extension/pkg/body-based-routing/service"
10+
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
11+
"google.golang.org/grpc"
12+
)
13+
14+
func main() {
15+
address := "0.0.0.0:8181"
16+
lis, err := net.Listen("tcp", address)
17+
if err != nil {
18+
log.Fatalf("Failed to listen on insecure port: %v", err)
19+
}
20+
fmt.Printf("Starting server on address %s\n", address)
21+
22+
grpcServer := grpc.NewServer()
23+
eppb.RegisterExternalProcessorServer(grpcServer, &service.GRPCCalloutService{})
24+
25+
if err := grpcServer.Serve(lis); err != nil {
26+
log.Fatalf("Failed to serve gRPC on insecure port: %v", err)
27+
}
28+
}

pkg/body-based-routing/service.go

+145
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Package service provides the implementation of the external processor service.
2+
package service
3+
4+
import (
5+
"encoding/json"
6+
"fmt"
7+
8+
basepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
9+
eppb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
10+
)
11+
12+
// GRPCCalloutService implements the gRPC ExternalProcessorServer.
13+
type GRPCCalloutService struct {
14+
eppb.UnimplementedExternalProcessorServer
15+
}
16+
17+
// Process processes incoming gRPC streams.
18+
func (s *GRPCCalloutService) Process(stream eppb.ExternalProcessor_ProcessServer) error {
19+
for {
20+
req, err := stream.Recv()
21+
if err != nil {
22+
return err
23+
}
24+
25+
fmt.Printf("grpc_callout_service: Received a new request %+v\n", req)
26+
27+
var response *eppb.ProcessingResponse
28+
switch {
29+
case req.GetRequestHeaders() != nil:
30+
response, err = s.HandleRequestHeaders(req.GetRequestHeaders())
31+
case req.GetResponseHeaders() != nil:
32+
response, err = s.HandleResponseHeaders(req.GetResponseHeaders())
33+
case req.GetRequestBody() != nil:
34+
response, err = s.HandleRequestBody(req.GetRequestBody())
35+
case req.GetResponseBody() != nil:
36+
response, err = s.HandleResponseBody(req.GetResponseBody())
37+
case req.GetRequestTrailers() != nil:
38+
response, err = s.HandleRequestTrailers(req.GetRequestTrailers())
39+
case req.GetResponseTrailers() != nil:
40+
response, err = s.HandleResponseTrailers(req.GetResponseTrailers())
41+
}
42+
43+
if err != nil {
44+
return err
45+
}
46+
47+
if err := stream.Send(response); err != nil {
48+
return err
49+
}
50+
}
51+
}
52+
53+
// HandleRequestBody handles request bodies.
54+
func (s *GRPCCalloutService) HandleRequestBody(body *eppb.HttpBody) (*eppb.ProcessingResponse, error) {
55+
var data map[string]any
56+
if err := json.Unmarshal(body.GetBody(), &data); err != nil {
57+
return nil, err
58+
}
59+
60+
modelVal, ok := data["model"]
61+
if !ok {
62+
fmt.Print("The incoming request did not contain a model parameter\n")
63+
return &eppb.ProcessingResponse{
64+
Response: &eppb.ProcessingResponse_RequestBody{
65+
RequestBody: &eppb.BodyResponse{},
66+
},
67+
}, nil
68+
}
69+
70+
modelStr, ok := modelVal.(string)
71+
if !ok {
72+
return &eppb.ProcessingResponse{
73+
Response: &eppb.ProcessingResponse_RequestBody{
74+
RequestBody: &eppb.BodyResponse{},
75+
},
76+
}, fmt.Errorf("the model parameter value %v is not a string", modelVal)
77+
}
78+
79+
fmt.Print("grpc_callout_service: Returning mutated request headers\n")
80+
return &eppb.ProcessingResponse{
81+
Response: &eppb.ProcessingResponse_RequestBody{
82+
RequestBody: &eppb.BodyResponse{
83+
Response: &eppb.CommonResponse{
84+
// Necessary so that the new headers are used in the routing decision.
85+
ClearRouteCache: true,
86+
HeaderMutation: &eppb.HeaderMutation{
87+
SetHeaders: []*basepb.HeaderValueOption{
88+
{
89+
Header: &basepb.HeaderValue{
90+
Key: "Model",
91+
Value: modelStr,
92+
},
93+
},
94+
},
95+
},
96+
},
97+
},
98+
},
99+
}, nil
100+
}
101+
102+
// HandleRequestHeaders handles request headers.
103+
func (s *GRPCCalloutService) HandleRequestHeaders(headers *eppb.HttpHeaders) (*eppb.ProcessingResponse, error) {
104+
return &eppb.ProcessingResponse{
105+
Response: &eppb.ProcessingResponse_RequestHeaders{
106+
RequestHeaders: &eppb.HeadersResponse{},
107+
},
108+
}, nil
109+
}
110+
111+
// HandleResponseHeaders handles response headers.
112+
func (s *GRPCCalloutService) HandleResponseHeaders(headers *eppb.HttpHeaders) (*eppb.ProcessingResponse, error) {
113+
return &eppb.ProcessingResponse{
114+
Response: &eppb.ProcessingResponse_ResponseHeaders{
115+
ResponseHeaders: &eppb.HeadersResponse{},
116+
},
117+
}, nil
118+
}
119+
120+
// HandleResponseBody handles response bodies.
121+
func (s *GRPCCalloutService) HandleResponseBody(body *eppb.HttpBody) (*eppb.ProcessingResponse, error) {
122+
return &eppb.ProcessingResponse{
123+
Response: &eppb.ProcessingResponse_ResponseBody{
124+
ResponseBody: &eppb.BodyResponse{},
125+
},
126+
}, nil
127+
}
128+
129+
// HandleRequestTrailers handles request trailers.
130+
func (s *GRPCCalloutService) HandleRequestTrailers(trailers *eppb.HttpTrailers) (*eppb.ProcessingResponse, error) {
131+
return &eppb.ProcessingResponse{
132+
Response: &eppb.ProcessingResponse_RequestTrailers{
133+
RequestTrailers: &eppb.TrailersResponse{},
134+
},
135+
}, nil
136+
}
137+
138+
// HandleResponseTrailers handles response trailers.
139+
func (s *GRPCCalloutService) HandleResponseTrailers(trailers *eppb.HttpTrailers) (*eppb.ProcessingResponse, error) {
140+
return &eppb.ProcessingResponse{
141+
Response: &eppb.ProcessingResponse_ResponseTrailers{
142+
ResponseTrailers: &eppb.TrailersResponse{},
143+
},
144+
}, nil
145+
}

0 commit comments

Comments
 (0)