@@ -6,9 +6,11 @@ import (
6
6
"fmt"
7
7
"net"
8
8
"net/http"
9
+ "os"
9
10
"strconv"
10
11
11
12
"github.com/prometheus/client_golang/prometheus/promhttp"
13
+ "golang.org/x/sync/errgroup"
12
14
"google.golang.org/grpc"
13
15
healthPb "google.golang.org/grpc/health/grpc_health_v1"
14
16
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
@@ -79,17 +81,25 @@ func init() {
79
81
}
80
82
81
83
func main () {
84
+ if err := run (); err != nil {
85
+ os .Exit (1 )
86
+ }
87
+ }
88
+
89
+ func run () error {
82
90
klog .InitFlags (nil )
83
91
flag .Parse ()
84
92
85
93
ctrl .SetLogger (klog .TODO ())
86
94
cfg , err := ctrl .GetConfig ()
87
95
if err != nil {
88
- klog .Fatalf ("Failed to get rest config: %v" , err )
96
+ klog .ErrorS (err , "Failed to get rest config" )
97
+ return err
89
98
}
90
99
// Validate flags
91
100
if err := validateFlags (); err != nil {
92
- klog .Fatalf ("Failed to validate flags: %v" , err )
101
+ klog .ErrorS (err , "Failed to validate flags" )
102
+ return err
93
103
}
94
104
95
105
// Print all flag values
@@ -114,101 +124,164 @@ func main() {
114
124
Config : ctrl .GetConfigOrDie (),
115
125
Datastore : datastore ,
116
126
}
117
- serverRunner .Setup ()
127
+ if err := serverRunner .Setup (); err != nil {
128
+ klog .ErrorS (err , "Failed to setup server runner" )
129
+ return err
130
+ }
118
131
119
- // Start health and ext-proc servers in goroutines
120
- healthSvr := startHealthServer (datastore , * grpcHealthPort )
121
- extProcSvr := serverRunner .Start (
122
- datastore ,
123
- & vllm.PodMetricsClientImpl {},
124
- )
125
- // Start metrics handler
126
- metricsSvr := startMetricsHandler (* metricsPort , cfg )
132
+ // Start processing signals and init the group to manage goroutines.
133
+ g , ctx := errgroup .WithContext (ctrl .SetupSignalHandler ())
127
134
128
- // Start manager, blocking
129
- serverRunner . StartManager ( )
135
+ // Start health server.
136
+ startHealthServer ( g , ctx , datastore , * grpcHealthPort )
130
137
131
- // Gracefully shutdown servers
132
- if healthSvr != nil {
133
- klog .Info ("Health server shutting down" )
134
- healthSvr .GracefulStop ()
135
- }
136
- if extProcSvr != nil {
137
- klog .Info ("Ext-proc server shutting down" )
138
- extProcSvr .GracefulStop ()
139
- }
140
- if metricsSvr != nil {
141
- klog .Info ("Metrics server shutting down" )
142
- if err := metricsSvr .Shutdown (context .Background ()); err != nil {
143
- klog .Infof ("Metrics server Shutdown: %v" , err )
144
- }
145
- }
138
+ // Start ext-proc server.
139
+ startExtProcServer (g , ctx , serverRunner , datastore )
140
+
141
+ // Start metrics handler.
142
+ startMetricsHandler (g , ctx , * metricsPort , cfg )
146
143
147
- klog .Info ("All components shutdown" )
144
+ // Start manager.
145
+ g .Go (func () error {
146
+ return serverRunner .StartManager (ctx )
147
+ })
148
+
149
+ err = g .Wait ()
150
+ klog .InfoS ("All components terminated" )
151
+ return err
148
152
}
149
153
150
154
// startHealthServer starts the gRPC health probe server in a goroutine.
151
- func startHealthServer (ds * backend.K8sDatastore , port int ) * grpc.Server {
155
+ func startHealthServer (g * errgroup.Group , ctx context.Context , ds * backend.K8sDatastore , port int ) {
156
+ klog .InfoS ("Health server starting..." )
157
+
152
158
svr := grpc .NewServer ()
153
159
healthPb .RegisterHealthServer (svr , & healthServer {datastore : ds })
154
160
155
- go func () {
161
+ g .Go (func () error {
162
+ // Init the listener.
156
163
lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
157
164
if err != nil {
158
- klog .Fatalf ("Health server failed to listen: %v" , err )
165
+ klog .ErrorS (err , "Health server failed to listen" )
166
+ return err
159
167
}
160
- klog .Infof ("Health server listening on port: %d" , port )
161
168
162
- // Blocking and will return when shutdown is complete.
163
- if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
164
- klog .Fatalf ("Health server failed: %v" , err )
165
- }
166
- klog .Info ("Health server shutting down" )
167
- }()
168
- return svr
169
+ klog .InfoS ("Health server listening" , "port" , port )
170
+
171
+ // Keep serving until interrupted.
172
+ g .Go (func () error {
173
+ if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
174
+ klog .ErrorS (err , "Health server failed" )
175
+ return err
176
+ }
177
+ klog .InfoS ("Health server terminated" )
178
+ return nil
179
+ })
180
+
181
+ // Shutdown on interrupt.
182
+ g .Go (func () error {
183
+ <- ctx .Done ()
184
+ klog .InfoS ("Health server shutting down..." )
185
+ svr .GracefulStop ()
186
+ return nil
187
+ })
188
+ return nil
189
+ })
169
190
}
170
191
171
- func startMetricsHandler (port int , cfg * rest.Config ) * http.Server {
192
+ func startExtProcServer (g * errgroup.Group , ctx context.Context , runner * runserver.ExtProcServerRunner , ds * backend.K8sDatastore ) {
193
+ g .Go (func () error {
194
+ errCh := make (chan error , 1 )
195
+ svr := runner .Start (ds , & vllm.PodMetricsClientImpl {}, func (err error ) {
196
+ errCh <- err
197
+ })
198
+
199
+ g .Go (func () error {
200
+ <- ctx .Done ()
201
+ klog .InfoS ("Ext-proc server shutting down..." )
202
+ svr .GracefulStop ()
203
+ return nil
204
+ })
205
+ return <- errCh
206
+ })
207
+ }
208
+
209
+ func startMetricsHandler (g * errgroup.Group , ctx context.Context , port int , cfg * rest.Config ) {
172
210
metrics .Register ()
211
+ klog .InfoS ("Metrics HTTP handler starting..." )
173
212
174
213
var svr * http.Server
175
214
go func () {
176
- klog .Info ("Starting metrics HTTP handler ..." )
215
+ }()
216
+
217
+ g .Go (func () error {
218
+ // Init the listener.
219
+ lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
220
+ if err != nil {
221
+ klog .ErrorS (err , "Metrics HTTP handler failed to listen" )
222
+ return err
223
+ }
224
+
225
+ klog .InfoS ("Metrics server listening" , "port" , port )
226
+
227
+ // Init HTTP server.
228
+ h , err := metricsHandlerWithAuthenticationAndAuthorization (cfg )
229
+ if err != nil {
230
+ return err
231
+ }
177
232
178
233
mux := http .NewServeMux ()
179
- mux .Handle (defaultMetricsEndpoint , metricsHandlerWithAuthenticationAndAuthorization ( cfg ) )
234
+ mux .Handle (defaultMetricsEndpoint , h )
180
235
181
236
svr = & http.Server {
182
237
Addr : net .JoinHostPort ("" , strconv .Itoa (port )),
183
238
Handler : mux ,
184
239
}
185
- if err := svr .ListenAndServe (); err != http .ErrServerClosed {
186
- klog .Fatalf ("failed to start metrics HTTP handler: %v" , err )
187
- }
188
- }()
189
- return svr
240
+
241
+ // Keep serving until interrupted.
242
+ g .Go (func () error {
243
+ if err := svr .Serve (lis ); err != http .ErrServerClosed {
244
+ klog .ErrorS (err , "Failed to start metrics HTTP handler" )
245
+ return err
246
+ }
247
+ klog .InfoS ("Metrics HTTP handler terminated" )
248
+ return nil
249
+ })
250
+
251
+ // Shutdown on interrupt.
252
+ g .Go (func () error {
253
+ <- ctx .Done ()
254
+ klog .InfoS ("Metrics HTTP handler shutting down..." )
255
+ _ = svr .Shutdown (context .Background ())
256
+ return nil
257
+ })
258
+ return nil
259
+ })
190
260
}
191
261
192
- func metricsHandlerWithAuthenticationAndAuthorization (cfg * rest.Config ) http.Handler {
262
+ func metricsHandlerWithAuthenticationAndAuthorization (cfg * rest.Config ) ( http.Handler , error ) {
193
263
h := promhttp .HandlerFor (
194
264
legacyregistry .DefaultGatherer ,
195
265
promhttp.HandlerOpts {},
196
266
)
197
267
httpClient , err := rest .HTTPClientFor (cfg )
198
268
if err != nil {
199
- klog .Fatalf ("failed to create http client for metrics auth: %v" , err )
269
+ klog .ErrorS (err , "Failed to create http client for metrics auth" )
270
+ return nil , err
200
271
}
201
272
202
273
filter , err := filters .WithAuthenticationAndAuthorization (cfg , httpClient )
203
274
if err != nil {
204
- klog .Fatalf ("failed to create metrics filter for auth: %v" , err )
275
+ klog .ErrorS (err , "Failed to create metrics filter for auth" )
276
+ return nil , err
205
277
}
206
278
metricsLogger := klog .LoggerWithValues (klog .NewKlogr (), "path" , defaultMetricsEndpoint )
207
279
metricsAuthHandler , err := filter (metricsLogger , h )
208
280
if err != nil {
209
- klog .Fatalf ("failed to create metrics auth handler: %v" , err )
281
+ klog .ErrorS (err , "Failed to create metrics auth handler" )
282
+ return nil , err
210
283
}
211
- return metricsAuthHandler
284
+ return metricsAuthHandler , nil
212
285
}
213
286
214
287
func validateFlags () error {
0 commit comments