@@ -8,16 +8,14 @@ import (
8
8
"net/http"
9
9
"strconv"
10
10
11
- extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
12
11
"github.com/prometheus/client_golang/prometheus/promhttp"
13
12
"google.golang.org/grpc"
14
13
healthPb "google.golang.org/grpc/health/grpc_health_v1"
15
14
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
16
15
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
17
16
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
18
- "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
19
17
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
20
- "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling "
18
+ runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server "
21
19
"k8s.io/apimachinery/pkg/runtime"
22
20
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
23
21
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
@@ -122,10 +120,7 @@ func main() {
122
120
healthSvr := startHealthServer (datastore , * grpcHealthPort )
123
121
extProcSvr := serverRunner .Start (
124
122
datastore ,
125
- * grpcPort ,
126
- * refreshPodsInterval ,
127
- * refreshMetricsInterval ,
128
- * targetPodHeader ,
123
+ & vllm.PodMetricsClientImpl {},
129
124
)
130
125
// Start metrics handler
131
126
metricsSvr := startMetricsHandler (* metricsPort , cfg )
@@ -173,86 +168,6 @@ func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
173
168
return svr
174
169
}
175
170
176
- // startExternalProcessorServer starts the Envoy external processor server in a goroutine.
177
- func startExternalProcessorServer (
178
- datastore * backend.K8sDatastore ,
179
- port int ,
180
- refreshPodsInterval , refreshMetricsInterval time.Duration ,
181
- targetPodHeader string ,
182
- ) * grpc.Server {
183
- svr := grpc .NewServer ()
184
-
185
- go func () {
186
- lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
187
- if err != nil {
188
- klog .Fatalf ("Ext-proc server failed to listen: %v" , err )
189
- }
190
- klog .Infof ("Ext-proc server listening on port: %d" , port )
191
-
192
- // Initialize backend provider
193
- pp := backend .NewProvider (& vllm.PodMetricsClientImpl {}, datastore )
194
- if err := pp .Init (refreshPodsInterval , refreshMetricsInterval ); err != nil {
195
- klog .Fatalf ("Failed to initialize backend provider: %v" , err )
196
- }
197
-
198
- // Register ext_proc handlers
199
- extProcPb .RegisterExternalProcessorServer (
200
- svr ,
201
- handlers .NewServer (pp , scheduling .NewScheduler (pp ), targetPodHeader , datastore ),
202
- )
203
-
204
- // Blocking and will return when shutdown is complete.
205
- if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
206
- klog .Fatalf ("Ext-proc server failed: %v" , err )
207
- }
208
- klog .Info ("Ext-proc server shutting down" )
209
- }()
210
- return svr
211
- }
212
-
213
- func startMetricsHandler (port int , cfg * rest.Config ) * http.Server {
214
- metrics .Register ()
215
-
216
- var svr * http.Server
217
- go func () {
218
- klog .Info ("Starting metrics HTTP handler ..." )
219
-
220
- mux := http .NewServeMux ()
221
- mux .Handle (defaultMetricsEndpoint , metricsHandlerWithAuthenticationAndAuthorization (cfg ))
222
-
223
- svr = & http.Server {
224
- Addr : net .JoinHostPort ("" , strconv .Itoa (port )),
225
- Handler : mux ,
226
- }
227
- if err := svr .ListenAndServe (); err != http .ErrServerClosed {
228
- klog .Fatalf ("failed to start metrics HTTP handler: %v" , err )
229
- }
230
- }()
231
- return svr
232
- }
233
-
234
- func metricsHandlerWithAuthenticationAndAuthorization (cfg * rest.Config ) http.Handler {
235
- h := promhttp .HandlerFor (
236
- legacyregistry .DefaultGatherer ,
237
- promhttp.HandlerOpts {},
238
- )
239
- httpClient , err := rest .HTTPClientFor (cfg )
240
- if err != nil {
241
- klog .Fatalf ("failed to create http client for metrics auth: %v" , err )
242
- }
243
-
244
- filter , err := filters .WithAuthenticationAndAuthorization (cfg , httpClient )
245
- if err != nil {
246
- klog .Fatalf ("failed to create metrics filter for auth: %v" , err )
247
- }
248
- metricsLogger := klog .LoggerWithValues (klog .NewKlogr (), "path" , defaultMetricsEndpoint )
249
- metricsAuthHandler , err := filter (metricsLogger , h )
250
- if err != nil {
251
- klog .Fatalf ("failed to create metrics auth handler: %v" , err )
252
- }
253
- return metricsAuthHandler
254
- }
255
-
256
171
func startMetricsHandler (port int , cfg * rest.Config ) * http.Server {
257
172
metrics .Register ()
258
173
@@ -296,8 +211,6 @@ func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Han
296
211
return metricsAuthHandler
297
212
}
298
213
299
- == == == =
300
- >> >> >> > ad32d85 (Add updated hermetic test with k8s client API , these pull from example object yamls .)
301
214
func validateFlags () error {
302
215
if * poolName == "" {
303
216
return fmt .Errorf ("required %q flag not set" , "poolName" )
0 commit comments