Skip to content

Commit beed7f1

Browse files
committed
Addressed comments
1 parent 3254cca commit beed7f1

File tree

5 files changed

+55
-53
lines changed

5 files changed

+55
-53
lines changed

pkg/ext-proc/backend/datastore.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,10 @@ func WithPodListerFactory(factory PodListerFactory) K8sDatastoreOption {
6161
type PodLister struct {
6262
Lister listersv1.PodLister
6363
sharedInformer informers.SharedInformerFactory
64-
ctx context.Context
6564
}
6665

67-
func (l *PodLister) list(selector labels.Selector) ([]*corev1.Pod, error) {
68-
return l.Lister.List(selector)
66+
func (l *PodLister) listEverything() ([]*corev1.Pod, error) {
67+
return l.Lister.List(labels.Everything())
6968

7069
}
7170

@@ -97,8 +96,9 @@ func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
9796
// Create a new informer with the new selector.
9897
ds.podLister = ds.podListerFactory(ds.inferencePool)
9998
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
100-
ds.podLister.sharedInformer.Start(ds.podLister.ctx.Done())
101-
ds.podLister.sharedInformer.WaitForCacheSync(ds.podLister.ctx.Done())
99+
ctx := context.Background()
100+
ds.podLister.sharedInformer.Start(ctx.Done())
101+
ds.podLister.sharedInformer.WaitForCacheSync(ctx.Done())
102102
}
103103
}
104104
}
@@ -123,7 +123,7 @@ func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister
123123
}
124124

125125
newPodInformer := func(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
126-
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, 0, nil, func(options *metav1.ListOptions) {
126+
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, resyncPeriod, cache.Indexers{}, func(options *metav1.ListOptions) {
127127
options.LabelSelector = labels.SelectorFromSet(selectorSet).String()
128128
})
129129
err := informer.SetTransform(func(obj interface{}) (interface{}, error) {
@@ -140,30 +140,30 @@ func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister
140140
}
141141
return informer
142142
}
143-
sharedInformer := informers.NewSharedInformerFactory(ds.client, 0)
143+
// 0 means we disable resyncing; it is not really useful to resync every hour (the controller-runtime default),
144+
// if things go wrong in the watch, no one will wait for an hour for things to get fixed.
145+
// As precedent, kube-scheduler also disables this since it is expensive to list all pods from the api-server regularly.
146+
resyncPeriod := time.Duration(0)
147+
sharedInformer := informers.NewSharedInformerFactory(ds.client, resyncPeriod)
144148
sharedInformer.InformerFor(&v1.Pod{}, newPodInformer)
145149

146150
return &PodLister{
147151
Lister: sharedInformer.Core().V1().Pods().Lister(),
148152
sharedInformer: sharedInformer,
149-
ctx: context.Background(),
150153
}
151154
}
152155

153-
func (ds *K8sDatastore) getPods() []*corev1.Pod {
156+
func (ds *K8sDatastore) getPods() ([]*corev1.Pod, error) {
154157
ds.poolMu.RLock()
155158
defer ds.poolMu.RUnlock()
156-
if ds.podLister == nil {
157-
klog.V(logutil.DEFAULT).Info("InferencePool not yet initialized")
158-
return []*corev1.Pod{}
159+
if !ds.HasSynced() {
160+
return nil, errors.New("InferencePool is not initialized in datastore")
159161
}
160-
161-
pods, err := ds.podLister.list(labels.Everything())
162+
pods, err := ds.podLister.listEverything()
162163
if err != nil {
163-
klog.Errorf("Failed to list pods for pool %s/%s: %v", ds.inferencePool.Namespace, ds.inferencePool.Name, err)
164-
return []*corev1.Pod{}
164+
return nil, err
165165
}
166-
return pods
166+
return pods, nil
167167
}
168168

169169
func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.InferenceModel) {

pkg/ext-proc/backend/provider.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
104104
// refreshPodsOnce lists pods and updates keys in the podMetrics map.
105105
// Note this function doesn't update the PodMetrics value, it's done separately.
106106
func (p *Provider) refreshPodsOnce() {
107-
pool, err := p.datastore.getInferencePool()
107+
pods, err := p.datastore.getPods()
108108
if err != nil {
109-
klog.V(logutil.DEFAULT).Infof("Pool not ready: %v", err)
109+
klog.V(logutil.DEFAULT).Infof("Couldn't list pods: %v", err)
110110
p.podMetrics.Clear()
111111
return
112112
}
113-
114-
pods := p.datastore.getPods()
113+
pool, _ := p.datastore.getInferencePool()
114+
// revision is used to track which entries we need to remove in the next iteration that removes
115+
// metrics for pods that don't exist anymore. Otherwise we have to build a map of the listed pods,
116+
// which is not efficient. Revision can be any random id as long as it is different from the last
117+
// refresh, so it should be very reliable (as reliable as the probability of randomly picking two
118+
// different numbers from range 0 - maxInt).
115119
revision := rand.Int()
116120
ready := 0
117121
for _, pod := range pods {

pkg/ext-proc/server/runserver.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type ExtProcServerRunner struct {
3434
// Default values for CLI flags in main
3535
const (
3636
DefaultGrpcPort = 9002 // default for --grpcPort
37-
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetPodHeader
37+
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
3838
DefaultPoolName = "" // required but no default
3939
DefaultPoolNamespace = "default" // default for --poolNamespace
4040
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
@@ -130,8 +130,7 @@ func (r *ExtProcServerRunner) StartManager() {
130130
}
131131
// Start the controller manager. Blocking and will return when shutdown is complete.
132132
klog.Infof("Starting controller manager")
133-
mgr := r.Manager
134-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
133+
if err := r.Manager.Start(ctrl.SetupSignalHandler()); err != nil {
135134
klog.Fatalf("Error starting controller manager: %v", err)
136135
}
137136
klog.Info("Controller manager shutting down")

pkg/ext-proc/test/utils.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur
2424
pms[pod.Pod.Name] = pod
2525
}
2626
pmc := &backend.FakePodMetricsClient{Res: pms}
27-
pp := backend.NewProvider(pmc, backend.NewK8sDataStore()) // backend.WithPods(pods)
27+
pp := backend.NewProvider(pmc, backend.NewK8sDataStore())
2828
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
2929
klog.Fatalf("failed to initialize: %v", err)
3030
}

test/integration/hermetic_test.go

+27-28
Original file line numberDiff line numberDiff line change
@@ -206,44 +206,38 @@ func TestKubeInferenceModelRequest(t *testing.T) {
206206
},
207207
}
208208

209-
pods := []*backend.PodMetrics{
209+
metrics := []*backend.Metrics{
210210
{
211-
Metrics: backend.Metrics{
212-
WaitingQueueSize: 0,
213-
KVCacheUsagePercent: 0.2,
214-
ActiveModels: map[string]int{
215-
"foo": 1,
216-
"bar": 1,
217-
},
211+
WaitingQueueSize: 0,
212+
KVCacheUsagePercent: 0.2,
213+
ActiveModels: map[string]int{
214+
"foo": 1,
215+
"bar": 1,
218216
},
219217
},
220218
{
221-
Metrics: backend.Metrics{
222-
WaitingQueueSize: 0,
223-
KVCacheUsagePercent: 0.1,
224-
ActiveModels: map[string]int{
225-
"foo": 1,
226-
"sql-lora-1fdg2": 1,
227-
},
219+
WaitingQueueSize: 0,
220+
KVCacheUsagePercent: 0.1,
221+
ActiveModels: map[string]int{
222+
"foo": 1,
223+
"sql-lora-1fdg2": 1,
228224
},
229225
},
230226
{
231-
Metrics: backend.Metrics{
232-
WaitingQueueSize: 10,
233-
KVCacheUsagePercent: 0.2,
234-
ActiveModels: map[string]int{
235-
"foo": 1,
236-
},
227+
WaitingQueueSize: 10,
228+
KVCacheUsagePercent: 0.2,
229+
ActiveModels: map[string]int{
230+
"foo": 1,
237231
},
238232
},
239233
}
240234

241235
// Set up global k8sclient and extproc server runner with test environment config
242-
BeforeSuit(pods)
236+
podMetrics := BeforeSuit(metrics)
243237

244238
for _, test := range tests {
245239
t.Run(test.name, func(t *testing.T) {
246-
client, cleanup := setUpHermeticServer(t, pods)
240+
client, cleanup := setUpHermeticServer(t, podMetrics)
247241
t.Cleanup(cleanup)
248242
want := &extProcPb.ProcessingResponse{
249243
Response: &extProcPb.ProcessingResponse_RequestBody{
@@ -374,7 +368,7 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
374368
}
375369

376370
// Sets up a test environment and returns the runner struct
377-
func BeforeSuit(metrics []*backend.PodMetrics) {
371+
func BeforeSuit(metrics []*backend.Metrics) []*backend.PodMetrics {
378372
// Set up mock k8s API Client
379373
testEnv = &envtest.Environment{
380374
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
@@ -396,17 +390,21 @@ func BeforeSuit(metrics []*backend.PodMetrics) {
396390
log.Fatalf("No error, but returned kubernetes client is nil, cfg: %v", cfg)
397391
}
398392

393+
var podMetrics []*backend.PodMetrics
399394
fakeLister := &testingutil.FakePodLister{
400395
PodsList: []*corev1.Pod{},
401396
}
402397
for i, m := range metrics {
403398
podName := "pod-" + strconv.Itoa(i)
404399
pod := testingutil.MakePod(podName).SetReady().SetPodIP(podName).Obj()
405400
fakeLister.PodsList = append(fakeLister.PodsList, pod)
406-
m.Pod = backend.Pod{
407-
Name: pod.Name,
408-
Address: pod.Status.PodIP + ":8000",
409-
}
401+
podMetrics = append(podMetrics, &backend.PodMetrics{
402+
Pod: backend.Pod{
403+
Name: pod.Name,
404+
Address: pod.Status.PodIP + ":8000",
405+
},
406+
Metrics: *m,
407+
})
410408
}
411409

412410
serverRunner = runserver.NewDefaultExtProcServerRunner()
@@ -431,6 +429,7 @@ func BeforeSuit(metrics []*backend.PodMetrics) {
431429

432430
// Wait the reconcilers to populate the datastore.
433431
time.Sleep(5 * time.Second)
432+
return podMetrics
434433
}
435434

436435
func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {

0 commit comments

Comments
 (0)