Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove fatal log calls in executable code #265

Merged
merged 1 commit into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions pkg/ext-proc/internal/runnable/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package runnable

import (
"context"
"fmt"
"net"

"google.golang.org/grpc"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
)

// GRPCServer converts the given gRPC server into a runnable.
// The server name is just being used for logging.
func GRPCServer(name string, srv *grpc.Server, port int) manager.Runnable {
return manager.RunnableFunc(func(ctx context.Context) error {
// Use "name" key as that is what manager.Server does as well.
log := ctrl.Log.WithValues("name", name)
log.Info("gRPC server starting")

// Start listening.
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
log.Error(err, "gRPC server failed to listen")
return err
}

log.Info("gRPC server listening", "port", port)

// Shutdown on context closed.
// Terminate the server on context closed.
// Make sure the goroutine does not leak.
doneCh := make(chan struct{})
defer close(doneCh)
go func() {
select {
case <-ctx.Done():
log.Info("gRPC server shutting down")
srv.GracefulStop()
case <-doneCh:
}
}()

// Keep serving until terminated.
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
log.Error(err, "gRPC server failed")
return err
}
log.Info("gRPC server terminated")
return nil
})
}
31 changes: 31 additions & 0 deletions pkg/ext-proc/internal/runnable/leader_election.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package runnable

import "sigs.k8s.io/controller-runtime/pkg/manager"

type leaderElection struct {
manager.Runnable
needsLeaderElection bool
}

// LeaderElection wraps the given runnable to implement manager.LeaderElectionRunnable.
func LeaderElection(runnable manager.Runnable, needsLeaderElection bool) manager.Runnable {
return &leaderElection{
Runnable: runnable,
needsLeaderElection: needsLeaderElection,
}
}

// RequireLeaderElection wraps the given runnable, marking it as requiring leader election.
func RequireLeaderElection(runnable manager.Runnable) manager.Runnable {
return LeaderElection(runnable, true)
}

// RequireLeaderElection wraps the given runnable, marking it as not requiring leader election.
func NoLeaderElection(runnable manager.Runnable) manager.Runnable {
return LeaderElection(runnable, false)
}

// NeedLeaderElection implements manager.NeedLeaderElection interface.
func (r *leaderElection) NeedLeaderElection() bool {
return r.needsLeaderElection
}
140 changes: 72 additions & 68 deletions pkg/ext-proc/main.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package main

import (
"context"
"flag"
"fmt"
"net"
"net/http"
"os"
"strconv"

"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -21,10 +21,12 @@ import (
klog "k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
Expand Down Expand Up @@ -88,6 +90,12 @@ func init() {
}

func main() {
if err := run(); err != nil {
os.Exit(1)
}
}

func run() error {
opts := zap.Options{
Development: true,
}
Expand All @@ -97,11 +105,13 @@ func main() {

cfg, err := ctrl.GetConfig()
if err != nil {
klog.Fatalf("Failed to get rest config: %v", err)
klog.ErrorS(err, "Failed to get rest config")
return err
}
// Validate flags
if err := validateFlags(); err != nil {
klog.Fatalf("Failed to validate flags: %v", err)
klog.ErrorS(err, "Failed to validate flags")
return err
}

// Print all flag values
Expand All @@ -127,37 +137,30 @@ func main() {
Config: ctrl.GetConfigOrDie(),
Datastore: datastore,
}
serverRunner.Setup()

// Start health and ext-proc servers in goroutines
healthSvr := startHealthServer(datastore, *grpcHealthPort)
extProcSvr := serverRunner.Start(
datastore,
&vllm.PodMetricsClientImpl{},
)
// Start metrics handler
metricsSvr := startMetricsHandler(*metricsPort, cfg)

// Start manager, blocking
serverRunner.StartManager()
if err := serverRunner.Setup(); err != nil {
klog.ErrorS(err, "Failed to setup ext-proc server")
return err
}
mgr := serverRunner.Manager

// Gracefully shutdown servers
if healthSvr != nil {
klog.Info("Health server shutting down")
healthSvr.GracefulStop()
// Register health server.
if err := registerHealthServer(mgr, datastore, *grpcHealthPort); err != nil {
return err
}
if extProcSvr != nil {
klog.Info("Ext-proc server shutting down")
extProcSvr.GracefulStop()

// Register ext-proc server.
if err := mgr.Add(serverRunner.AsRunnable(datastore, &vllm.PodMetricsClientImpl{})); err != nil {
klog.ErrorS(err, "Failed to register ext-proc server")
return err
}
if metricsSvr != nil {
klog.Info("Metrics server shutting down")
if err := metricsSvr.Shutdown(context.Background()); err != nil {
klog.Infof("Metrics server Shutdown: %v", err)
}

// Register metrics handler.
if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil {
return err
}

klog.Info("All components shutdown")
// Start the manager.
return serverRunner.StartManager(ctrl.SetupSignalHandler())
}

func initLogging(opts *zap.Options) {
Expand All @@ -179,68 +182,69 @@ func initLogging(opts *zap.Options) {
klog.SetLogger(logger)
}

// startHealthServer starts the gRPC health probe server in a goroutine.
func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
svr := grpc.NewServer()
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})

go func() {
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
klog.Fatalf("Health server failed to listen: %v", err)
}
klog.Infof("Health server listening on port: %d", port)

// Blocking and will return when shutdown is complete.
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
klog.Fatalf("Health server failed: %v", err)
}
klog.Info("Health server shutting down")
}()
return svr
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
func registerHealthServer(mgr manager.Manager, ds *backend.K8sDatastore, port int) error {
srv := grpc.NewServer()
healthPb.RegisterHealthServer(srv, &healthServer{datastore: ds})
if err := mgr.Add(
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
klog.ErrorS(err, "Failed to register health server")
return err
}
return nil
}

func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager.
func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error {
metrics.Register()

var svr *http.Server
go func() {
klog.Info("Starting metrics HTTP handler ...")
// Init HTTP server.
h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
if err != nil {
return err
}

mux := http.NewServeMux()
mux.Handle(defaultMetricsEndpoint, h)

mux := http.NewServeMux()
mux.Handle(defaultMetricsEndpoint, metricsHandlerWithAuthenticationAndAuthorization(cfg))
srv := &http.Server{
Addr: net.JoinHostPort("", strconv.Itoa(port)),
Handler: mux,
}

svr = &http.Server{
Addr: net.JoinHostPort("", strconv.Itoa(port)),
Handler: mux,
}
if err := svr.ListenAndServe(); err != http.ErrServerClosed {
klog.Fatalf("failed to start metrics HTTP handler: %v", err)
}
}()
return svr
if err := mgr.Add(&manager.Server{
Name: "metrics",
Server: srv,
}); err != nil {
klog.ErrorS(err, "Failed to register metrics HTTP handler")
return err
}
return nil
}

func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) {
h := promhttp.HandlerFor(
legacyregistry.DefaultGatherer,
promhttp.HandlerOpts{},
)
httpClient, err := rest.HTTPClientFor(cfg)
if err != nil {
klog.Fatalf("failed to create http client for metrics auth: %v", err)
klog.ErrorS(err, "Failed to create http client for metrics auth")
return nil, err
}

filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
if err != nil {
klog.Fatalf("failed to create metrics filter for auth: %v", err)
klog.ErrorS(err, "Failed to create metrics filter for auth")
return nil, err
}
metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint)
metricsAuthHandler, err := filter(metricsLogger, h)
if err != nil {
klog.Fatalf("failed to create metrics auth handler: %v", err)
klog.ErrorS(err, "Failed to create metrics auth handler")
return nil, err
}
return metricsAuthHandler
return metricsAuthHandler, nil
}

func validateFlags() error {
Expand Down
Loading