Skip to content

Commit 2b99c6b

Browse files
committed
Remove fatal logging in executable code
All affected functions should now return an error. ErrorS is now being used instead of Fatal.
1 parent d5f5507 commit 2b99c6b

File tree

3 files changed

+193
-98
lines changed

3 files changed

+193
-98
lines changed

pkg/ext-proc/main.go

+113-55
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"fmt"
77
"net"
88
"net/http"
9+
"os"
910
"strconv"
1011

1112
"github.com/prometheus/client_golang/prometheus/promhttp"
13+
"golang.org/x/sync/errgroup"
1214
"google.golang.org/grpc"
1315
healthPb "google.golang.org/grpc/health/grpc_health_v1"
1416
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
@@ -72,17 +74,25 @@ func init() {
7274
}
7375

7476
func main() {
77+
if err := run(); err != nil {
78+
os.Exit(1)
79+
}
80+
}
81+
82+
func run() error {
7583
klog.InitFlags(nil)
7684
flag.Parse()
7785

7886
ctrl.SetLogger(klog.TODO())
7987
cfg, err := ctrl.GetConfig()
8088
if err != nil {
81-
klog.Fatalf("Failed to get rest config: %v", err)
89+
klog.ErrorS(err, "Failed to get rest config")
90+
return err
8291
}
8392
// Validate flags
8493
if err := validateFlags(); err != nil {
85-
klog.Fatalf("Failed to validate flags: %v", err)
94+
klog.ErrorS(err, "Failed to validate flags")
95+
return err
8696
}
8797

8898
// Print all flag values
@@ -105,104 +115,152 @@ func main() {
105115
Config: ctrl.GetConfigOrDie(),
106116
Datastore: datastore,
107117
}
108-
serverRunner.Setup()
118+
if err := serverRunner.Setup(); err != nil {
119+
klog.ErrorS(err, "Failed to setup ext-proc server")
120+
return err
121+
}
109122

110123
k8sClient, err := kubernetes.NewForConfigAndClient(cfg, serverRunner.Manager.GetHTTPClient())
111124
if err != nil {
112-
klog.Fatalf("Failed to create client: %v", err)
125+
klog.ErrorS(err, "Failed to create client")
126+
return err
113127
}
114128
datastore.SetClient(k8sClient)
115129

116-
// Start health and ext-proc servers in goroutines
117-
healthSvr := startHealthServer(datastore, *grpcHealthPort)
118-
extProcSvr := serverRunner.Start(&vllm.PodMetricsClientImpl{})
119-
// Start metrics handler
120-
metricsSvr := startMetricsHandler(*metricsPort, cfg)
130+
if err := serverRunner.Setup(); err != nil {
131+
klog.ErrorS(err, "Failed to setup server runner")
132+
return err
133+
}
121134

122-
// Start manager, blocking
123-
serverRunner.StartManager()
135+
// Start processing signals and init the group to manage goroutines.
136+
g, ctx := errgroup.WithContext(ctrl.SetupSignalHandler())
124137

125-
// Gracefully shutdown servers
126-
if healthSvr != nil {
127-
klog.Info("Health server shutting down")
128-
healthSvr.GracefulStop()
129-
}
130-
if extProcSvr != nil {
131-
klog.Info("Ext-proc server shutting down")
132-
extProcSvr.GracefulStop()
133-
}
134-
if metricsSvr != nil {
135-
klog.Info("Metrics server shutting down")
136-
if err := metricsSvr.Shutdown(context.Background()); err != nil {
137-
klog.Infof("Metrics server Shutdown: %v", err)
138-
}
139-
}
138+
// Start health server.
139+
startHealthServer(ctx, g, datastore, *grpcHealthPort)
140+
141+
// Start ext-proc server.
142+
g.Go(func() error {
143+
return serverRunner.Start(ctx, &vllm.PodMetricsClientImpl{})
144+
})
145+
146+
// Start metrics handler.
147+
startMetricsHandler(ctx, g, *metricsPort, cfg)
148+
149+
// Start manager.
150+
g.Go(func() error {
151+
return serverRunner.StartManager(ctx)
152+
})
140153

141-
klog.Info("All components shutdown")
154+
err = g.Wait()
155+
klog.InfoS("All components terminated")
156+
return err
142157
}
143158

144-
// startHealthServer starts the gRPC health probe server in a goroutine.
145-
func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
146-
svr := grpc.NewServer()
147-
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
159+
// startHealthServer starts the gRPC health probe server using the given errgroup.
160+
func startHealthServer(ctx context.Context, g *errgroup.Group, ds *backend.K8sDatastore, port int) {
161+
g.Go(func() error {
162+
klog.InfoS("Health server starting...")
148163

149-
go func() {
164+
// Start listening.
150165
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
151166
if err != nil {
152-
klog.Fatalf("Health server failed to listen: %v", err)
167+
klog.ErrorS(err, "Health server failed to listen")
168+
return err
153169
}
154-
klog.Infof("Health server listening on port: %d", port)
155170

156-
// Blocking and will return when shutdown is complete.
171+
klog.InfoS("Health server listening", "port", port)
172+
173+
svr := grpc.NewServer()
174+
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
175+
176+
// Shutdown on context closed.
177+
g.Go(func() error {
178+
<-ctx.Done()
179+
klog.InfoS("Health server shutting down...")
180+
svr.GracefulStop()
181+
return nil
182+
})
183+
184+
// Keep serving until terminated.
157185
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
158-
klog.Fatalf("Health server failed: %v", err)
186+
klog.ErrorS(err, "Health server failed")
187+
return err
159188
}
160-
klog.Info("Health server shutting down")
161-
}()
162-
return svr
189+
klog.InfoS("Health server terminated")
190+
return nil
191+
})
163192
}
164193

165-
func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
166-
metrics.Register()
194+
// startMetricsHandler starts the metrics HTTP handler using the given errgroup.
195+
func startMetricsHandler(ctx context.Context, g *errgroup.Group, port int, cfg *rest.Config) {
196+
g.Go(func() error {
197+
metrics.Register()
198+
klog.InfoS("Metrics HTTP handler starting...")
199+
200+
// Start listening.
201+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
202+
if err != nil {
203+
klog.ErrorS(err, "Metrics HTTP handler failed to listen")
204+
return err
205+
}
206+
207+
klog.InfoS("Metrics HTTP handler listening", "port", port)
167208

168-
var svr *http.Server
169-
go func() {
170-
klog.Info("Starting metrics HTTP handler ...")
209+
// Init HTTP server.
210+
h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
211+
if err != nil {
212+
return err
213+
}
171214

172215
mux := http.NewServeMux()
173-
mux.Handle(defaultMetricsEndpoint, metricsHandlerWithAuthenticationAndAuthorization(cfg))
216+
mux.Handle(defaultMetricsEndpoint, h)
174217

175-
svr = &http.Server{
218+
svr := &http.Server{
176219
Addr: net.JoinHostPort("", strconv.Itoa(port)),
177220
Handler: mux,
178221
}
179-
if err := svr.ListenAndServe(); err != http.ErrServerClosed {
180-
klog.Fatalf("failed to start metrics HTTP handler: %v", err)
222+
223+
// Shutdown on interrupt.
224+
g.Go(func() error {
225+
<-ctx.Done()
226+
klog.InfoS("Metrics HTTP handler shutting down...")
227+
_ = svr.Shutdown(context.Background())
228+
return nil
229+
})
230+
231+
// Keep serving until terminated.
232+
if err := svr.Serve(lis); err != http.ErrServerClosed {
233+
klog.ErrorS(err, "Metrics HTTP handler failed")
234+
return err
181235
}
182-
}()
183-
return svr
236+
klog.InfoS("Metrics HTTP handler terminated")
237+
return nil
238+
})
184239
}
185240

186-
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
241+
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) {
187242
h := promhttp.HandlerFor(
188243
legacyregistry.DefaultGatherer,
189244
promhttp.HandlerOpts{},
190245
)
191246
httpClient, err := rest.HTTPClientFor(cfg)
192247
if err != nil {
193-
klog.Fatalf("failed to create http client for metrics auth: %v", err)
248+
klog.ErrorS(err, "Failed to create http client for metrics auth")
249+
return nil, err
194250
}
195251

196252
filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
197253
if err != nil {
198-
klog.Fatalf("failed to create metrics filter for auth: %v", err)
254+
klog.ErrorS(err, "Failed to create metrics filter for auth")
255+
return nil, err
199256
}
200257
metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint)
201258
metricsAuthHandler, err := filter(metricsLogger, h)
202259
if err != nil {
203-
klog.Fatalf("failed to create metrics auth handler: %v", err)
260+
klog.ErrorS(err, "Failed to create metrics auth handler")
261+
return nil, err
204262
}
205-
return metricsAuthHandler
263+
return metricsAuthHandler, nil
206264
}
207265

208266
func validateFlags() error {

pkg/ext-proc/server/runserver.go

+64-34
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package server
22

33
import (
4+
"context"
5+
"errors"
46
"fmt"
57
"net"
68
"time"
@@ -53,12 +55,12 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
5355
}
5456
}
5557

56-
// Setup creates the reconcilers for pools and models and starts the manager.
57-
func (r *ExtProcServerRunner) Setup() {
58+
// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
59+
func (r *ExtProcServerRunner) Setup() error {
5860
// Create a new manager to manage controllers
5961
mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
6062
if err != nil {
61-
klog.Fatalf("Failed to create controller manager: %v", err)
63+
return fmt.Errorf("failed to create controller manager: %w", err)
6264
}
6365
r.Manager = mgr
6466

@@ -73,7 +75,7 @@ func (r *ExtProcServerRunner) Setup() {
7375
},
7476
Record: mgr.GetEventRecorderFor("InferencePool"),
7577
}).SetupWithManager(mgr); err != nil {
76-
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
78+
return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err)
7779
}
7880

7981
if err := (&backend.InferenceModelReconciler{
@@ -86,52 +88,80 @@ func (r *ExtProcServerRunner) Setup() {
8688
},
8789
Record: mgr.GetEventRecorderFor("InferenceModel"),
8890
}).SetupWithManager(mgr); err != nil {
89-
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
91+
return fmt.Errorf("failed setting up InferenceModelReconciler: %w", err)
9092
}
93+
return nil
9194
}
9295

93-
// Start starts the Envoy external processor server in a goroutine.
96+
// Start starts the Envoy external processor server and blocks
97+
// until the context is canceled or an error encountered.
9498
func (r *ExtProcServerRunner) Start(
99+
ctx context.Context,
95100
podMetricsClient backend.PodMetricsClient,
96-
) *grpc.Server {
101+
) error {
102+
klog.InfoS("Ext-proc server starting...")
103+
104+
// Start listening.
97105
svr := grpc.NewServer()
106+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
107+
if err != nil {
108+
klog.ErrorS(err, "Ext-proc server failed to listen", "port", r.GrpcPort)
109+
return err
110+
}
111+
// The listener will be closed by the server,
112+
// but the function may also return earlier on error.
113+
defer lis.Close()
98114

99-
go func() {
100-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
101-
if err != nil {
102-
klog.Fatalf("Ext-proc server failed to listen: %v", err)
103-
}
104-
klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
115+
klog.InfoS("Ext-proc server listening", "port", r.GrpcPort)
105116

106-
// Initialize backend provider
107-
pp := backend.NewProvider(podMetricsClient, r.Datastore)
108-
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
109-
klog.Fatalf("Failed to initialize backend provider: %v", err)
110-
}
117+
// Initialize backend provider
118+
pp := backend.NewProvider(podMetricsClient, r.Datastore)
119+
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
120+
klog.ErrorS(err, "Failed to initialize backend provider")
121+
return err
122+
}
111123

112-
// Register ext_proc handlers
113-
extProcPb.RegisterExternalProcessorServer(
114-
svr,
115-
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetEndpointKey, r.Datastore),
116-
)
124+
// Register ext_proc handlers
125+
extProcPb.RegisterExternalProcessorServer(
126+
svr,
127+
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetEndpointKey, r.Datastore),
128+
)
117129

118-
// Blocking and will return when shutdown is complete.
119-
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
120-
klog.Fatalf("Ext-proc server failed: %v", err)
130+
// Terminate the server on context closed.
131+
// Make sure the goroutine does not leak.
132+
doneCh := make(chan struct{})
133+
defer close(doneCh)
134+
go func() {
135+
select {
136+
case <-ctx.Done():
137+
klog.InfoS("Ext-proc server shutting down...")
138+
svr.GracefulStop()
139+
case <-doneCh:
121140
}
122-
klog.Info("Ext-proc server shutting down")
123141
}()
124-
return svr
142+
143+
// Block until terminated.
144+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
145+
klog.ErrorS(err, "Ext-proc server failed")
146+
return err
147+
}
148+
klog.InfoS("Ext-proc server terminated")
149+
return nil
125150
}
126151

127-
func (r *ExtProcServerRunner) StartManager() {
152+
func (r *ExtProcServerRunner) StartManager(ctx context.Context) error {
128153
if r.Manager == nil {
129-
klog.Fatalf("Runner has no manager setup to run: %v", r)
154+
err := errors.New("runner manager is not set")
155+
klog.ErrorS(err, "Runner has no manager setup to run")
156+
return err
130157
}
158+
131159
// Start the controller manager. Blocking and will return when shutdown is complete.
132-
klog.Infof("Starting controller manager")
133-
if err := r.Manager.Start(ctrl.SetupSignalHandler()); err != nil {
134-
klog.Fatalf("Error starting controller manager: %v", err)
160+
klog.InfoS("Controller manager starting...")
161+
if err := r.Manager.Start(ctx); err != nil {
162+
klog.ErrorS(err, "Error starting controller manager")
163+
return err
135164
}
136-
klog.Info("Controller manager shutting down")
165+
klog.InfoS("Controller manager terminated")
166+
return nil
137167
}

0 commit comments

Comments
 (0)