diff --git a/pkg/ext-proc/internal/runnable/grpc.go b/pkg/ext-proc/internal/runnable/grpc.go
new file mode 100644
index 00000000..a619f788
--- /dev/null
+++ b/pkg/ext-proc/internal/runnable/grpc.go
@@ -0,0 +1,51 @@
+package runnable
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"google.golang.org/grpc"
+	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+)
+
+// GRPCServer converts the given gRPC server into a runnable.
+// The server name is used for logging only.
+func GRPCServer(name string, srv *grpc.Server, port int) manager.Runnable {
+	return manager.RunnableFunc(func(ctx context.Context) error {
+		// Use the "name" key, matching what manager.Server does.
+		log := ctrl.Log.WithValues("name", name)
+		log.Info("gRPC server starting")
+
+		// Start listening.
+		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
+		if err != nil {
+			log.Error(err, "gRPC server failed to listen")
+			return err
+		}
+
+		log.Info("gRPC server listening", "port", port)
+
+		// Terminate the server when the context is closed.
+		// Make sure the goroutine does not leak.
+		doneCh := make(chan struct{})
+		defer close(doneCh)
+		go func() {
+			select {
+			case <-ctx.Done():
+				log.Info("gRPC server shutting down")
+				srv.GracefulStop()
+			case <-doneCh:
+			}
+		}()
+
+		// Keep serving until terminated.
+		if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
+			log.Error(err, "gRPC server failed")
+			return err
+		}
+		log.Info("gRPC server terminated")
+		return nil
+	})
+}
diff --git a/pkg/ext-proc/internal/runnable/leader_election.go b/pkg/ext-proc/internal/runnable/leader_election.go
new file mode 100644
index 00000000..00dfc782
--- /dev/null
+++ b/pkg/ext-proc/internal/runnable/leader_election.go
@@ -0,0 +1,31 @@
+package runnable
+
+import "sigs.k8s.io/controller-runtime/pkg/manager"
+
+type leaderElection struct {
+	manager.Runnable
+	needsLeaderElection bool
+}
+
+// LeaderElection wraps the given runnable to implement manager.LeaderElectionRunnable.
+func LeaderElection(runnable manager.Runnable, needsLeaderElection bool) manager.Runnable {
+	return &leaderElection{
+		Runnable:            runnable,
+		needsLeaderElection: needsLeaderElection,
+	}
+}
+
+// RequireLeaderElection wraps the given runnable, marking it as requiring leader election.
+func RequireLeaderElection(runnable manager.Runnable) manager.Runnable {
+	return LeaderElection(runnable, true)
+}
+
+// NoLeaderElection wraps the given runnable, marking it as not requiring leader election.
+func NoLeaderElection(runnable manager.Runnable) manager.Runnable {
+	return LeaderElection(runnable, false)
+}
+
+// NeedLeaderElection implements the manager.LeaderElectionRunnable interface.
+func (r *leaderElection) NeedLeaderElection() bool {
+	return r.needsLeaderElection
+}
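For orientation, a minimal usage sketch of the new helpers (illustrative only, not part of this change; mgr and healthPort are assumed to exist — this mirrors what registerHealthServer in main.go does below). GRPCServer adapts a grpc.Server to a manager.Runnable, and NoLeaderElection marks it to run on every replica; RequireLeaderElection is the counterpart for work that must run only on the elected leader.

package main

import (
	"google.golang.org/grpc"
	"sigs.k8s.io/controller-runtime/pkg/manager"

	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable"
)

// registerExampleServer is a hypothetical helper: it wraps a gRPC server in the
// new runnable adapters and hands its lifecycle to the controller-runtime manager.
func registerExampleServer(mgr manager.Manager, healthPort int) error {
	srv := grpc.NewServer()
	// ... register gRPC services on srv here ...
	return mgr.Add(runnable.NoLeaderElection(runnable.GRPCServer("example", srv, healthPort)))
}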
diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go
index 6bdaae66..d51435ac 100644
--- a/pkg/ext-proc/main.go
+++ b/pkg/ext-proc/main.go
@@ -1,11 +1,11 @@
 package main
 
 import (
-	"context"
 	"flag"
 	"fmt"
 	"net"
 	"net/http"
+	"os"
 	"strconv"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -21,10 +21,12 @@ import (
 	klog "k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/log/zap"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
 	"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
 	runserver "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
@@ -88,6 +90,12 @@ func init() {
 }
 
 func main() {
+	if err := run(); err != nil {
+		os.Exit(1)
+	}
+}
+
+func run() error {
 	opts := zap.Options{
 		Development: true,
 	}
@@ -97,11 +105,13 @@ func main() {
 
 	cfg, err := ctrl.GetConfig()
 	if err != nil {
-		klog.Fatalf("Failed to get rest config: %v", err)
+		klog.ErrorS(err, "Failed to get rest config")
+		return err
 	}
 	// Validate flags
 	if err := validateFlags(); err != nil {
-		klog.Fatalf("Failed to validate flags: %v", err)
+		klog.ErrorS(err, "Failed to validate flags")
+		return err
 	}
 
 	// Print all flag values
@@ -127,37 +137,30 @@ func main() {
 		Config:    ctrl.GetConfigOrDie(),
 		Datastore: datastore,
 	}
-	serverRunner.Setup()
-
-	// Start health and ext-proc servers in goroutines
-	healthSvr := startHealthServer(datastore, *grpcHealthPort)
-	extProcSvr := serverRunner.Start(
-		datastore,
-		&vllm.PodMetricsClientImpl{},
-	)
-	// Start metrics handler
-	metricsSvr := startMetricsHandler(*metricsPort, cfg)
-
-	// Start manager, blocking
-	serverRunner.StartManager()
+	if err := serverRunner.Setup(); err != nil {
+		klog.ErrorS(err, "Failed to setup ext-proc server")
+		return err
+	}
+	mgr := serverRunner.Manager
 
-	// Gracefully shutdown servers
-	if healthSvr != nil {
-		klog.Info("Health server shutting down")
-		healthSvr.GracefulStop()
+	// Register health server.
+	if err := registerHealthServer(mgr, datastore, *grpcHealthPort); err != nil {
+		return err
 	}
-	if extProcSvr != nil {
-		klog.Info("Ext-proc server shutting down")
-		extProcSvr.GracefulStop()
+
+	// Register ext-proc server.
+	if err := mgr.Add(serverRunner.AsRunnable(datastore, &vllm.PodMetricsClientImpl{})); err != nil {
+		klog.ErrorS(err, "Failed to register ext-proc server")
+		return err
 	}
-	if metricsSvr != nil {
-		klog.Info("Metrics server shutting down")
-		if err := metricsSvr.Shutdown(context.Background()); err != nil {
-			klog.Infof("Metrics server Shutdown: %v", err)
-		}
+
+	// Register metrics handler.
+	if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil {
+		return err
 	}
 
-	klog.Info("All components shutdown")
+	// Start the manager.
+	return serverRunner.StartManager(ctrl.SetupSignalHandler())
 }
 
 func initLogging(opts *zap.Options) {
@@ -179,68 +182,69 @@ func initLogging(opts *zap.Options) {
 	klog.SetLogger(logger)
 }
 
-// startHealthServer starts the gRPC health probe server in a goroutine.
-func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
-	svr := grpc.NewServer()
-	healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
-
-	go func() {
-		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
-		if err != nil {
-			klog.Fatalf("Health server failed to listen: %v", err)
-		}
-		klog.Infof("Health server listening on port: %d", port)
-
-		// Blocking and will return when shutdown is complete.
-		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
-			klog.Fatalf("Health server failed: %v", err)
-		}
-		klog.Info("Health server shutting down")
-	}()
-	return svr
+// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
+func registerHealthServer(mgr manager.Manager, ds *backend.K8sDatastore, port int) error {
+	srv := grpc.NewServer()
+	healthPb.RegisterHealthServer(srv, &healthServer{datastore: ds})
+	if err := mgr.Add(
+		runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
+		klog.ErrorS(err, "Failed to register health server")
+		return err
+	}
+	return nil
 }
 
-func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
+// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager.
+func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error {
 	metrics.Register()
 
-	var svr *http.Server
-	go func() {
-		klog.Info("Starting metrics HTTP handler ...")
+	// Init HTTP server.
+	h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
+	if err != nil {
+		return err
+	}
+
+	mux := http.NewServeMux()
+	mux.Handle(defaultMetricsEndpoint, h)
 
-		mux := http.NewServeMux()
-		mux.Handle(defaultMetricsEndpoint, metricsHandlerWithAuthenticationAndAuthorization(cfg))
+	srv := &http.Server{
+		Addr:    net.JoinHostPort("", strconv.Itoa(port)),
+		Handler: mux,
+	}
 
-		svr = &http.Server{
-			Addr:    net.JoinHostPort("", strconv.Itoa(port)),
-			Handler: mux,
-		}
-		if err := svr.ListenAndServe(); err != http.ErrServerClosed {
-			klog.Fatalf("failed to start metrics HTTP handler: %v", err)
-		}
-	}()
-	return svr
+	if err := mgr.Add(&manager.Server{
+		Name:   "metrics",
+		Server: srv,
+	}); err != nil {
+		klog.ErrorS(err, "Failed to register metrics HTTP handler")
+		return err
+	}
+	return nil
 }
 
-func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
+func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) {
 	h := promhttp.HandlerFor(
 		legacyregistry.DefaultGatherer,
 		promhttp.HandlerOpts{},
 	)
 
 	httpClient, err := rest.HTTPClientFor(cfg)
 	if err != nil {
-		klog.Fatalf("failed to create http client for metrics auth: %v", err)
+		klog.ErrorS(err, "Failed to create http client for metrics auth")
+		return nil, err
 	}
 	filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
 	if err != nil {
-		klog.Fatalf("failed to create metrics filter for auth: %v", err)
+		klog.ErrorS(err, "Failed to create metrics filter for auth")
+		return nil, err
 	}
 	metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint)
 	metricsAuthHandler, err := filter(metricsLogger, h)
 	if err != nil {
-		klog.Fatalf("failed to create metrics auth handler: %v", err)
+		klog.ErrorS(err, "Failed to create metrics auth handler")
+		return nil, err
 	}
-	return metricsAuthHandler
+	return metricsAuthHandler, nil
 }
 
 func validateFlags() error {
diff --git a/pkg/ext-proc/server/runserver.go b/pkg/ext-proc/server/runserver.go
index affb4b6c..71499e8f 100644
--- a/pkg/ext-proc/server/runserver.go
+++ b/pkg/ext-proc/server/runserver.go
@@ -1,8 +1,9 @@
 package server
 
 import (
+	"context"
+	"errors"
 	"fmt"
-	"net"
 	"time"
 
 	extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
@@ -12,8 +13,10 @@ import (
 	"k8s.io/client-go/rest"
 	klog "k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
+	"sigs.k8s.io/controller-runtime/pkg/manager"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/internal/runnable"
 	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
 )
 
@@ -31,7 +34,7 @@ type ExtProcServerRunner struct {
 	Scheme    *runtime.Scheme
 	Config    *rest.Config
 	Datastore *backend.K8sDatastore
-	manager   ctrl.Manager
+	Manager   ctrl.Manager
 }
 
 // Default values for CLI flags in main
@@ -63,13 +66,13 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
 }
 
 // Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
-func (r *ExtProcServerRunner) Setup() {
+func (r *ExtProcServerRunner) Setup() error {
 	// Create a new manager to manage controllers
 	mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
 	if err != nil {
-		klog.Fatalf("Failed to create controller manager: %v", err)
+		return fmt.Errorf("failed to create controller manager: %w", err)
 	}
-	r.manager = mgr
+	r.Manager = mgr
 
 	// Create the controllers and register them with the manager
 	if err := (&backend.InferencePoolReconciler{
@@ -82,7 +85,7 @@ func (r *ExtProcServerRunner) Setup() {
 		},
 		Record: mgr.GetEventRecorderFor("InferencePool"),
 	}).SetupWithManager(mgr); err != nil {
-		klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
+		return fmt.Errorf("failed setting up InferencePoolReconciler: %w", err)
 	}
 
 	if err := (&backend.InferenceModelReconciler{
@@ -95,7 +98,7 @@ func (r *ExtProcServerRunner) Setup() {
 		},
 		Record: mgr.GetEventRecorderFor("InferenceModel"),
 	}).SetupWithManager(mgr); err != nil {
-		klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
+		return fmt.Errorf("failed setting up InferenceModelReconciler: %w", err)
 	}
 
 	if err := (&backend.EndpointSliceReconciler{
@@ -106,54 +109,50 @@ func (r *ExtProcServerRunner) Setup() {
 		ServiceName: r.ServiceName,
 		Zone:        r.Zone,
 	}).SetupWithManager(mgr); err != nil {
-		klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
+		return fmt.Errorf("failed setting up EndpointSliceReconciler: %w", err)
 	}
+	return nil
 }
 
-// Start starts the Envoy external processor server in a goroutine.
-func (r *ExtProcServerRunner) Start(
+// AsRunnable returns a Runnable that can be used to start the ext-proc gRPC server.
+// The runnable implements LeaderElectionRunnable with leader election disabled.
+func (r *ExtProcServerRunner) AsRunnable(
 	podDatastore *backend.K8sDatastore,
 	podMetricsClient backend.PodMetricsClient,
-) *grpc.Server {
-	svr := grpc.NewServer()
-
-	go func() {
-		lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
-		if err != nil {
-			klog.Fatalf("Ext-proc server failed to listen: %v", err)
-		}
-		klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
-
+) manager.Runnable {
+	return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error {
 		// Initialize backend provider
 		pp := backend.NewProvider(podMetricsClient, podDatastore)
 		if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval, r.RefreshPrometheusMetricsInterval); err != nil {
-			klog.Fatalf("Failed to initialize backend provider: %v", err)
+			klog.ErrorS(err, "Failed to initialize backend provider")
+			return err
 		}
 
-		// Register ext_proc handlers
+		// Init the server.
+		srv := grpc.NewServer()
 		extProcPb.RegisterExternalProcessorServer(
-			svr,
+			srv,
 			handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetEndpointKey, r.Datastore),
 		)
 
-		// Blocking and will return when shutdown is complete.
-		if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
-			klog.Fatalf("Ext-proc server failed: %v", err)
-		}
-		klog.Info("Ext-proc server shutting down")
-	}()
-	return svr
+		// Forward to the gRPC runnable.
+		return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx)
+	}))
 }
 
-func (r *ExtProcServerRunner) StartManager() {
-	if r.manager == nil {
-		klog.Fatalf("Runner has no manager setup to run: %v", r)
+func (r *ExtProcServerRunner) StartManager(ctx context.Context) error {
+	if r.Manager == nil {
+		err := errors.New("runner manager is not set")
+		klog.ErrorS(err, "Runner has no manager setup to run")
+		return err
 	}
+
 	// Start the controller manager. Blocking and will return when shutdown is complete.
-	klog.Infof("Starting controller manager")
-	mgr := r.manager
-	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
-		klog.Fatalf("Error starting controller manager: %v", err)
+	klog.InfoS("Controller manager starting")
+	if err := r.Manager.Start(ctx); err != nil {
+		klog.ErrorS(err, "Error starting controller manager")
+		return err
 	}
-	klog.Info("Controller manager shutting down")
+	klog.InfoS("Controller manager terminated")
+	return nil
 }
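The new runserver_test.go below only exercises the leader-election contract. A graceful-shutdown check for the GRPCServer runnable could look roughly like the following sketch (hypothetical, not part of this diff; it would sit next to grpc.go in the runnable package and relies on port 0 to pick a free port):

package runnable

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc"
)

func TestGRPCServerStopsOnContextCancel(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	errCh := make(chan error, 1)
	// Start the runnable on an OS-assigned port so the test stays hermetic.
	go func() { errCh <- GRPCServer("test", grpc.NewServer(), 0).Start(ctx) }()

	// Cancelling the context should trigger GracefulStop and a nil return.
	cancel()
	select {
	case err := <-errCh:
		if err != nil {
			t.Errorf("expected clean shutdown, got: %v", err)
		}
	case <-time.After(10 * time.Second):
		t.Fatal("gRPC runnable did not stop after context cancellation")
	}
}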
diff --git a/pkg/ext-proc/server/runserver_test.go b/pkg/ext-proc/server/runserver_test.go
new file mode 100644
index 00000000..df2081aa
--- /dev/null
+++ b/pkg/ext-proc/server/runserver_test.go
@@ -0,0 +1,21 @@
+package server_test
+
+import (
+	"testing"
+
+	"sigs.k8s.io/controller-runtime/pkg/manager"
+
+	"sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
+)
+
+func TestRunnable(t *testing.T) {
+	// Make sure AsRunnable() does not use leader election.
+	runner := server.NewDefaultExtProcServerRunner().AsRunnable(nil, nil)
+	r, ok := runner.(manager.LeaderElectionRunnable)
+	if !ok {
+		t.Fatal("runner is not LeaderElectionRunnable")
+	}
+	if r.NeedLeaderElection() {
+		t.Error("runner returned NeedLeaderElection = true, expected false")
+	}
+}
diff --git a/test/integration/hermetic_test.go b/test/integration/hermetic_test.go
index e94be1a0..74c9f049 100644
--- a/test/integration/hermetic_test.go
+++ b/test/integration/hermetic_test.go
@@ -28,6 +28,7 @@ import (
 	k8syaml "k8s.io/apimachinery/pkg/util/yaml"
 	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
 	klog "k8s.io/klog/v2"
+	ctrl "sigs.k8s.io/controller-runtime"
 	k8sclient "sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/envtest"
 	"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
@@ -406,7 +407,6 @@ func TestKubeInferenceModelRequest(t *testing.T) {
 }
 
 func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalProcessor_ProcessClient, cleanup func()) {
-	ps := make(backend.PodSet)
 	pms := make(map[backend.Pod]*backend.PodMetrics)
 
 	for _, pod := range pods {
@@ -415,7 +415,14 @@ func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalP
 	}
 
 	pmc := &backend.FakePodMetricsClient{Res: pms}
-	server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc)
+	serverCtx, stopServer := context.WithCancel(context.Background())
+	go func() {
+		if err := serverRunner.AsRunnable(
+			backend.NewK8sDataStore(backend.WithPods(pods)), pmc,
+		).Start(serverCtx); err != nil {
+			log.Fatalf("Failed to start ext-proc server: %v", err)
+		}
+	}()
 
 	// Wait the reconciler to populate the datastore.
 	time.Sleep(10 * time.Second)
@@ -435,7 +442,7 @@ func setUpHermeticServer(pods []*backend.PodMetrics) (client extProcPb.ExternalP
 	return client, func() {
 		cancel()
 		conn.Close()
-		server.GracefulStop()
+		stopServer()
 	}
 }
 
@@ -447,7 +454,6 @@ func BeforeSuit() {
 		ErrorIfCRDPathMissing: true,
 	}
 	cfg, err := testEnv.Start()
-
 	if err != nil {
 		log.Fatalf("Failed to start test environment, cfg: %v error: %v", cfg, err)
 	}
@@ -469,11 +475,15 @@ func BeforeSuit() {
 
 	serverRunner.Config = cfg
 	serverRunner.Datastore = backend.NewK8sDataStore()
-	serverRunner.Setup()
+	if err := serverRunner.Setup(); err != nil {
+		log.Fatalf("Failed to set up server runner: %v", err)
+	}
 
 	// Start the controller manager in go routine, not blocking
 	go func() {
-		serverRunner.StartManager()
+		if err := serverRunner.StartManager(ctrl.SetupSignalHandler()); err != nil {
+			log.Fatalf("Failed to start manager: %v", err)
+		}
 	}()
 
 	klog.Info("Setting up hermetic ExtProc server")