Skip to content

Commit d7fdc8f

Browse files
Refactor ext-proc Main with Server Package Add Hermetic Test with k8s API Client for EPP (#222)
* Add updated hermetic test with k8s client API, these pull from example object yamls. * Fix linting errors, remove unused variables and whitespace, remove commented out logging code. * Pre-allocate inferenceModels in hermetic test. * Fix import order for linting in hermetic test. * Move test object yamls to test/artifacts directory in ext-proc, make start manager in main blocking * move hermetic test for extproc into a new integration package and move k8s API logic into a BeforeSuite() function run before the hermetic tests * Set up constants for main flags in extproc server package, improve formatting and helper functions for runserver. * rebase fork with main
1 parent e93541d commit d7fdc8f

File tree

5 files changed

+647
-282
lines changed

5 files changed

+647
-282
lines changed

pkg/ext-proc/main.go

+26-104
Original file line numberDiff line numberDiff line change
@@ -7,20 +7,16 @@ import (
77
"net"
88
"net/http"
99
"strconv"
10-
"time"
1110

12-
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
1311
"github.com/prometheus/client_golang/prometheus/promhttp"
1412
"google.golang.org/grpc"
1513
healthPb "google.golang.org/grpc/health/grpc_health_v1"
1614
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
1715
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
1816
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend/vllm"
19-
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
2017
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
21-
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
18+
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
2219
"k8s.io/apimachinery/pkg/runtime"
23-
"k8s.io/apimachinery/pkg/types"
2420
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2521
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2622
"k8s.io/client-go/rest"
@@ -37,7 +33,7 @@ const (
3733
var (
3834
grpcPort = flag.Int(
3935
"grpcPort",
40-
9002,
36+
runserver.DefaultGrpcPort,
4137
"The gRPC port used for communicating with Envoy proxy")
4238
grpcHealthPort = flag.Int(
4339
"grpcHealthPort",
@@ -47,31 +43,31 @@ var (
4743
"metricsPort", 9090, "The metrics port")
4844
targetPodHeader = flag.String(
4945
"targetPodHeader",
50-
"target-pod",
46+
runserver.DefaultTargetPodHeader,
5147
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
5248
poolName = flag.String(
5349
"poolName",
54-
"",
50+
runserver.DefaultPoolName,
5551
"Name of the InferencePool this Endpoint Picker is associated with.")
5652
poolNamespace = flag.String(
5753
"poolNamespace",
58-
"default",
54+
runserver.DefaultPoolNamespace,
5955
"Namespace of the InferencePool this Endpoint Picker is associated with.")
6056
serviceName = flag.String(
6157
"serviceName",
62-
"",
58+
runserver.DefaultServiceName,
6359
"Name of the Service that will be used to read EndpointSlices from")
6460
zone = flag.String(
6561
"zone",
66-
"",
62+
runserver.DefaultZone,
6763
"The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ")
6864
refreshPodsInterval = flag.Duration(
6965
"refreshPodsInterval",
70-
10*time.Second,
66+
runserver.DefaultRefreshPodsInterval,
7167
"interval to refresh pods")
7268
refreshMetricsInterval = flag.Duration(
7369
"refreshMetricsInterval",
74-
50*time.Millisecond,
70+
runserver.DefaultRefreshMetricsInterval,
7571
"interval to refresh metrics")
7672

7773
scheme = runtime.NewScheme()
@@ -103,71 +99,34 @@ func main() {
10399
})
104100
klog.Info(flags)
105101

106-
// Create a new manager to manage controllers
107-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
108-
if err != nil {
109-
klog.Fatalf("Failed to create controller manager: %v", err)
110-
}
111-
112-
// Create the data store used to cache watched resources
113102
datastore := backend.NewK8sDataStore()
114103

115-
// Create the controllers and register them with the manager
116-
if err := (&backend.InferencePoolReconciler{
117-
Datastore: datastore,
118-
Scheme: mgr.GetScheme(),
119-
Client: mgr.GetClient(),
120-
PoolNamespacedName: types.NamespacedName{
121-
Name: *poolName,
122-
Namespace: *poolNamespace,
123-
},
124-
Record: mgr.GetEventRecorderFor("InferencePool"),
125-
}).SetupWithManager(mgr); err != nil {
126-
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
127-
}
128-
129-
if err := (&backend.InferenceModelReconciler{
130-
Datastore: datastore,
131-
Scheme: mgr.GetScheme(),
132-
Client: mgr.GetClient(),
133-
PoolNamespacedName: types.NamespacedName{
134-
Name: *poolName,
135-
Namespace: *poolNamespace,
136-
},
137-
Record: mgr.GetEventRecorderFor("InferenceModel"),
138-
}).SetupWithManager(mgr); err != nil {
139-
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
140-
}
141-
142-
if err := (&backend.EndpointSliceReconciler{
143-
Datastore: datastore,
144-
Scheme: mgr.GetScheme(),
145-
Client: mgr.GetClient(),
146-
Record: mgr.GetEventRecorderFor("endpointslice"),
147-
ServiceName: *serviceName,
148-
Zone: *zone,
149-
}).SetupWithManager(mgr); err != nil {
150-
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
104+
serverRunner := &runserver.ExtProcServerRunner{
105+
GrpcPort: *grpcPort,
106+
TargetPodHeader: *targetPodHeader,
107+
PoolName: *poolName,
108+
PoolNamespace: *poolNamespace,
109+
ServiceName: *serviceName,
110+
Zone: *zone,
111+
RefreshPodsInterval: *refreshPodsInterval,
112+
RefreshMetricsInterval: *refreshMetricsInterval,
113+
Scheme: scheme,
114+
Config: ctrl.GetConfigOrDie(),
115+
Datastore: datastore,
151116
}
117+
serverRunner.Setup()
152118

153119
// Start health and ext-proc servers in goroutines
154120
healthSvr := startHealthServer(datastore, *grpcHealthPort)
155-
extProcSvr := startExternalProcessorServer(
121+
extProcSvr := serverRunner.Start(
156122
datastore,
157-
*grpcPort,
158-
*refreshPodsInterval,
159-
*refreshMetricsInterval,
160-
*targetPodHeader,
123+
&vllm.PodMetricsClientImpl{},
161124
)
162125
// Start metrics handler
163126
metricsSvr := startMetricsHandler(*metricsPort, cfg)
164127

165-
// Start the controller manager. Blocking and will return when shutdown is complete.
166-
klog.Infof("Starting controller manager")
167-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
168-
klog.Fatalf("Error starting controller manager: %v", err)
169-
}
170-
klog.Info("Controller manager shutting down")
128+
// Start manager, blocking
129+
serverRunner.StartManager()
171130

172131
// Gracefully shutdown servers
173132
if healthSvr != nil {
@@ -209,43 +168,6 @@ func startHealthServer(ds *backend.K8sDatastore, port int) *grpc.Server {
209168
return svr
210169
}
211170

212-
// startExternalProcessorServer starts the Envoy external processor server in a goroutine.
213-
func startExternalProcessorServer(
214-
datastore *backend.K8sDatastore,
215-
port int,
216-
refreshPodsInterval, refreshMetricsInterval time.Duration,
217-
targetPodHeader string,
218-
) *grpc.Server {
219-
svr := grpc.NewServer()
220-
221-
go func() {
222-
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
223-
if err != nil {
224-
klog.Fatalf("Ext-proc server failed to listen: %v", err)
225-
}
226-
klog.Infof("Ext-proc server listening on port: %d", port)
227-
228-
// Initialize backend provider
229-
pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
230-
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
231-
klog.Fatalf("Failed to initialize backend provider: %v", err)
232-
}
233-
234-
// Register ext_proc handlers
235-
extProcPb.RegisterExternalProcessorServer(
236-
svr,
237-
handlers.NewServer(pp, scheduling.NewScheduler(pp), targetPodHeader, datastore),
238-
)
239-
240-
// Blocking and will return when shutdown is complete.
241-
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
242-
klog.Fatalf("Ext-proc server failed: %v", err)
243-
}
244-
klog.Info("Ext-proc server shutting down")
245-
}()
246-
return svr
247-
}
248-
249171
func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
250172
metrics.Register()
251173

pkg/ext-proc/server/runserver.go

+156
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
package server
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"time"
7+
8+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
9+
"google.golang.org/grpc"
10+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
11+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
12+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
13+
"k8s.io/apimachinery/pkg/runtime"
14+
"k8s.io/apimachinery/pkg/types"
15+
"k8s.io/client-go/rest"
16+
klog "k8s.io/klog/v2"
17+
ctrl "sigs.k8s.io/controller-runtime"
18+
)
19+
20+
// ExtProcServerRunner provides methods to manage an external process server.
21+
type ExtProcServerRunner struct {
22+
GrpcPort int
23+
TargetPodHeader string
24+
PoolName string
25+
PoolNamespace string
26+
ServiceName string
27+
Zone string
28+
RefreshPodsInterval time.Duration
29+
RefreshMetricsInterval time.Duration
30+
Scheme *runtime.Scheme
31+
Config *rest.Config
32+
Datastore *backend.K8sDatastore
33+
manager ctrl.Manager
34+
}
35+
36+
// Default values for CLI flags in main
37+
const (
38+
DefaultGrpcPort = 9002 // default for --grpcPort
39+
DefaultTargetPodHeader = "target-pod" // default for --targetPodHeader
40+
DefaultPoolName = "" // required but no default
41+
DefaultPoolNamespace = "default" // default for --poolNamespace
42+
DefaultServiceName = "" // required but no default
43+
DefaultZone = "" // default for --zone
44+
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
45+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
46+
)
47+
48+
func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
49+
return &ExtProcServerRunner{
50+
GrpcPort: DefaultGrpcPort,
51+
TargetPodHeader: DefaultTargetPodHeader,
52+
PoolName: DefaultPoolName,
53+
PoolNamespace: DefaultPoolNamespace,
54+
ServiceName: DefaultServiceName,
55+
Zone: DefaultZone,
56+
RefreshPodsInterval: DefaultRefreshPodsInterval,
57+
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
58+
// Scheme, Config, and Datastore can be assigned later.
59+
}
60+
}
61+
62+
// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
63+
func (r *ExtProcServerRunner) Setup() {
64+
// Create a new manager to manage controllers
65+
mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
66+
if err != nil {
67+
klog.Fatalf("Failed to create controller manager: %v", err)
68+
}
69+
r.manager = mgr
70+
71+
// Create the controllers and register them with the manager
72+
if err := (&backend.InferencePoolReconciler{
73+
Datastore: r.Datastore,
74+
Scheme: mgr.GetScheme(),
75+
Client: mgr.GetClient(),
76+
PoolNamespacedName: types.NamespacedName{
77+
Name: r.PoolName,
78+
Namespace: r.PoolNamespace,
79+
},
80+
Record: mgr.GetEventRecorderFor("InferencePool"),
81+
}).SetupWithManager(mgr); err != nil {
82+
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
83+
}
84+
85+
if err := (&backend.InferenceModelReconciler{
86+
Datastore: r.Datastore,
87+
Scheme: mgr.GetScheme(),
88+
Client: mgr.GetClient(),
89+
PoolNamespacedName: types.NamespacedName{
90+
Name: r.PoolName,
91+
Namespace: r.PoolNamespace,
92+
},
93+
Record: mgr.GetEventRecorderFor("InferenceModel"),
94+
}).SetupWithManager(mgr); err != nil {
95+
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
96+
}
97+
98+
if err := (&backend.EndpointSliceReconciler{
99+
Datastore: r.Datastore,
100+
Scheme: mgr.GetScheme(),
101+
Client: mgr.GetClient(),
102+
Record: mgr.GetEventRecorderFor("endpointslice"),
103+
ServiceName: r.ServiceName,
104+
Zone: r.Zone,
105+
}).SetupWithManager(mgr); err != nil {
106+
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
107+
}
108+
}
109+
110+
// Start starts the Envoy external processor server in a goroutine.
111+
func (r *ExtProcServerRunner) Start(
112+
podDatastore *backend.K8sDatastore,
113+
podMetricsClient backend.PodMetricsClient,
114+
) *grpc.Server {
115+
svr := grpc.NewServer()
116+
117+
go func() {
118+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
119+
if err != nil {
120+
klog.Fatalf("Ext-proc server failed to listen: %v", err)
121+
}
122+
klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
123+
124+
// Initialize backend provider
125+
pp := backend.NewProvider(podMetricsClient, podDatastore)
126+
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
127+
klog.Fatalf("Failed to initialize backend provider: %v", err)
128+
}
129+
130+
// Register ext_proc handlers
131+
extProcPb.RegisterExternalProcessorServer(
132+
svr,
133+
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
134+
)
135+
136+
// Blocking and will return when shutdown is complete.
137+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
138+
klog.Fatalf("Ext-proc server failed: %v", err)
139+
}
140+
klog.Info("Ext-proc server shutting down")
141+
}()
142+
return svr
143+
}
144+
145+
func (r *ExtProcServerRunner) StartManager() {
146+
if r.manager == nil {
147+
klog.Fatalf("Runner has no manager setup to run: %v", r)
148+
}
149+
// Start the controller manager. Blocking and will return when shutdown is complete.
150+
klog.Infof("Starting controller manager")
151+
mgr := r.manager
152+
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
153+
klog.Fatalf("Error starting controller manager: %v", err)
154+
}
155+
klog.Info("Controller manager shutting down")
156+
}

0 commit comments

Comments
 (0)