Skip to content

Commit 050c6ae

Browse files
committed
Manage all runnable components using manager
Instead of managing the runnable components with errgroup.Group, each of them is now wrapped in the manager.Runnable interface, so that the manager governs the initialization and termination process.
1 parent 2b99c6b commit 050c6ae

File tree

5 files changed

+149
-154
lines changed

5 files changed

+149
-154
lines changed

internal/runnable/grpc.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package runnable
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
8+
"google.golang.org/grpc"
9+
ctrl "sigs.k8s.io/controller-runtime"
10+
"sigs.k8s.io/controller-runtime/pkg/manager"
11+
)
12+
13+
func GRPCServer(name string, srv *grpc.Server, port int) manager.Runnable {
14+
return manager.RunnableFunc(func(ctx context.Context) error {
15+
// Use "name" key as that is what manager.Server does as well.
16+
log := ctrl.Log.WithValues("name", name)
17+
log.Info("gRPC server starting")
18+
19+
// Start listening.
20+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
21+
if err != nil {
22+
log.Error(err, "gRPC server failed to listen")
23+
return err
24+
}
25+
26+
log.Info("gRPC server listening", "port", port)
27+
28+
// Shutdown on context closed.
29+
// Terminate the server on context closed.
30+
// Make sure the goroutine does not leak.
31+
doneCh := make(chan struct{})
32+
defer close(doneCh)
33+
go func() {
34+
select {
35+
case <-ctx.Done():
36+
log.Info("gRPC server shutting down")
37+
srv.GracefulStop()
38+
case <-doneCh:
39+
}
40+
}()
41+
42+
// Keep serving until terminated.
43+
if err := srv.Serve(lis); err != nil && err != grpc.ErrServerStopped {
44+
log.Error(err, "gRPC server failed")
45+
return err
46+
}
47+
log.Info("gRPC server terminated")
48+
return nil
49+
})
50+
}

internal/runnable/leader_election.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package runnable
2+
3+
import "sigs.k8s.io/controller-runtime/pkg/manager"
4+
5+
type leaderElection struct {
6+
manager.Runnable
7+
needsLeaderElection bool
8+
}
9+
10+
func LeaderElection(runnable manager.Runnable, needsLeaderElection bool) manager.Runnable {
11+
return &leaderElection{
12+
Runnable: runnable,
13+
needsLeaderElection: needsLeaderElection,
14+
}
15+
}
16+
17+
func RequireLeaderElection(runnable manager.Runnable) manager.Runnable {
18+
return LeaderElection(runnable, true)
19+
}
20+
21+
func NoLeaderElection(runnable manager.Runnable) manager.Runnable {
22+
return LeaderElection(runnable, false)
23+
}
24+
25+
func (r *leaderElection) NeedLeaderElection() bool {
26+
return r.needsLeaderElection
27+
}

pkg/ext-proc/main.go

+49-99
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package main
22

33
import (
4-
"context"
54
"flag"
65
"fmt"
76
"net"
@@ -10,10 +9,10 @@ import (
109
"strconv"
1110

1211
"github.com/prometheus/client_golang/prometheus/promhttp"
13-
"golang.org/x/sync/errgroup"
1412
"google.golang.org/grpc"
1513
healthPb "google.golang.org/grpc/health/grpc_health_v1"
1614
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
15+
"inference.networking.x-k8s.io/gateway-api-inference-extension/internal/runnable"
1716
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1817
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
1918
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
@@ -26,6 +25,7 @@ import (
2625
"k8s.io/component-base/metrics/legacyregistry"
2726
klog "k8s.io/klog/v2"
2827
ctrl "sigs.k8s.io/controller-runtime"
28+
"sigs.k8s.io/controller-runtime/pkg/manager"
2929
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
3030
)
3131

@@ -119,123 +119,73 @@ func run() error {
119119
klog.ErrorS(err, "Failed to setup ext-proc server")
120120
return err
121121
}
122+
mgr := serverRunner.Manager
122123

123-
k8sClient, err := kubernetes.NewForConfigAndClient(cfg, serverRunner.Manager.GetHTTPClient())
124+
k8sClient, err := kubernetes.NewForConfigAndClient(cfg, mgr.GetHTTPClient())
124125
if err != nil {
125126
klog.ErrorS(err, "Failed to create client")
126127
return err
127128
}
128129
datastore.SetClient(k8sClient)
129130

130-
if err := serverRunner.Setup(); err != nil {
131-
klog.ErrorS(err, "Failed to setup server runner")
131+
// Register health server.
132+
if err := registerHealthServer(mgr, datastore, *grpcHealthPort); err != nil {
132133
return err
133134
}
134135

135-
// Start processing signals and init the group to manage goroutines.
136-
g, ctx := errgroup.WithContext(ctrl.SetupSignalHandler())
137-
138-
// Start health server.
139-
startHealthServer(ctx, g, datastore, *grpcHealthPort)
140-
141-
// Start ext-proc server.
142-
g.Go(func() error {
143-
return serverRunner.Start(ctx, &vllm.PodMetricsClientImpl{})
144-
})
145-
146-
// Start metrics handler.
147-
startMetricsHandler(ctx, g, *metricsPort, cfg)
136+
// Register ext-proc server.
137+
if err := mgr.Add(serverRunner.AsRunnable(&vllm.PodMetricsClientImpl{})); err != nil {
138+
klog.ErrorS(err, "Failed to register ext-proc server")
139+
return err
140+
}
148141

149-
// Start manager.
150-
g.Go(func() error {
151-
return serverRunner.StartManager(ctx)
152-
})
142+
// Register metrics handler.
143+
if err := registerMetricsHandler(mgr, *metricsPort, cfg); err != nil {
144+
return err
145+
}
153146

154-
err = g.Wait()
155-
klog.InfoS("All components terminated")
156-
return err
147+
// Start the manager.
148+
return serverRunner.StartManager(ctrl.SetupSignalHandler())
157149
}
158150

159-
// startHealthServer starts the gRPC health probe server using the given errgroup.
160-
func startHealthServer(ctx context.Context, g *errgroup.Group, ds *backend.K8sDatastore, port int) {
161-
g.Go(func() error {
162-
klog.InfoS("Health server starting...")
163-
164-
// Start listening.
165-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
166-
if err != nil {
167-
klog.ErrorS(err, "Health server failed to listen")
168-
return err
169-
}
170-
171-
klog.InfoS("Health server listening", "port", port)
172-
173-
svr := grpc.NewServer()
174-
healthPb.RegisterHealthServer(svr, &healthServer{datastore: ds})
175-
176-
// Shutdown on context closed.
177-
g.Go(func() error {
178-
<-ctx.Done()
179-
klog.InfoS("Health server shutting down...")
180-
svr.GracefulStop()
181-
return nil
182-
})
183-
184-
// Keep serving until terminated.
185-
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
186-
klog.ErrorS(err, "Health server failed")
187-
return err
188-
}
189-
klog.InfoS("Health server terminated")
190-
return nil
191-
})
151+
// registerHealthServer adds the Health gRPC server as a Runnable to the given manager.
152+
func registerHealthServer(mgr manager.Manager, ds *backend.K8sDatastore, port int) error {
153+
srv := grpc.NewServer()
154+
healthPb.RegisterHealthServer(srv, &healthServer{datastore: ds})
155+
if err := mgr.Add(
156+
runnable.NoLeaderElection(runnable.GRPCServer("health", srv, port))); err != nil {
157+
klog.ErrorS(err, "Failed to register health server")
158+
return err
159+
}
160+
return nil
192161
}
193162

194-
// startMetricsHandler starts the metrics HTTP handler using the given errgroup.
195-
func startMetricsHandler(ctx context.Context, g *errgroup.Group, port int, cfg *rest.Config) {
196-
g.Go(func() error {
197-
metrics.Register()
198-
klog.InfoS("Metrics HTTP handler starting...")
163+
// registerMetricsHandler adds the metrics HTTP handler as a Runnable to the given manager.
164+
func registerMetricsHandler(mgr manager.Manager, port int, cfg *rest.Config) error {
165+
metrics.Register()
199166

200-
// Start listening.
201-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
202-
if err != nil {
203-
klog.ErrorS(err, "Metrics HTTP handler failed to listen")
204-
return err
205-
}
206-
207-
klog.InfoS("Metrics HTTP handler listening", "port", port)
208-
209-
// Init HTTP server.
210-
h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
211-
if err != nil {
212-
return err
213-
}
214-
215-
mux := http.NewServeMux()
216-
mux.Handle(defaultMetricsEndpoint, h)
167+
// Init HTTP server.
168+
h, err := metricsHandlerWithAuthenticationAndAuthorization(cfg)
169+
if err != nil {
170+
return err
171+
}
217172

218-
svr := &http.Server{
219-
Addr: net.JoinHostPort("", strconv.Itoa(port)),
220-
Handler: mux,
221-
}
173+
mux := http.NewServeMux()
174+
mux.Handle(defaultMetricsEndpoint, h)
222175

223-
// Shutdown on interrupt.
224-
g.Go(func() error {
225-
<-ctx.Done()
226-
klog.InfoS("Metrics HTTP handler shutting down...")
227-
_ = svr.Shutdown(context.Background())
228-
return nil
229-
})
176+
srv := &http.Server{
177+
Addr: net.JoinHostPort("", strconv.Itoa(port)),
178+
Handler: mux,
179+
}
230180

231-
// Keep serving until terminated.
232-
if err := svr.Serve(lis); err != http.ErrServerClosed {
233-
klog.ErrorS(err, "Metrics HTTP handler failed")
234-
return err
235-
}
236-
klog.InfoS("Metrics HTTP handler terminated")
237-
return nil
238-
})
181+
if err := mgr.Add(&manager.Server{
182+
Name: "metrics",
183+
Server: srv,
184+
}); err != nil {
185+
klog.ErrorS(err, "Failed to register metrics HTTP handler")
186+
return err
187+
}
188+
return nil
239189
}
240190

241191
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) (http.Handler, error) {

pkg/ext-proc/server/runserver.go

+22-54
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"net"
87
"time"
98

109
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1110
"google.golang.org/grpc"
11+
"inference.networking.x-k8s.io/gateway-api-inference-extension/internal/runnable"
1212
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1313
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
1414
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
@@ -17,6 +17,7 @@ import (
1717
"k8s.io/client-go/rest"
1818
klog "k8s.io/klog/v2"
1919
ctrl "sigs.k8s.io/controller-runtime"
20+
"sigs.k8s.io/controller-runtime/pkg/manager"
2021
)
2122

2223
// ExtProcServerRunner provides methods to manage an external process server.
@@ -93,60 +94,27 @@ func (r *ExtProcServerRunner) Setup() error {
9394
return nil
9495
}
9596

96-
// Start starts the Envoy external processor server and blocks
97-
// until the context is canceled or an error encountered.
98-
func (r *ExtProcServerRunner) Start(
99-
ctx context.Context,
100-
podMetricsClient backend.PodMetricsClient,
101-
) error {
102-
klog.InfoS("Ext-proc server starting...")
103-
104-
// Start listening.
105-
svr := grpc.NewServer()
106-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
107-
if err != nil {
108-
klog.ErrorS(err, "Ext-proc server failed to listen", "port", r.GrpcPort)
109-
return err
110-
}
111-
// The listener will be closed by the server,
112-
// but the function may also return earlier on error.
113-
defer lis.Close()
114-
115-
klog.InfoS("Ext-proc server listening", "port", r.GrpcPort)
116-
117-
// Initialize backend provider
118-
pp := backend.NewProvider(podMetricsClient, r.Datastore)
119-
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
120-
klog.ErrorS(err, "Failed to initialize backend provider")
121-
return err
122-
}
123-
124-
// Register ext_proc handlers
125-
extProcPb.RegisterExternalProcessorServer(
126-
svr,
127-
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetEndpointKey, r.Datastore),
128-
)
129-
130-
// Terminate the server on context closed.
131-
// Make sure the goroutine does not leak.
132-
doneCh := make(chan struct{})
133-
defer close(doneCh)
134-
go func() {
135-
select {
136-
case <-ctx.Done():
137-
klog.InfoS("Ext-proc server shutting down...")
138-
svr.GracefulStop()
139-
case <-doneCh:
97+
// AsRunnable returns a Runnable that can be used to start the ext-proc gRPC server.
98+
// The runnable implements LeaderElectionRunnable with leader election disabled.
99+
func (r *ExtProcServerRunner) AsRunnable(podMetricsClient backend.PodMetricsClient) manager.Runnable {
100+
return runnable.NoLeaderElection(manager.RunnableFunc(func(ctx context.Context) error {
101+
// Initialize backend provider
102+
pp := backend.NewProvider(podMetricsClient, r.Datastore)
103+
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
104+
klog.ErrorS(err, "Failed to initialize backend provider")
105+
return err
140106
}
141-
}()
142107

143-
// Block until terminated.
144-
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
145-
klog.ErrorS(err, "Ext-proc server failed")
146-
return err
147-
}
148-
klog.InfoS("Ext-proc server terminated")
149-
return nil
108+
// Init the server.
109+
srv := grpc.NewServer()
110+
extProcPb.RegisterExternalProcessorServer(
111+
srv,
112+
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetEndpointKey, r.Datastore),
113+
)
114+
115+
// Forward to the gRPC runnable.
116+
return runnable.GRPCServer("ext-proc", srv, r.GrpcPort).Start(ctx)
117+
}))
150118
}
151119

152120
func (r *ExtProcServerRunner) StartManager(ctx context.Context) error {
@@ -157,7 +125,7 @@ func (r *ExtProcServerRunner) StartManager(ctx context.Context) error {
157125
}
158126

159127
// Start the controller manager. Blocking and will return when shutdown is complete.
160-
klog.InfoS("Controller manager starting...")
128+
klog.InfoS("Controller manager starting")
161129
if err := r.Manager.Start(ctx); err != nil {
162130
klog.ErrorS(err, "Error starting controller manager")
163131
return err

test/integration/hermetic_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -343,7 +343,7 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
343343

344344
serverCtx, stopServer := context.WithCancel(context.Background())
345345
go func() {
346-
if err := serverRunner.Start(serverCtx, pmc); err != nil {
346+
if err := serverRunner.AsRunnable(pmc).Start(serverCtx); err != nil {
347347
log.Fatalf("Failed to start ext-proc server: %v", err)
348348
}
349349
}()

0 commit comments

Comments
 (0)