Skip to content

Commit e7ac50e

Browse files
committed
Remove fatal logging in executable code
All affected functions should now return an error. ErrorS is now being used instead of Fatal.
1 parent 7639e6f commit e7ac50e

File tree

3 files changed

+185
-100
lines changed

3 files changed

+185
-100
lines changed

pkg/ext-proc/main.go

+106-57
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ import (
66
"fmt"
77
"net"
88
"net/http"
9+
"os"
910
"strconv"
1011

1112
"github.com/prometheus/client_golang/prometheus/promhttp"
13+
"golang.org/x/sync/errgroup"
1214
"google.golang.org/grpc"
1315
healthPb "google.golang.org/grpc/health/grpc_health_v1"
1416
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
@@ -79,17 +81,25 @@ func init() {
7981
}
8082

8183
func main() {
84+
if err := run(); err != nil {
85+
os.Exit(1)
86+
}
87+
}
88+
89+
func run() error {
8290
klog.InitFlags(nil)
8391
flag.Parse()
8492

8593
ctrl.SetLogger(klog.TODO())
8694
cfg, err := ctrl.GetConfig()
8795
if err != nil {
88-
klog.Fatalf("Failed to get rest config: %v", err)
96+
klog.ErrorS(err, "Failed to get rest config")
97+
return err
8998
}
9099
// Validate flags
91100
if err := validateFlags(); err != nil {
92-
klog.Fatalf("Failed to validate flags: %v", err)
101+
klog.ErrorS(err, "Failed to validate flags")
102+
return err
93103
}
94104

95105
// Print all flag values
@@ -114,101 +124,140 @@ func main() {
114124
Config: ctrl.GetConfigOrDie(),
115125
Datastore: datastore,
116126
}
117-
serverRunner.Setup()
127+
if err := serverRunner.Setup(); err != nil {
128+
klog.ErrorS(err, "Failed to setup server runner")
129+
return err
130+
}
118131

119-
// Start health and ext-proc servers in goroutines
120-
healthSvr := startHealthServer(datastore, *grpcHealthPort)
121-
extProcSvr := serverRunner.Start(
122-
datastore,
123-
&vllm.PodMetricsClientImpl{},
124-
)
125-
// Start metrics handler
126-
metricsSvr := startMetricsHandler(*metricsPort, cfg)
132+
// Start processing signals and init the group to manage goroutines.
133+
g, ctx := errgroup.WithContext(ctrl.SetupSignalHandler())
127134

128-
// Start manager, blocking
129-
serverRunner.StartManager()
135+
// Start health server.
136+
startHealthServer(ctx, g, datastore, *grpcHealthPort)
130137

131-
// Gracefully shutdown servers
132-
if healthSvr != nil {
133-
klog.Info("Health server shutting down")
134-
healthSvr.GracefulStop()
135-
}
136-
if extProcSvr != nil {
137-
klog.Info("Ext-proc server shutting down")
138-
extProcSvr.GracefulStop()
139-
}
140-
if metricsSvr != nil {
141-
klog.Info("Metrics server shutting down")
142-
if err := metricsSvr.Shutdown(context.Background()); err != nil {
143-
klog.Infof("Metrics server Shutdown: %v", err)
144-
}
145-
}
138+
// Start ext-proc server.
139+
g.Go(func() error {
140+
return serverRunner.Start(ctx, datastore, &vllm.PodMetricsClientImpl{})
141+
})
142+
143+
// Start metrics handler.
144+
startMetricsHandler(ctx, g, *metricsPort, cfg)
146145

147-
klog.Info("All components shutdown")
146+
// Start manager.
147+
g.Go(func() error {
148+
return serverRunner.StartManager(ctx)
149+
})
150+
151+
err = g.Wait()
152+
klog.InfoS("All components terminated")
153+
return err
148154
}
149155

150-
// startHealthServer starts the gRPC health probe server in a goroutine.
151-
func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
152-
svr := grpc.NewServer()
153-
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
156+
// startHealthServer starts the gRPC health probe server using the given errgroup.
157+
func startHealthServer(ctx context.Context, g *errgroup.Group, ds *backend.K8sDatastore, port int) {
158+
g.Go(func() error {
159+
klog.InfoS("Health server starting...")
154160

155-
go func() {
161+
// Start listening.
156162
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
157163
if err != nil {
158-
klog.Fatalf("Health server failed to listen: %v", err)
164+
klog.ErrorS(err, "Health server failed to listen")
165+
return err
159166
}
160-
klog.Infof("Health server listening on port: %d", port)
161167

162-
// Blocking and will return when shutdown is complete.
168+
klog.InfoS("Health server listening", "port", port)
169+
170+
svr := grpc.NewServer()
171+
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
172+
173+
// Shutdown on context closed.
174+
g.Go(func() error {
175+
<-ctx.Done()
176+
klog.InfoS("Health server shutting down...")
177+
svr.GracefulStop()
178+
return nil
179+
})
180+
181+
// Keep serving until terminated.
163182
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
164-
klog.Fatalf("Health server failed: %v", err)
183+
klog.ErrorS(err, "Health server failed")
184+
return err
165185
}
166-
klog.Info("Health server shutting down")
167-
}()
168-
return svr
186+
klog.InfoS("Health server terminated")
187+
return nil
188+
})
169189
}
170190

171-
func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
172-
metrics.Register()
191+
// startMetricsHandler starts the metrics HTTP handler using the given errgroup.
192+
func startMetricsHandler(ctx context.Context, g *errgroup.Group, port int, cfg *rest.Config) {
193+
g.Go(func() error {
194+
metrics.Register()
195+
klog.InfoS("Metrics HTTP handler starting...")
196+
197+
// Start listening.
198+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
199+
if err != nil {
200+
klog.ErrorS(err, "Metrics HTTP handler failed to listen")
201+
return err
202+
}
173203

174-
var svr *http.Server
175-
go func() {
176-
klog.Info("Starting metrics HTTP handler ...")
204+
klog.InfoS("Metrics HTTP handler listening", "port", port)
205+
206+
// Init HTTP server.
207+
h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
208+
if err != nil {
209+
return err
210+
}
177211

178212
mux := http.NewServeMux()
179-
mux.Handle(defaultMetricsEndpoint, metricsHandlerWithAuthenticationAndAuthorization(cfg))
213+
mux.Handle(defaultMetricsEndpoint, h)
180214

181-
svr = &http.Server{
215+
svr := &http.Server{
182216
Addr: net.JoinHostPort("", strconv.Itoa(port)),
183217
Handler: mux,
184218
}
185-
if err := svr.ListenAndServe(); err != http.ErrServerClosed {
186-
klog.Fatalf("failed to start metrics HTTP handler: %v", err)
219+
220+
// Shutdown on interrupt.
221+
g.Go(func() error {
222+
<-ctx.Done()
223+
klog.InfoS("Metrics HTTP handler shutting down...")
224+
_ = svr.Shutdown(context.Background())
225+
return nil
226+
})
227+
228+
// Keep serving until terminated.
229+
if err := svr.Serve(lis); err != http.ErrServerClosed {
230+
klog.ErrorS(err, "Metrics HTTP handler failed")
231+
return err
187232
}
188-
}()
189-
return svr
233+
klog.InfoS("Metrics HTTP handler terminated")
234+
return nil
235+
})
190236
}
191237

192-
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
238+
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) {
193239
h := promhttp.HandlerFor(
194240
legacyregistry.DefaultGatherer,
195241
promhttp.HandlerOpts{},
196242
)
197243
httpClient, err := rest.HTTPClientFor(cfg)
198244
if err != nil {
199-
klog.Fatalf("failed to create http client for metrics auth: %v", err)
245+
klog.ErrorS(err, "Failed to create http client for metrics auth")
246+
return nil, err
200247
}
201248

202249
filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
203250
if err != nil {
204-
klog.Fatalf("failed to create metrics filter for auth: %v", err)
251+
klog.ErrorS(err, "Failed to create metrics filter for auth")
252+
return nil, err
205253
}
206254
metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint)
207255
metricsAuthHandler, err := filter(metricsLogger, h)
208256
if err != nil {
209-
klog.Fatalf("failed to create metrics auth handler: %v", err)
257+
klog.ErrorS(err, "Failed to create metrics auth handler")
258+
return nil, err
210259
}
211-
return metricsAuthHandler
260+
return metricsAuthHandler, nil
212261
}
213262

214263
func validateFlags() error {

pkg/ext-proc/server/runserver.go

+64-34
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package server
22

33
import (
4+
"context"
5+
"errors"
46
"fmt"
57
"net"
68
"time"
@@ -60,11 +62,11 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
6062
}
6163

6264
// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
63-
func (r *ExtProcServerRunner) Setup() {
65+
func (r *ExtProcServerRunner) Setup() error {
6466
// Create a new manager to manage controllers
6567
mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
6668
if err != nil {
67-
klog.Fatalf("Failed to create controller manager: %v", err)
69+
return fmt.Errorf("failed to create controller manager: %w", err)
6870
}
6971
r.manager = mgr
7072

@@ -79,7 +81,7 @@ func (r *ExtProcServerRunner) Setup() {
7981
},
8082
Record: mgr.GetEventRecorderFor("InferencePool"),
8183
}).SetupWithManager(mgr); err != nil {
82-
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
84+
return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err)
8385
}
8486

8587
if err := (&backend.InferenceModelReconciler{
@@ -92,7 +94,7 @@ func (r *ExtProcServerRunner) Setup() {
9294
},
9395
Record: mgr.GetEventRecorderFor("InferenceModel"),
9496
}).SetupWithManager(mgr); err != nil {
95-
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
97+
return fmt.Errorf("failed setting up InferenceModelReconciler: %w", err)
9698
}
9799

98100
if err := (&backend.EndpointSliceReconciler{
@@ -103,54 +105,82 @@ func (r *ExtProcServerRunner) Setup() {
103105
ServiceName: r.ServiceName,
104106
Zone: r.Zone,
105107
}).SetupWithManager(mgr); err != nil {
106-
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
108+
return fmt.Errorf("failed setting up EndpointSliceReconciler: %w", err)
107109
}
110+
return nil
108111
}
109112

110-
// Start starts the Envoy external processor server in a goroutine.
113+
// Start starts the Envoy external processor server and blocks
114+
// until the context is canceled or an error encountered.
111115
func (r *ExtProcServerRunner) Start(
116+
ctx context.Context,
112117
podDatastore *backend.K8sDatastore,
113118
podMetricsClient backend.PodMetricsClient,
114-
) *grpc.Server {
119+
) error {
120+
klog.InfoS("Ext-proc server starting...")
121+
122+
// Start listening.
115123
svr := grpc.NewServer()
124+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
125+
if err != nil {
126+
klog.ErrorS(err, "Ext-proc server failed to listen", "port", r.GrpcPort)
127+
return err
128+
}
129+
// The listener will be closed by the server,
130+
// but the function may also return earlier on error.
131+
defer lis.Close()
116132

117-
go func() {
118-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
119-
if err != nil {
120-
klog.Fatalf("Ext-proc server failed to listen: %v", err)
121-
}
122-
klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
133+
klog.InfoS("Ext-proc server listening", "port", r.GrpcPort)
123134

124-
// Initialize backend provider
125-
pp := backend.NewProvider(podMetricsClient, podDatastore)
126-
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
127-
klog.Fatalf("Failed to initialize backend provider: %v", err)
128-
}
135+
// Initialize backend provider.
136+
pp := backend.NewProvider(podMetricsClient, podDatastore)
137+
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
138+
klog.ErrorS(err, "Failed to initialize backend provider")
139+
return err
140+
}
129141

130-
// Register ext_proc handlers
131-
extProcPb.RegisterExternalProcessorServer(
132-
svr,
133-
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
134-
)
142+
// Register ext_proc handlers.
143+
extProcPb.RegisterExternalProcessorServer(
144+
svr,
145+
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
146+
)
135147

136-
// Blocking and will return when shutdown is complete.
137-
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
138-
klog.Fatalf("Ext-proc server failed: %v", err)
148+
// Terminate the server on context closed.
149+
// Make sure the goroutine does not leak.
150+
doneCh := make(chan struct{})
151+
defer close(doneCh)
152+
go func() {
153+
select {
154+
case <-ctx.Done():
155+
klog.InfoS("Ext-proc server shutting down...")
156+
svr.GracefulStop()
157+
case <-doneCh:
139158
}
140-
klog.Info("Ext-proc server shutting down")
141159
}()
142-
return svr
160+
161+
// Block until terminated.
162+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
163+
klog.ErrorS(err, "Ext-proc server failed")
164+
return err
165+
}
166+
klog.InfoS("Ext-proc server terminated")
167+
return nil
143168
}
144169

145-
func (r *ExtProcServerRunner) StartManager() {
170+
func (r *ExtProcServerRunner) StartManager(ctx context.Context) error {
146171
if r.manager == nil {
147-
klog.Fatalf("Runner has no manager setup to run: %v", r)
172+
err := errors.New("runner manager is not set")
173+
klog.ErrorS(err, "Runner has no manager setup to run")
174+
return err
148175
}
176+
149177
// Start the controller manager. Blocking and will return when shutdown is complete.
150-
klog.Infof("Starting controller manager")
178+
klog.InfoS("Controller manager starting...")
151179
mgr := r.manager
152-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
153-
klog.Fatalf("Error starting controller manager: %v", err)
180+
if err := mgr.Start(ctx); err != nil {
181+
klog.ErrorS(err, "Error starting controller manager")
182+
return err
154183
}
155-
klog.Info("Controller manager shutting down")
184+
klog.InfoS("Controller manager terminated")
185+
return nil
156186
}

0 commit comments

Comments
 (0)