forked from kubernetes-sigs/gateway-api-inference-extension
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
147 lines (134 loc) · 5.51 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package handlers
import (
"io"
"time"
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
envoyTypePb "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
klog "k8s.io/klog/v2"
)
// NewServer builds a Server wired with the given pod provider, scheduler,
// target-pod header key and model datastore.
func NewServer(pp PodProvider, scheduler Scheduler, targetPodHeader string, datastore ModelDataStore) *Server {
	server := new(Server)
	server.podProvider = pp
	server.scheduler = scheduler
	server.datastore = datastore
	server.targetPodHeader = targetPodHeader
	return server
}
// Server implements the Envoy external processing server.
// https://www.envoyproxy.io/docs/envoy/latest/api-v3/service/ext_proc/v3/external_processor.proto
//
// All fields are populated by NewServer.
type Server struct {
// scheduler chooses the target pod for an LLM request.
scheduler Scheduler
// podProvider exposes reading and updating of per-pod metrics.
podProvider PodProvider
// The key of the header to specify the target pod address. This value needs to match Envoy
// configuration.
targetPodHeader string
// datastore resolves a model name to its InferenceModel.
datastore ModelDataStore
}
// Scheduler picks the backend pod that should serve an LLM request.
type Scheduler interface {
// Schedule returns the target pod for request b, or a non-nil error.
// NOTE(review): Process answers a gRPC ResourceExhausted status from request
// handling with an HTTP 429; presumably that status originates from this
// method when there is no capacity for sheddable requests — confirm.
Schedule(b *scheduling.LLMRequest) (targetPod backend.Pod, err error)
}
// PodProvider gives access to the pods in the backend and to per-pod
// information such as metrics.
type PodProvider interface {
// GetPodMetrics returns the metrics recorded for pod.
// NOTE(review): the bool presumably reports whether the pod is known — confirm
// against the implementation.
GetPodMetrics(pod backend.Pod) (*backend.PodMetrics, bool)
// UpdatePodMetrics presumably replaces the metrics stored for pod with pm —
// confirm against the implementation.
UpdatePodMetrics(pod backend.Pod, pm *backend.PodMetrics)
}
// ModelDataStore looks up InferenceModel objects by model name.
type ModelDataStore interface {
// FetchModelData returns the InferenceModel registered under modelName.
// NOTE(review): a nil result presumably means no such model exists — confirm
// against the implementation.
FetchModelData(modelName string) (returnModel *v1alpha1.InferenceModel)
}
// Process runs the ext_proc loop for a single Envoy stream. Each message
// received from srv is dispatched to the handler for its phase (request
// headers, request body, response headers, response body), metrics are
// recorded when the phase succeeds, and the resulting ProcessingResponse is
// sent back on the stream.
//
// It returns nil once Recv reports io.EOF, the stream context's error once
// that context is done, and a gRPC status error when receiving, handling or
// sending fails. The only handler error that keeps the stream going is one
// with code ResourceExhausted, which is answered with an HTTP 429 immediate
// response instead.
func (s *Server) Process(srv extProcPb.ExternalProcessor_ProcessServer) error {
	klog.V(logutil.VERBOSE).Info("Processing")
	streamCtx := srv.Context()

	// A single RequestContext is shared by every message of the stream so that
	// state survives across the phases of one HTTP request.
	// See https://github.com/envoyproxy/envoy/issues/17540.
	rc := &RequestContext{}

	for {
		// Non-blocking poll: bail out as soon as the stream context is done.
		select {
		case <-streamCtx.Done():
			return streamCtx.Err()
		default:
		}

		msg, err := srv.Recv()
		switch {
		case err == io.EOF:
			return nil
		case err != nil:
			// This error occurs very frequently, though it doesn't seem to have any impact.
			// TODO Figure out if we can remove this noise.
			klog.V(logutil.VERBOSE).Infof("cannot receive stream request: %v", err)
			return status.Errorf(codes.Unknown, "cannot receive stream request: %v", err)
		}

		var reply *extProcPb.ProcessingResponse
		switch msg.Request.(type) {
		case *extProcPb.ProcessingRequest_RequestHeaders:
			// The request clock starts when the headers arrive.
			rc.RequestReceivedTimestamp = time.Now()
			reply = HandleRequestHeaders(rc, msg)
			klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestHeaders: %+v", rc)

		case *extProcPb.ProcessingRequest_RequestBody:
			reply, err = s.HandleRequestBody(rc, msg)
			if err == nil {
				metrics.RecordRequestCounter(rc.Model, rc.ResolvedTargetModel)
				metrics.RecordRequestSizes(rc.Model, rc.ResolvedTargetModel, rc.RequestSize)
			}
			klog.V(logutil.VERBOSE).Infof("Request context after HandleRequestBody: %+v", rc)

		case *extProcPb.ProcessingRequest_ResponseHeaders:
			reply, err = s.HandleResponseHeaders(rc, msg)
			klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseHeaders: %+v", rc)

		case *extProcPb.ProcessingRequest_ResponseBody:
			reply, err = s.HandleResponseBody(rc, msg)
			if rc.ResponseComplete && err == nil {
				// Latency, size and token metrics are emitted only once the
				// handler has flagged the response as complete.
				rc.ResponseCompleteTimestamp = time.Now()
				metrics.RecordRequestLatencies(rc.Model, rc.ResolvedTargetModel, rc.RequestReceivedTimestamp, rc.ResponseCompleteTimestamp)
				metrics.RecordResponseSizes(rc.Model, rc.ResolvedTargetModel, rc.ResponseSize)
				metrics.RecordInputTokens(rc.Model, rc.ResolvedTargetModel, rc.Response.Usage.PromptTokens)
				metrics.RecordOutputTokens(rc.Model, rc.ResolvedTargetModel, rc.Response.Usage.CompletionTokens)
			}
			klog.V(logutil.VERBOSE).Infof("Request context after HandleResponseBody: %+v", rc)

		default:
			klog.Errorf("Unknown Request type %+v", msg.Request)
			return status.Error(codes.Unknown, "unknown request type")
		}

		if err != nil {
			klog.Errorf("failed to process request: %v", err)
			code := status.Code(err)
			if code != codes.ResourceExhausted {
				return status.Errorf(code, "failed to handle request: %v", err)
			}
			// ResourceExhausted can be returned by scheduler when there is no
			// capacity for sheddable requests; tell Envoy to reply with 429.
			tooManyRequests := &extProcPb.ImmediateResponse{
				Status: &envoyTypePb.HttpStatus{
					Code: envoyTypePb.StatusCode_TooManyRequests,
				},
			}
			reply = &extProcPb.ProcessingResponse{
				Response: &extProcPb.ProcessingResponse_ImmediateResponse{
					ImmediateResponse: tooManyRequests,
				},
			}
		}

		klog.V(logutil.VERBOSE).Infof("response: %v", reply)
		if sendErr := srv.Send(reply); sendErr != nil {
			klog.Errorf("send error %v", sendErr)
			return status.Errorf(codes.Unknown, "failed to send response back to Envoy: %v", sendErr)
		}
	}
}
// RequestContext stores context information during the life time of an HTTP request.
// Process creates one per ext_proc stream and hands it to every phase handler.
type RequestContext struct {
// TargetPod is not written by Process itself.
// NOTE(review): presumably filled in by a request handler once a pod has been
// scheduled — confirm.
TargetPod backend.Pod
// Model and ResolvedTargetModel are passed to every metrics call in Process.
// NOTE(review): presumably the requested model name and the model it was
// resolved to — confirm against the handlers that set them.
Model string
ResolvedTargetModel string
// RequestReceivedTimestamp is set by Process when the request headers arrive.
RequestReceivedTimestamp time.Time
// ResponseCompleteTimestamp is set by Process after a response body has been
// handled without error and ResponseComplete is true.
ResponseCompleteTimestamp time.Time
// RequestSize is reported via metrics.RecordRequestSizes after the request body
// has been handled successfully.
RequestSize int
// Response is read by Process for its Usage.PromptTokens and
// Usage.CompletionTokens when recording token metrics.
Response Response
// ResponseSize is reported via metrics.RecordResponseSizes once the response is
// complete.
ResponseSize int
// ResponseComplete gates the end-of-response metrics in Process.
// NOTE(review): presumably set by HandleResponseBody — confirm.
ResponseComplete bool
}