Skip to content

Commit 0662f1f

Browse files
authored
Remove fatal logging in executable code (#265)
All Fatal log call are removed. Also all runnable components are now managed by controller-runtime, implementing manager.Runnable interface.
1 parent 242b73e commit 0662f1f

File tree

6 files changed

+229
-112
lines changed

6 files changed

+229
-112
lines changed
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
8+
"google.golang.org/grpc"
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
"sigs.k8s.io/controller-runtime/pkg/manager"
11+
)
12+
13+
// GRPCServer converts the given gRPC server into a runnable.
14+
// The server name is just being used for logging.
15+
func GRPCServer(name string, srv *grpc.Server, port int) manager.Runnable {
16+
return manager.RunnableFunc(func(ctx context.Context) error {
17+
// Use "name" key as that is what manager.Server does as well.
18+
log := ctrl.Log.WithValues("name", name)
19+
log.Info("gRPC server starting")
20+
21+
// Start listening.
22+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
23+
if err != nil {
24+
log.Error(err, "gRPC server failed to listen")
25+
return err
26+
}
27+
28+
log.Info("gRPC server listening", "port", port)
29+
30+
// Shutdown on context closed.
31+
// Terminate the server on context closed.
32+
// Make sure the goroutine does not leak.
33+
doneCh := make(chan struct{})
34+
defer close(doneCh)
35+
go func() {
36+
select {
37+
case <-ctx.Done():
38+
log.Info("gRPC server shutting down")
39+
srv.GracefulStop()
40+
case <-doneCh:
41+
}
42+
}()
43+
44+
// Keep serving until terminated.
45+
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
46+
log.Error(err, "gRPC server failed")
47+
return err
48+
}
49+
log.Info("gRPC server terminated")
50+
return nil
51+
})
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package runnable
2+
3+
import "sigs.k8s.io/controller-runtime/pkg/manager"
4+
5+
type leaderElection struct {
6+
manager.Runnable
7+
needsLeaderElection bool
8+
}
9+
10+
// LeaderElection wraps the given runnable to implement manager.LeaderElectionRunnable.
11+
func LeaderElection(runnable manager.Runnable, needsLeaderElection bool) manager.Runnable {
12+
return &leaderElection{
13+
Runnable: runnable,
14+
needsLeaderElection: needsLeaderElection,
15+
}
16+
}
17+
18+
// RequireLeaderElection wraps the given runnable, marking it as requiring leader election.
19+
func RequireLeaderElection(runnable manager.Runnable) manager.Runnable {
20+
return LeaderElection(runnable, true)
21+
}
22+
23+
// RequireLeaderElection wraps the given runnable, marking it as not requiring leader election.
24+
func NoLeaderElection(runnable manager.Runnable) manager.Runnable {
25+
return LeaderElection(runnable, false)
26+
}
27+
28+
// NeedLeaderElection implements manager.NeedLeaderElection interface.
29+
func (r *leaderElection) NeedLeaderElection() bool {
30+
return r.needsLeaderElection
31+
}

pkg/ext-proc/main.go

+72-68
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package main
22

33
import (
4-
"context"
54
"flag"
65
"fmt"
76
"net"
87
"net/http"
8+
"os"
99
"strconv"
1010

1111
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -21,10 +21,12 @@ import (
2121
klog "k8s.io/klog/v2"
2222
ctrl "sigs.k8s.io/controller-runtime"
2323
"sigs.k8s.io/controller-runtime/pkg/log/zap"
24+
"sigs.k8s.io/controller-runtime/pkg/manager"
2425
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
2526
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
2627
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
2728
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
29+
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable"
2830
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
2931
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
3032
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
@@ -88,6 +90,12 @@ func init() {
8890
}
8991

9092
func main() {
93+
if err := run(); err != nil {
94+
os.Exit(1)
95+
}
96+
}
97+
98+
func run() error {
9199
opts := zap.Options{
92100
Development: true,
93101
}
@@ -97,11 +105,13 @@ func main() {
97105

98106
cfg, err := ctrl.GetConfig()
99107
if err != nil {
100-
klog.Fatalf("Failed to get rest config: %v", err)
108+
klog.ErrorS(err, "Failed to get rest config")
109+
return err
101110
}
102111
// Validate flags
103112
if err := validateFlags(); err != nil {
104-
klog.Fatalf("Failed to validate flags: %v", err)
113+
klog.ErrorS(err, "Failed to validate flags")
114+
return err
105115
}
106116

107117
// Print all flag values
@@ -127,37 +137,30 @@ func main() {
127137
Config: ctrl.GetConfigOrDie(),
128138
Datastore: datastore,
129139
}
130-
serverRunner.Setup()
131-
132-
// Start health and ext-proc servers in goroutines
133-
healthSvr := startHealthServer(datastore, *grpcHealthPort)
134-
extProcSvr := serverRunner.Start(
135-
datastore,
136-
&vllm.PodMetricsClientImpl{},
137-
)
138-
// Start metrics handler
139-
metricsSvr := startMetricsHandler(*metricsPort, cfg)
140-
141-
// Start manager, blocking
142-
serverRunner.StartManager()
140+
if err := serverRunner.Setup(); err != nil {
141+
klog.ErrorS(err, "Failed to setup ext-proc server")
142+
return err
143+
}
144+
mgr := serverRunner.Manager
143145

144-
// Gracefully shutdown servers
145-
if healthSvr != nil {
146-
klog.Info("Health server shutting down")
147-
healthSvr.GracefulStop()
146+
// Register health server.
147+
if err := registerHealthServer(mgr, datastore, *grpcHealthPort); err != nil {
148+
return err
148149
}
149-
if extProcSvr != nil {
150-
klog.Info("Ext-proc server shutting down")
151-
extProcSvr.GracefulStop()
150+
151+
// Register ext-proc server.
152+
if err := mgr.Add(serverRunner.AsRunnable(datastore, &vllm.PodMetricsClientImpl{})); err != nil {
153+
klog.ErrorS(err, "Failed to register ext-proc server")
154+
return err
152155
}
153-
if metricsSvr != nil {
154-
klog.Info("Metrics server shutting down")
155-
if err := metricsSvr.Shutdown(context.Background()); err != nil {
156-
klog.Infof("Metrics server Shutdown: %v", err)
157-
}
156+
157+
// Register metrics handler.
158+
if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil {
159+
return err
158160
}
159161

160-
klog.Info("All components shutdown")
162+
// Start the manager.
163+
return serverRunner.StartManager(ctrl.SetupSignalHandler())
161164
}
162165

163166
func initLogging(opts *zap.Options) {
@@ -179,68 +182,69 @@ func initLogging(opts *zap.Options) {
179182
klog.SetLogger(logger)
180183
}
181184

182-
// startHealthServer starts the gRPC health probe server in a goroutine.
183-
func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
184-
svr := grpc.NewServer()
185-
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
186-
187-
go func() {
188-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
189-
if err != nil {
190-
klog.Fatalf("Health server failed to listen: %v", err)
191-
}
192-
klog.Infof("Health server listening on port: %d", port)
193-
194-
// Blocking and will return when shutdown is complete.
195-
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
196-
klog.Fatalf("Health server failed: %v", err)
197-
}
198-
klog.Info("Health server shutting down")
199-
}()
200-
return svr
185+
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
186+
func registerHealthServer(mgr manager.Manager, ds *backend.K8sDatastore, port int) error {
187+
srv := grpc.NewServer()
188+
healthPb.RegisterHealthServer(srv, &healthServer{datastore: ds})
189+
if err := mgr.Add(
190+
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
191+
klog.ErrorS(err, "Failed to register health server")
192+
return err
193+
}
194+
return nil
201195
}
202196

203-
func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
197+
// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager.
198+
func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error {
204199
metrics.Register()
205200

206-
var svr *http.Server
207-
go func() {
208-
klog.Info("Starting metrics HTTP handler ...")
201+
// Init HTTP server.
202+
h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
203+
if err != nil {
204+
return err
205+
}
206+
207+
mux := http.NewServeMux()
208+
mux.Handle(defaultMetricsEndpoint, h)
209209

210-
mux := http.NewServeMux()
211-
mux.Handle(defaultMetricsEndpoint, metricsHandlerWithAuthenticationAndAuthorization(cfg))
210+
srv := &http.Server{
211+
Addr: net.JoinHostPort("", strconv.Itoa(port)),
212+
Handler: mux,
213+
}
212214

213-
svr = &http.Server{
214-
Addr: net.JoinHostPort("", strconv.Itoa(port)),
215-
Handler: mux,
216-
}
217-
if err := svr.ListenAndServe(); err != http.ErrServerClosed {
218-
klog.Fatalf("failed to start metrics HTTP handler: %v", err)
219-
}
220-
}()
221-
return svr
215+
if err := mgr.Add(&manager.Server{
216+
Name: "metrics",
217+
Server: srv,
218+
}); err != nil {
219+
klog.ErrorS(err, "Failed to register metrics HTTP handler")
220+
return err
221+
}
222+
return nil
222223
}
223224

224-
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
225+
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) {
225226
h := promhttp.HandlerFor(
226227
legacyregistry.DefaultGatherer,
227228
promhttp.HandlerOpts{},
228229
)
229230
httpClient, err := rest.HTTPClientFor(cfg)
230231
if err != nil {
231-
klog.Fatalf("failed to create http client for metrics auth: %v", err)
232+
klog.ErrorS(err, "Failed to create http client for metrics auth")
233+
return nil, err
232234
}
233235

234236
filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
235237
if err != nil {
236-
klog.Fatalf("failed to create metrics filter for auth: %v", err)
238+
klog.ErrorS(err, "Failed to create metrics filter for auth")
239+
return nil, err
237240
}
238241
metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint)
239242
metricsAuthHandler, err := filter(metricsLogger, h)
240243
if err != nil {
241-
klog.Fatalf("failed to create metrics auth handler: %v", err)
244+
klog.ErrorS(err, "Failed to create metrics auth handler")
245+
return nil, err
242246
}
243-
return metricsAuthHandler
247+
return metricsAuthHandler, nil
244248
}
245249

246250
func validateFlags() error {

0 commit comments

Comments
 (0)