@@ -6,9 +6,11 @@ import (
6
6
"fmt"
7
7
"net"
8
8
"net/http"
9
+ "os"
9
10
"strconv"
10
11
11
12
"github.com/prometheus/client_golang/prometheus/promhttp"
13
+ "golang.org/x/sync/errgroup"
12
14
"google.golang.org/grpc"
13
15
healthPb "google.golang.org/grpc/health/grpc_health_v1"
14
16
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
@@ -72,17 +74,25 @@ func init() {
72
74
}
73
75
74
76
func main () {
77
+ if err := run (); err != nil {
78
+ os .Exit (1 )
79
+ }
80
+ }
81
+
82
+ func run () error {
75
83
klog .InitFlags (nil )
76
84
flag .Parse ()
77
85
78
86
ctrl .SetLogger (klog .TODO ())
79
87
cfg , err := ctrl .GetConfig ()
80
88
if err != nil {
81
- klog .Fatalf ("Failed to get rest config: %v" , err )
89
+ klog .ErrorS (err , "Failed to get rest config" )
90
+ return err
82
91
}
83
92
// Validate flags
84
93
if err := validateFlags (); err != nil {
85
- klog .Fatalf ("Failed to validate flags: %v" , err )
94
+ klog .ErrorS (err , "Failed to validate flags" )
95
+ return err
86
96
}
87
97
88
98
// Print all flag values
@@ -113,96 +123,140 @@ func main() {
113
123
}
114
124
datastore .SetClient (k8sClient )
115
125
116
- // Start health and ext-proc servers in goroutines
117
- healthSvr := startHealthServer (datastore , * grpcHealthPort )
118
- extProcSvr := serverRunner .Start (& vllm.PodMetricsClientImpl {})
119
- // Start metrics handler
120
- metricsSvr := startMetricsHandler (* metricsPort , cfg )
126
+ if err := serverRunner .Setup (); err != nil {
127
+ klog .ErrorS (err , "Failed to setup server runner" )
128
+ return err
129
+ }
121
130
122
- // Start manager, blocking
123
- serverRunner . StartManager ( )
131
+ // Start processing signals and init the group to manage goroutines.
132
+ g , ctx := errgroup . WithContext ( ctrl . SetupSignalHandler () )
124
133
125
- // Gracefully shutdown servers
126
- if healthSvr != nil {
127
- klog .Info ("Health server shutting down" )
128
- healthSvr .GracefulStop ()
129
- }
130
- if extProcSvr != nil {
131
- klog .Info ("Ext-proc server shutting down" )
132
- extProcSvr .GracefulStop ()
133
- }
134
- if metricsSvr != nil {
135
- klog .Info ("Metrics server shutting down" )
136
- if err := metricsSvr .Shutdown (context .Background ()); err != nil {
137
- klog .Infof ("Metrics server Shutdown: %v" , err )
138
- }
139
- }
134
+ // Start health server.
135
+ startHealthServer (ctx , g , datastore , * grpcHealthPort )
136
+
137
+ // Start ext-proc server.
138
+ g .Go (func () error {
139
+ return serverRunner .Start (ctx , & vllm.PodMetricsClientImpl {})
140
+ })
141
+
142
+ // Start metrics handler.
143
+ startMetricsHandler (ctx , g , * metricsPort , cfg )
140
144
141
- klog .Info ("All components shutdown" )
145
+ // Start manager.
146
+ g .Go (func () error {
147
+ return serverRunner .StartManager (ctx )
148
+ })
149
+
150
+ err = g .Wait ()
151
+ klog .InfoS ("All components terminated" )
152
+ return err
142
153
}
143
154
144
- // startHealthServer starts the gRPC health probe server in a goroutine .
145
- func startHealthServer (ds * backend.K8sDatastore , port int ) * grpc. Server {
146
- svr := grpc . NewServer ()
147
- healthPb . RegisterHealthServer ( svr , & healthServer { datastore : ds } )
155
+ // startHealthServer starts the gRPC health probe server using the given errgroup .
156
+ func startHealthServer (ctx context. Context , g * errgroup. Group , ds * backend.K8sDatastore , port int ) {
157
+ g . Go ( func () error {
158
+ klog . InfoS ( "Health server starting..." )
148
159
149
- go func () {
160
+ // Start listening.
150
161
lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
151
162
if err != nil {
152
- klog .Fatalf ("Health server failed to listen: %v" , err )
163
+ klog .ErrorS (err , "Health server failed to listen" )
164
+ return err
153
165
}
154
- klog .Infof ("Health server listening on port: %d" , port )
155
166
156
- // Blocking and will return when shutdown is complete.
167
+ klog .InfoS ("Health server listening" , "port" , port )
168
+
169
+ svr := grpc .NewServer ()
170
+ healthPb .RegisterHealthServer (svr , & healthServer {datastore : ds })
171
+
172
+ // Shutdown on context closed.
173
+ g .Go (func () error {
174
+ <- ctx .Done ()
175
+ klog .InfoS ("Health server shutting down..." )
176
+ svr .GracefulStop ()
177
+ return nil
178
+ })
179
+
180
+ // Keep serving until terminated.
157
181
if err := svr .Serve (lis ); err != nil && err != grpc .ErrServerStopped {
158
- klog .Fatalf ("Health server failed: %v" , err )
182
+ klog .ErrorS (err , "Health server failed" )
183
+ return err
159
184
}
160
- klog .Info ("Health server shutting down " )
161
- }()
162
- return svr
185
+ klog .InfoS ("Health server terminated " )
186
+ return nil
187
+ })
163
188
}
164
189
165
- func startMetricsHandler (port int , cfg * rest.Config ) * http.Server {
166
- metrics .Register ()
190
+ // startMetricsHandler starts the metrics HTTP handler using the given errgroup.
191
+ func startMetricsHandler (ctx context.Context , g * errgroup.Group , port int , cfg * rest.Config ) {
192
+ g .Go (func () error {
193
+ metrics .Register ()
194
+ klog .InfoS ("Metrics HTTP handler starting..." )
195
+
196
+ // Start listening.
197
+ lis , err := net .Listen ("tcp" , fmt .Sprintf (":%d" , port ))
198
+ if err != nil {
199
+ klog .ErrorS (err , "Metrics HTTP handler failed to listen" )
200
+ return err
201
+ }
167
202
168
- var svr * http.Server
169
- go func () {
170
- klog .Info ("Starting metrics HTTP handler ..." )
203
+ klog .InfoS ("Metrics HTTP handler listening" , "port" , port )
204
+
205
+ // Init HTTP server.
206
+ h , err := metricsHandlerWithAuthenticationAndAuthorization (cfg )
207
+ if err != nil {
208
+ return err
209
+ }
171
210
172
211
mux := http .NewServeMux ()
173
- mux .Handle (defaultMetricsEndpoint , metricsHandlerWithAuthenticationAndAuthorization ( cfg ) )
212
+ mux .Handle (defaultMetricsEndpoint , h )
174
213
175
- svr = & http.Server {
214
+ svr : = & http.Server {
176
215
Addr : net .JoinHostPort ("" , strconv .Itoa (port )),
177
216
Handler : mux ,
178
217
}
179
- if err := svr .ListenAndServe (); err != http .ErrServerClosed {
180
- klog .Fatalf ("failed to start metrics HTTP handler: %v" , err )
218
+
219
+ // Shutdown on interrupt.
220
+ g .Go (func () error {
221
+ <- ctx .Done ()
222
+ klog .InfoS ("Metrics HTTP handler shutting down..." )
223
+ _ = svr .Shutdown (context .Background ())
224
+ return nil
225
+ })
226
+
227
+ // Keep serving until terminated.
228
+ if err := svr .Serve (lis ); err != http .ErrServerClosed {
229
+ klog .ErrorS (err , "Metrics HTTP handler failed" )
230
+ return err
181
231
}
182
- }()
183
- return svr
232
+ klog .InfoS ("Metrics HTTP handler terminated" )
233
+ return nil
234
+ })
184
235
}
185
236
186
- func metricsHandlerWithAuthenticationAndAuthorization (cfg * rest.Config ) http.Handler {
237
+ func metricsHandlerWithAuthenticationAndAuthorization (cfg * rest.Config ) ( http.Handler , error ) {
187
238
h := promhttp .HandlerFor (
188
239
legacyregistry .DefaultGatherer ,
189
240
promhttp.HandlerOpts {},
190
241
)
191
242
httpClient , err := rest .HTTPClientFor (cfg )
192
243
if err != nil {
193
- klog .Fatalf ("failed to create http client for metrics auth: %v" , err )
244
+ klog .ErrorS (err , "Failed to create http client for metrics auth" )
245
+ return nil , err
194
246
}
195
247
196
248
filter , err := filters .WithAuthenticationAndAuthorization (cfg , httpClient )
197
249
if err != nil {
198
- klog .Fatalf ("failed to create metrics filter for auth: %v" , err )
250
+ klog .ErrorS (err , "Failed to create metrics filter for auth" )
251
+ return nil , err
199
252
}
200
253
metricsLogger := klog .LoggerWithValues (klog .NewKlogr (), "path" , defaultMetricsEndpoint )
201
254
metricsAuthHandler , err := filter (metricsLogger , h )
202
255
if err != nil {
203
- klog .Fatalf ("failed to create metrics auth handler: %v" , err )
256
+ klog .ErrorS (err , "Failed to create metrics auth handler" )
257
+ return nil , err
204
258
}
205
- return metricsAuthHandler
259
+ return metricsAuthHandler , nil
206
260
}
207
261
208
262
func validateFlags () error {
0 commit comments