Skip to content

Commit ec26973

Browse files
committed
Addressed comments
1 parent 3254cca commit ec26973

File tree

4 files changed

+28
-25
lines changed

4 files changed

+28
-25
lines changed

pkg/ext-proc/backend/datastore.go

+17-17
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,10 @@ func WithPodListerFactory(factory PodListerFactory) K8sDatastoreOption {
6161
type PodLister struct {
6262
Lister listersv1.PodLister
6363
sharedInformer informers.SharedInformerFactory
64-
ctx context.Context
6564
}
6665

67-
func (l *PodLister) list(selector labels.Selector) ([]*corev1.Pod, error) {
68-
return l.Lister.List(selector)
66+
func (l *PodLister) listEverything() ([]*corev1.Pod, error) {
67+
return l.Lister.List(labels.Everything())
6968

7069
}
7170

@@ -97,8 +96,9 @@ func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
9796
// Create a new informer with the new selector.
9897
ds.podLister = ds.podListerFactory(ds.inferencePool)
9998
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
100-
ds.podLister.sharedInformer.Start(ds.podLister.ctx.Done())
101-
ds.podLister.sharedInformer.WaitForCacheSync(ds.podLister.ctx.Done())
99+
ctx := context.Background()
100+
ds.podLister.sharedInformer.Start(ctx.Done())
101+
ds.podLister.sharedInformer.WaitForCacheSync(ctx.Done())
102102
}
103103
}
104104
}
@@ -123,7 +123,7 @@ func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister
123123
}
124124

125125
newPodInformer := func(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
126-
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, 0, nil, func(options *metav1.ListOptions) {
126+
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, resyncPeriod, cache.Indexers{}, func(options *metav1.ListOptions) {
127127
options.LabelSelector = labels.SelectorFromSet(selectorSet).String()
128128
})
129129
err := informer.SetTransform(func(obj interface{}) (interface{}, error) {
@@ -140,30 +140,30 @@ func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister
140140
}
141141
return informer
142142
}
143-
sharedInformer := informers.NewSharedInformerFactory(ds.client, 0)
143+
// 0 means we disable resyncing. It is not really useful to resync every hour (the controller-runtime default);
144+
// if things go wrong in the watch, no one will wait for an hour for things to get fixed.
145+
// As precedent, kube-scheduler also disables this since it is expensive to list all pods from the api-server regularly.
146+
resyncPeriod := time.Duration(0)
147+
sharedInformer := informers.NewSharedInformerFactory(ds.client, resyncPeriod)
144148
sharedInformer.InformerFor(&v1.Pod{}, newPodInformer)
145149

146150
return &PodLister{
147151
Lister: sharedInformer.Core().V1().Pods().Lister(),
148152
sharedInformer: sharedInformer,
149-
ctx: context.Background(),
150153
}
151154
}
152155

153-
func (ds *K8sDatastore) getPods() []*corev1.Pod {
156+
func (ds *K8sDatastore) getPods() ([]*corev1.Pod, error) {
154157
ds.poolMu.RLock()
155158
defer ds.poolMu.RUnlock()
156-
if ds.podLister == nil {
157-
klog.V(logutil.DEFAULT).Info("InferencePool not yet initialized")
158-
return []*corev1.Pod{}
159+
if !ds.HasSynced() {
160+
return nil, errors.New("InferencePool is not initialized in datastore")
159161
}
160-
161-
pods, err := ds.podLister.list(labels.Everything())
162+
pods, err := ds.podLister.listEverything()
162163
if err != nil {
163-
klog.Errorf("Failed to list pods for pool %s/%s: %v", ds.inferencePool.Namespace, ds.inferencePool.Name, err)
164-
return []*corev1.Pod{}
164+
return nil, err
165165
}
166-
return pods
166+
return pods, nil
167167
}
168168

169169
func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.InferenceModel) {

pkg/ext-proc/backend/provider.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -104,14 +104,18 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
104104
// refreshPodsOnce lists pods and updates keys in the podMetrics map.
105105
// Note this function doesn't update the PodMetrics value, it's done separately.
106106
func (p *Provider) refreshPodsOnce() {
107-
pool, err := p.datastore.getInferencePool()
107+
pods, err := p.datastore.getPods()
108108
if err != nil {
109-
klog.V(logutil.DEFAULT).Infof("Pool not ready: %v", err)
109+
klog.V(logutil.DEFAULT).Infof("Couldn't list pods: %v", err)
110110
p.podMetrics.Clear()
111111
return
112112
}
113-
114-
pods := p.datastore.getPods()
113+
pool, _ := p.datastore.getInferencePool()
114+
// revision is used to track which entries we need to remove in the next iteration that removes
115+
// metrics for pods that don't exist anymore. Otherwise we have to build a map of the listed pods,
116+
// which is not efficient. Revision can be any random id as long as it is different from the last
117+
// refresh, so it should be very reliable (as reliable as the probability of randomly picking two
118+
// different numbers from range 0 - maxInt).
115119
revision := rand.Int()
116120
ready := 0
117121
for _, pod := range pods {

pkg/ext-proc/server/runserver.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type ExtProcServerRunner struct {
3434
// Default values for CLI flags in main
3535
const (
3636
DefaultGrpcPort = 9002 // default for --grpcPort
37-
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetPodHeader
37+
DefaultTargetEndpointKey = "x-gateway-destination-endpoint" // default for --targetEndpointKey
3838
DefaultPoolName = "" // required but no default
3939
DefaultPoolNamespace = "default" // default for --poolNamespace
4040
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
@@ -130,8 +130,7 @@ func (r *ExtProcServerRunner) StartManager() {
130130
}
131131
// Start the controller manager. Blocking and will return when shutdown is complete.
132132
klog.Infof("Starting controller manager")
133-
mgr := r.Manager
134-
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
133+
if err := r.Manager.Start(ctrl.SetupSignalHandler()); err != nil {
135134
klog.Fatalf("Error starting controller manager: %v", err)
136135
}
137136
klog.Info("Controller manager shutting down")

pkg/ext-proc/test/utils.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Dur
2424
pms[pod.Pod.Name] = pod
2525
}
2626
pmc := &backend.FakePodMetricsClient{Res: pms}
27-
pp := backend.NewProvider(pmc, backend.NewK8sDataStore()) // backend.WithPods(pods)
27+
pp := backend.NewProvider(pmc, backend.NewK8sDataStore())
2828
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
2929
klog.Fatalf("failed to initialize: %v", err)
3030
}

0 commit comments

Comments
 (0)