Skip to content

Commit 03f317a

Browse files
committed
Remove fatal logging in executable code
All Fatal log call are removed. Also all runnable components are now managed by controller-runtime, implementing manager.Runnable interface.
1 parent 3ff0af8 commit 03f317a

File tree

5 files changed

+210
-114
lines changed

5 files changed

+210
-114
lines changed
+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
8+
"google.golang.org/grpc"
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
"sigs.k8s.io/controller-runtime/pkg/manager"
11+
)
12+
13+
// GRPCServer converts the given gRPC server into a runnable.
14+
// The server name is just being used for logging.
15+
func GRPCServer(name string, srv *grpc.Server, port int) manager.Runnable {
16+
return manager.RunnableFunc(func(ctx context.Context) error {
17+
// Use "name" key as that is what manager.Server does as well.
18+
log := ctrl.Log.WithValues("name", name)
19+
log.Info("gRPC server starting")
20+
21+
// Start listening.
22+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
23+
if err != nil {
24+
log.Error(err, "gRPC server failed to listen")
25+
return err
26+
}
27+
28+
log.Info("gRPC server listening", "port", port)
29+
30+
// Shutdown on context closed.
31+
// Terminate the server on context closed.
32+
// Make sure the goroutine does not leak.
33+
doneCh := make(chan struct{})
34+
defer close(doneCh)
35+
go func() {
36+
select {
37+
case <-ctx.Done():
38+
log.Info("gRPC server shutting down")
39+
srv.GracefulStop()
40+
case <-doneCh:
41+
}
42+
}()
43+
44+
// Keep serving until terminated.
45+
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
46+
log.Error(err, "gRPC server failed")
47+
return err
48+
}
49+
log.Info("gRPC server terminated")
50+
return nil
51+
})
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package runnable
2+
3+
import "sigs.k8s.io/controller-runtime/pkg/manager"
4+
5+
type leaderElection struct {
6+
manager.Runnable
7+
needsLeaderElection bool
8+
}
9+
10+
// LeaderElection wraps the given runnable to implement manager.LeaderElectionRunnable.
11+
func LeaderElection(runnable manager.Runnable, needsLeaderElection bool) manager.Runnable {
12+
return &leaderElection{
13+
Runnable: runnable,
14+
needsLeaderElection: needsLeaderElection,
15+
}
16+
}
17+
18+
// RequireLeaderElection wraps the given runnable, marking it as requiring leader election.
19+
func RequireLeaderElection(runnable manager.Runnable) manager.Runnable {
20+
return LeaderElection(runnable, true)
21+
}
22+
23+
// RequireLeaderElection wraps the given runnable, marking it as not requiring leader election.
24+
func NoLeaderElection(runnable manager.Runnable) manager.Runnable {
25+
return LeaderElection(runnable, false)
26+
}
27+
28+
// NeedLeaderElection implements manager.NeedLeaderElection interface.
29+
func (r *leaderElection) NeedLeaderElection() bool {
30+
return r.needsLeaderElection
31+
}

pkg/ext-proc/main.go

+74-70
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package main
22

33
import (
4-
"context"
54
"flag"
65
"fmt"
76
"net"
87
"net/http"
8+
"os"
99
"strconv"
1010

1111
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -14,6 +14,7 @@ import (
1414
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1515
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1616
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
17+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable"
1718
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
1819
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
1920
"k8s.io/apimachinery/pkg/runtime"
@@ -23,6 +24,7 @@ import (
2324
"k8s.io/component-base/metrics/legacyregistry"
2425
klog "k8s.io/klog/v2"
2526
ctrl "sigs.k8s.io/controller-runtime"
27+
"sigs.k8s.io/controller-runtime/pkg/manager"
2628
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
2729
)
2830

@@ -79,17 +81,25 @@ func init() {
7981
}
8082

8183
func main() {
84+
if err := run(); err != nil {
85+
os.Exit(1)
86+
}
87+
}
88+
89+
func run() error {
8290
klog.InitFlags(nil)
8391
flag.Parse()
8492

8593
ctrl.SetLogger(klog.TODO())
8694
cfg, err := ctrl.GetConfig()
8795
if err != nil {
88-
klog.Fatalf("Failed to get rest config: %v", err)
96+
klog.ErrorS(err, "Failed to get rest config")
97+
return err
8998
}
9099
// Validate flags
91100
if err := validateFlags(); err != nil {
92-
klog.Fatalf("Failed to validate flags: %v", err)
101+
klog.ErrorS(err, "Failed to validate flags")
102+
return err
93103
}
94104

95105
// Print all flag values
@@ -114,101 +124,95 @@ func main() {
114124
Config: ctrl.GetConfigOrDie(),
115125
Datastore: datastore,
116126
}
117-
serverRunner.Setup()
118-
119-
// Start health and ext-proc servers in goroutines
120-
healthSvr := startHealthServer(datastore, *grpcHealthPort)
121-
extProcSvr := serverRunner.Start(
122-
datastore,
123-
&vllm.PodMetricsClientImpl{},
124-
)
125-
// Start metrics handler
126-
metricsSvr := startMetricsHandler(*metricsPort, cfg)
127-
128-
// Start manager, blocking
129-
serverRunner.StartManager()
127+
if err := serverRunner.Setup(); err != nil {
128+
klog.ErrorS(err, "Failed to setup ext-proc server")
129+
return err
130+
}
131+
mgr := serverRunner.Manager
130132

131-
// Gracefully shutdown servers
132-
if healthSvr != nil {
133-
klog.Info("Health server shutting down")
134-
healthSvr.GracefulStop()
133+
// Register health server.
134+
if err := registerHealthServer(mgr, datastore, *grpcHealthPort); err != nil {
135+
return err
135136
}
136-
if extProcSvr != nil {
137-
klog.Info("Ext-proc server shutting down")
138-
extProcSvr.GracefulStop()
137+
138+
// Register ext-proc server.
139+
if err := mgr.Add(serverRunner.AsRunnable(datastore, &vllm.PodMetricsClientImpl{})); err != nil {
140+
klog.ErrorS(err, "Failed to register ext-proc server")
141+
return err
139142
}
140-
if metricsSvr != nil {
141-
klog.Info("Metrics server shutting down")
142-
if err := metricsSvr.Shutdown(context.Background()); err != nil {
143-
klog.Infof("Metrics server Shutdown: %v", err)
144-
}
143+
144+
// Register metrics handler.
145+
if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil {
146+
return err
145147
}
146148

147-
klog.Info("All components shutdown")
149+
// Start the manager.
150+
return serverRunner.StartManager(ctrl.SetupSignalHandler())
148151
}
149152

150-
// startHealthServer starts the gRPC health probe server in a goroutine.
151-
func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
152-
svr := grpc.NewServer()
153-
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
154-
155-
go func() {
156-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
157-
if err != nil {
158-
klog.Fatalf("Health server failed to listen: %v", err)
159-
}
160-
klog.Infof("Health server listening on port: %d", port)
161-
162-
// Blocking and will return when shutdown is complete.
163-
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
164-
klog.Fatalf("Health server failed: %v", err)
165-
}
166-
klog.Info("Health server shutting down")
167-
}()
168-
return svr
153+
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
154+
func registerHealthServer(mgr manager.Manager, ds *backend.K8sDatastore, port int) error {
155+
srv := grpc.NewServer()
156+
healthPb.RegisterHealthServer(srv, &healthServer{datastore: ds})
157+
if err := mgr.Add(
158+
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
159+
klog.ErrorS(err, "Failed to register health server")
160+
return err
161+
}
162+
return nil
169163
}
170164

171-
func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
165+
// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager.
166+
func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error {
172167
metrics.Register()
173168

174-
var svr *http.Server
175-
go func() {
176-
klog.Info("Starting metrics HTTP handler ...")
177-
178-
mux := http.NewServeMux()
179-
mux.Handle(defaultMetricsEndpoint, metricsHandlerWithAuthenticationAndAuthorization(cfg))
180-
181-
svr = &http.Server{
182-
Addr: net.JoinHostPort("", strconv.Itoa(port)),
183-
Handler: mux,
184-
}
185-
if err := svr.ListenAndServe(); err != http.ErrServerClosed {
186-
klog.Fatalf("failed to start metrics HTTP handler: %v", err)
187-
}
188-
}()
189-
return svr
169+
// Init HTTP server.
170+
h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
171+
if err != nil {
172+
return err
173+
}
174+
175+
mux := http.NewServeMux()
176+
mux.Handle(defaultMetricsEndpoint, h)
177+
178+
srv := &http.Server{
179+
Addr: net.JoinHostPort("", strconv.Itoa(port)),
180+
Handler: mux,
181+
}
182+
183+
if err := mgr.Add(&manager.Server{
184+
Name: "metrics",
185+
Server: srv,
186+
}); err != nil {
187+
klog.ErrorS(err, "Failed to register metrics HTTP handler")
188+
return err
189+
}
190+
return nil
190191
}
191192

192-
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
193+
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) {
193194
h := promhttp.HandlerFor(
194195
legacyregistry.DefaultGatherer,
195196
promhttp.HandlerOpts{},
196197
)
197198
httpClient, err := rest.HTTPClientFor(cfg)
198199
if err != nil {
199-
klog.Fatalf("failed to create http client for metrics auth: %v", err)
200+
klog.ErrorS(err, "Failed to create http client for metrics auth")
201+
return nil, err
200202
}
201203

202204
filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
203205
if err != nil {
204-
klog.Fatalf("failed to create metrics filter for auth: %v", err)
206+
klog.ErrorS(err, "Failed to create metrics filter for auth")
207+
return nil, err
205208
}
206209
metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint)
207210
metricsAuthHandler, err := filter(metricsLogger, h)
208211
if err != nil {
209-
klog.Fatalf("failed to create metrics auth handler: %v", err)
212+
klog.ErrorS(err, "Failed to create metrics auth handler")
213+
return nil, err
210214
}
211-
return metricsAuthHandler
215+
return metricsAuthHandler, nil
212216
}
213217

214218
func validateFlags() error {

0 commit comments

Comments
 (0)