Skip to content

Commit f29c899

Browse files
Set up constants for main flags in extproc server package, improve formatting and helper functions for runserver.
1 parent 5d96d8e commit f29c899

File tree

4 files changed

+56
-41
lines changed

4 files changed

+56
-41
lines changed

Diff for: pkg/ext-proc/main.go

+12-13
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"net"
88
"net/http"
99
"strconv"
10-
"time"
1110

1211
"github.com/prometheus/client_golang/prometheus/promhttp"
1312
"google.golang.org/grpc"
@@ -34,7 +33,7 @@ const (
3433
var (
3534
grpcPort = flag.Int(
3635
"grpcPort",
37-
9002,
36+
runserver.DefaultGrpcPort,
3837
"The gRPC port used for communicating with Envoy proxy")
3938
grpcHealthPort = flag.Int(
4039
"grpcHealthPort",
@@ -44,31 +43,31 @@ var (
4443
"metricsPort", 9090, "The metrics port")
4544
targetPodHeader = flag.String(
4645
"targetPodHeader",
47-
"target-pod",
46+
runserver.DefaultTargetPodHeader,
4847
"Header key used by Envoy to route to the appropriate pod. This must match Envoy configuration.")
4948
poolName = flag.String(
5049
"poolName",
51-
"",
50+
runserver.DefaultPoolName,
5251
"Name of the InferencePool this Endpoint Picker is associated with.")
5352
poolNamespace = flag.String(
5453
"poolNamespace",
55-
"default",
54+
runserver.DefaultPoolNamespace,
5655
"Namespace of the InferencePool this Endpoint Picker is associated with.")
5756
serviceName = flag.String(
5857
"serviceName",
59-
"",
58+
runserver.DefaultServiceName,
6059
"Name of the Service that will be used to read EndpointSlices from")
6160
zone = flag.String(
6261
"zone",
63-
"",
62+
runserver.DefaultZone,
6463
"The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ")
6564
refreshPodsInterval = flag.Duration(
6665
"refreshPodsInterval",
67-
10*time.Second,
66+
runserver.DefaultRefreshPodsInterval,
6867
"interval to refresh pods")
6968
refreshMetricsInterval = flag.Duration(
7069
"refreshMetricsInterval",
71-
50*time.Millisecond,
70+
runserver.DefaultRefreshMetricsInterval,
7271
"interval to refresh metrics")
7372

7473
scheme = runtime.NewScheme()
@@ -102,7 +101,7 @@ func main() {
102101

103102
datastore := backend.NewK8sDataStore()
104103

105-
runner := &runserver.ExtProcServerRunner{
104+
serverRunner := &runserver.ExtProcServerRunner{
106105
GrpcPort: *grpcPort,
107106
TargetPodHeader: *targetPodHeader,
108107
PoolName: *poolName,
@@ -115,19 +114,19 @@ func main() {
115114
Config: ctrl.GetConfigOrDie(),
116115
Datastore: datastore,
117116
}
118-
runner.Setup()
117+
serverRunner.Setup()
119118

120119
// Start health and ext-proc servers in goroutines
121120
healthSvr := startHealthServer(datastore, *grpcHealthPort)
122-
extProcSvr := runner.Start(
121+
extProcSvr := serverRunner.Start(
123122
datastore,
124123
&vllm.PodMetricsClientImpl{},
125124
)
126125
// Start metrics handler
127126
metricsSvr := startMetricsHandler(*metricsPort, cfg)
128127

129128
// Start manager, blocking
130-
runner.StartManager()
129+
serverRunner.StartManager()
131130

132131
// Gracefully shutdown servers
133132
if healthSvr != nil {

Diff for: pkg/ext-proc/server/runserver.go

+29-3
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,33 @@ type ExtProcServerRunner struct {
3030
Scheme *runtime.Scheme
3131
Config *rest.Config
3232
Datastore *backend.K8sDatastore
33-
manager *ctrl.Manager
33+
manager ctrl.Manager
34+
}
35+
36+
// Default values for CLI flags in main
37+
const (
38+
DefaultGrpcPort = 9002 // default for --grpcPort
39+
DefaultTargetPodHeader = "target-pod" // default for --targetPodHeader
40+
DefaultPoolName = "" // required but no default
41+
DefaultPoolNamespace = "default" // default for --poolNamespace
42+
DefaultServiceName = "" // required but no default
43+
DefaultZone = "" // default for --zone
44+
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
45+
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
46+
)
47+
48+
func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
49+
return &ExtProcServerRunner{
50+
GrpcPort: DefaultGrpcPort,
51+
TargetPodHeader: DefaultTargetPodHeader,
52+
PoolName: DefaultPoolName,
53+
PoolNamespace: DefaultPoolNamespace,
54+
ServiceName: DefaultServiceName,
55+
Zone: DefaultZone,
56+
RefreshPodsInterval: DefaultRefreshPodsInterval,
57+
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
58+
// Scheme, Config, and Datastore can be assigned later.
59+
}
3460
}
3561

3662
// Setup creates the controller manager and registers the reconcilers for pools, models, and endpointSlices with it; call StartManager to start the manager.
@@ -40,7 +66,7 @@ func (r *ExtProcServerRunner) Setup() {
4066
if err != nil {
4167
klog.Fatalf("Failed to create controller manager: %v", err)
4268
}
43-
r.manager = &mgr
69+
r.manager = mgr
4470

4571
// Create the controllers and register them with the manager
4672
if err := (&backend.InferencePoolReconciler{
@@ -122,7 +148,7 @@ func (r *ExtProcServerRunner) StartManager() {
122148
}
123149
// Start the controller manager. Blocking and will return when shutdown is complete.
124150
klog.Infof("Starting controller manager")
125-
mgr := *r.manager
151+
mgr := r.manager
126152
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
127153
klog.Fatalf("Error starting controller manager: %v", err)
128154
}

Diff for: test/integration/hermetic_test.go

+15-25
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ import (
3636
)
3737

3838
const (
39-
port = 9002
39+
port = runserver.DefaultGrpcPort
4040
)
4141

4242
var (
43-
runner *runserver.ExtProcServerRunner
44-
k8sClient k8sclient.Client
45-
testEnv *envtest.Environment
46-
scheme = runtime.NewScheme()
43+
serverRunner *runserver.ExtProcServerRunner
44+
k8sClient k8sclient.Client
45+
testEnv *envtest.Environment
46+
scheme = runtime.NewScheme()
4747
)
4848

4949
func SKIPTestHandleRequestBody(t *testing.T) {
@@ -128,9 +128,6 @@ func SKIPTestHandleRequestBody(t *testing.T) {
128128

129129
for _, test := range tests {
130130
t.Run(test.name, func(t *testing.T) {
131-
132-
// log.Fatalf("inference model: %v", *test.models["my-model"]) // TEMP
133-
134131
client, cleanup := setUpServer(t, test.pods, test.models)
135132
t.Cleanup(cleanup)
136133
want := &extProcPb.ProcessingResponse{
@@ -297,7 +294,7 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
297294
_ = flag.Lookup("v").Value.Set("3")
298295

299296
// Unmarshal CRDs from file into structs
300-
manifestsPath := filepath.Join(".", "artifacts", "inferencepool-with-model-hermetic.yaml")
297+
manifestsPath := filepath.Join("..", "testdata", "inferencepool-with-model-hermetic.yaml")
301298
docs, err := readDocuments(manifestsPath)
302299
if err != nil {
303300
log.Fatalf("Can't read object manifests at path %v, %v", manifestsPath, err)
@@ -330,7 +327,7 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
330327
}
331328
pmc := &backend.FakePodMetricsClient{Res: pms}
332329

333-
server := runner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc)
330+
server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc)
334331
if err != nil {
335332
log.Fatalf("Ext-proc failed with the err: %v", err)
336333
}
@@ -380,25 +377,18 @@ func BeforeSuit() {
380377
log.Fatalf("No error, but returned kubernetes client is nil, cfg: %v", cfg)
381378
}
382379

383-
runner = &runserver.ExtProcServerRunner{
384-
GrpcPort: port,
385-
TargetPodHeader: "target-pod",
386-
PoolName: "vllm-llama2-7b-pool",
387-
PoolNamespace: "default",
388-
ServiceName: "",
389-
Zone: "",
390-
RefreshPodsInterval: 10 * time.Second,
391-
RefreshMetricsInterval: 50 * time.Millisecond,
392-
Scheme: scheme,
393-
Config: cfg,
394-
Datastore: backend.NewK8sDataStore(),
395-
}
380+
serverRunner = runserver.NewDefaultExtProcServerRunner()
381+
// Adjust from defaults
382+
serverRunner.PoolName = "vllm-llama2-7b-pool"
383+
serverRunner.Scheme = scheme
384+
serverRunner.Config = cfg
385+
serverRunner.Datastore = backend.NewK8sDataStore()
396386

397-
runner.Setup()
387+
serverRunner.Setup()
398388

399389
// Start the controller manager in a goroutine, not blocking
400390
go func() {
401-
runner.StartManager()
391+
serverRunner.StartManager()
402392
}()
403393
}
404394

0 commit comments

Comments
 (0)