Skip to content

Commit 42b4cbb

Browse files
Add updated hermetic test with k8s client API, these pull from example object yamls.
1 parent e93541d commit 42b4cbb

File tree

4 files changed

+496
-49
lines changed

4 files changed

+496
-49
lines changed

Diff for: pkg/ext-proc/main.go

+59-45
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/metrics"
2121
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
2222
"k8s.io/apimachinery/pkg/runtime"
23-
"k8s.io/apimachinery/pkg/types"
2423
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
2524
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2625
"k8s.io/client-go/rest"
@@ -103,56 +102,26 @@ func main() {
103102
})
104103
klog.Info(flags)
105104

106-
// Create a new manager to manage controllers
107-
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{Scheme: scheme})
108-
if err != nil {
109-
klog.Fatalf("Failed to create controller manager: %v", err)
110-
}
111-
112-
// Create the data store used to cache watched resources
113105
datastore := backend.NewK8sDataStore()
114106

115-
// Create the controllers and register them with the manager
116-
if err := (&backend.InferencePoolReconciler{
117-
Datastore: datastore,
118-
Scheme: mgr.GetScheme(),
119-
Client: mgr.GetClient(),
120-
PoolNamespacedName: types.NamespacedName{
121-
Name: *poolName,
122-
Namespace: *poolNamespace,
123-
},
124-
Record: mgr.GetEventRecorderFor("InferencePool"),
125-
}).SetupWithManager(mgr); err != nil {
126-
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
127-
}
128-
129-
if err := (&backend.InferenceModelReconciler{
130-
Datastore: datastore,
131-
Scheme: mgr.GetScheme(),
132-
Client: mgr.GetClient(),
133-
PoolNamespacedName: types.NamespacedName{
134-
Name: *poolName,
135-
Namespace: *poolNamespace,
136-
},
137-
Record: mgr.GetEventRecorderFor("InferenceModel"),
138-
}).SetupWithManager(mgr); err != nil {
139-
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
140-
}
141-
142-
if err := (&backend.EndpointSliceReconciler{
143-
Datastore: datastore,
144-
Scheme: mgr.GetScheme(),
145-
Client: mgr.GetClient(),
146-
Record: mgr.GetEventRecorderFor("endpointslice"),
147-
ServiceName: *serviceName,
148-
Zone: *zone,
149-
}).SetupWithManager(mgr); err != nil {
150-
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
107+
runner := &runserver.ExtProcServerRunner{
108+
GrpcPort: *grpcPort,
109+
TargetPodHeader: *targetPodHeader,
110+
PoolName: *poolName,
111+
PoolNamespace: *poolNamespace,
112+
ServiceName: *serviceName,
113+
Zone: *zone,
114+
RefreshPodsInterval: *refreshPodsInterval,
115+
RefreshMetricsInterval: *refreshMetricsInterval,
116+
Scheme: scheme,
117+
Config: ctrl.GetConfigOrDie(),
118+
Datastore: datastore,
151119
}
120+
runner.Setup()
152121

153122
// Start health and ext-proc servers in goroutines
154123
healthSvr := startHealthServer(datastore, *grpcHealthPort)
155-
extProcSvr := startExternalProcessorServer(
124+
extProcSvr := runner.Start(
156125
datastore,
157126
*grpcPort,
158127
*refreshPodsInterval,
@@ -289,6 +258,51 @@ func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Han
289258
return metricsAuthHandler
290259
}
291260

261+
func startMetricsHandler(port int, cfg *rest.Config) *http.Server {
262+
metrics.Register()
263+
264+
var svr *http.Server
265+
go func() {
266+
klog.Info("Starting metrics HTTP handler ...")
267+
268+
mux := http.NewServeMux()
269+
mux.Handle(defaultMetricsEndpoint, metricsHandlerWithAuthenticationAndAuthorization(cfg))
270+
271+
svr = &http.Server{
272+
Addr: net.JoinHostPort("", strconv.Itoa(port)),
273+
Handler: mux,
274+
}
275+
if err := svr.ListenAndServe(); err != http.ErrServerClosed {
276+
klog.Fatalf("failed to start metrics HTTP handler: %v", err)
277+
}
278+
}()
279+
return svr
280+
}
281+
282+
func metricsHandlerWithAuthenticationAndAuthorization(cfg *rest.Config) http.Handler {
283+
h := promhttp.HandlerFor(
284+
legacyregistry.DefaultGatherer,
285+
promhttp.HandlerOpts{},
286+
)
287+
httpClient, err := rest.HTTPClientFor(cfg)
288+
if err != nil {
289+
klog.Fatalf("failed to create http client for metrics auth: %v", err)
290+
}
291+
292+
filter, err := filters.WithAuthenticationAndAuthorization(cfg, httpClient)
293+
if err != nil {
294+
klog.Fatalf("failed to create metrics filter for auth: %v", err)
295+
}
296+
metricsLogger := klog.LoggerWithValues(klog.NewKlogr(), "path", defaultMetricsEndpoint)
297+
metricsAuthHandler, err := filter(metricsLogger, h)
298+
if err != nil {
299+
klog.Fatalf("failed to create metrics auth handler: %v", err)
300+
}
301+
return metricsAuthHandler
302+
}
303+
304+
=======
305+
>>>>>>> ad32d85 (Add updated hermetic test with k8s client API, these pull from example object yamls.)
292306
func validateFlags() error {
293307
if *poolName == "" {
294308
return fmt.Errorf("required %q flag not set", "poolName")

Diff for: pkg/ext-proc/server/runserver.go

+125
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package server
2+
3+
import (
4+
"fmt"
5+
"net"
6+
"time"
7+
8+
extProcPb "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
9+
"google.golang.org/grpc"
10+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
11+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/handlers"
12+
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/scheduling"
13+
"k8s.io/apimachinery/pkg/runtime"
14+
"k8s.io/apimachinery/pkg/types"
15+
"k8s.io/client-go/rest"
16+
klog "k8s.io/klog/v2"
17+
ctrl "sigs.k8s.io/controller-runtime"
18+
)
19+
20+
// ExtProcServerRunner provides methods to manage an external process server.
21+
type ExtProcServerRunner struct {
22+
GrpcPort int
23+
TargetPodHeader string
24+
PoolName string
25+
PoolNamespace string
26+
ServiceName string
27+
Zone string
28+
RefreshPodsInterval time.Duration
29+
RefreshMetricsInterval time.Duration
30+
Scheme *runtime.Scheme
31+
Config *rest.Config
32+
Datastore *backend.K8sDatastore
33+
}
34+
35+
// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
36+
func (r *ExtProcServerRunner) Setup() {
37+
// Create a new manager to manage controllers
38+
mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
39+
if err != nil {
40+
klog.Fatalf("Failed to create controller manager: %v", err)
41+
}
42+
43+
// Create the controllers and register them with the manager
44+
if err := (&backend.InferencePoolReconciler{
45+
Datastore: r.Datastore,
46+
Scheme: mgr.GetScheme(),
47+
Client: mgr.GetClient(),
48+
PoolNamespacedName: types.NamespacedName{
49+
Name: r.PoolName,
50+
Namespace: r.PoolNamespace,
51+
},
52+
Record: mgr.GetEventRecorderFor("InferencePool"),
53+
}).SetupWithManager(mgr); err != nil {
54+
klog.Fatalf("Failed setting up InferencePoolReconciler: %v", err)
55+
}
56+
57+
if err := (&backend.InferenceModelReconciler{
58+
Datastore: r.Datastore,
59+
Scheme: mgr.GetScheme(),
60+
Client: mgr.GetClient(),
61+
PoolNamespacedName: types.NamespacedName{
62+
Name: r.PoolName,
63+
Namespace: r.PoolNamespace,
64+
},
65+
Record: mgr.GetEventRecorderFor("InferenceModel"),
66+
}).SetupWithManager(mgr); err != nil {
67+
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
68+
}
69+
70+
if err := (&backend.EndpointSliceReconciler{
71+
Datastore: r.Datastore,
72+
Scheme: mgr.GetScheme(),
73+
Client: mgr.GetClient(),
74+
Record: mgr.GetEventRecorderFor("endpointslice"),
75+
ServiceName: r.ServiceName,
76+
Zone: r.Zone,
77+
}).SetupWithManager(mgr); err != nil {
78+
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
79+
}
80+
81+
// Start the controller manager. Blocking and will return when shutdown is complete.
82+
errChan := make(chan error)
83+
klog.Infof("Starting controller manager")
84+
go func() {
85+
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
86+
klog.Error(err, "Error running manager")
87+
errChan <- err
88+
}
89+
}()
90+
}
91+
92+
// Start starts the Envoy external processor server in a goroutine.
93+
func (r *ExtProcServerRunner) Start(
94+
podDatastore *backend.K8sDatastore,
95+
podMetricsClient backend.PodMetricsClient,
96+
) *grpc.Server {
97+
svr := grpc.NewServer()
98+
99+
go func() {
100+
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", r.GrpcPort))
101+
if err != nil {
102+
klog.Fatalf("Ext-proc server failed to listen: %v", err)
103+
}
104+
klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
105+
106+
// Initialize backend provider
107+
pp := backend.NewProvider(podMetricsClient, podDatastore)
108+
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
109+
klog.Fatalf("Failed to initialize backend provider: %v", err)
110+
}
111+
112+
// Register ext_proc handlers
113+
extProcPb.RegisterExternalProcessorServer(
114+
svr,
115+
handlers.NewServer(pp, scheduling.NewScheduler(pp), r.TargetPodHeader, r.Datastore),
116+
)
117+
118+
// Blocking and will return when shutdown is complete.
119+
if err := svr.Serve(lis); err != nil && err != grpc.ErrServerStopped {
120+
klog.Fatalf("Ext-proc server failed: %v", err)
121+
}
122+
klog.Info("Ext-proc server shutting down")
123+
}()
124+
return svr
125+
}

0 commit comments

Comments
 (0)