Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace EndpointSlice reconciler with pod list backed by informer #271

Merged
merged 2 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ linters:
- dupword
- durationcheck
- fatcontext
- gci
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not enforcing a specific import format, and so was causing issues.

- ginkgolinter
- gocritic
- govet
Expand Down
132 changes: 115 additions & 17 deletions pkg/ext-proc/backend/datastore.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,36 @@
package backend

import (
"context"
"errors"
"math/rand"
"sync"
"time"

"github.com/google/go-cmp/cmp"
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
store := &K8sDatastore{
poolMu: sync.RWMutex{},
InferenceModels: &sync.Map{},
pods: &sync.Map{},
}

store.podListerFactory = store.createPodLister
for _, opt := range options {
opt(store)
}
Expand All @@ -25,29 +39,68 @@ func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {

// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
type K8sDatastore struct {
client kubernetes.Interface
// poolMu is used to synchronize access to the inferencePool.
poolMu sync.RWMutex
inferencePool *v1alpha1.InferencePool
InferenceModels *sync.Map
pods *sync.Map
poolMu sync.RWMutex
inferencePool *v1alpha1.InferencePool
podListerFactory PodListerFactory
podLister *PodLister
InferenceModels *sync.Map
}

type K8sDatastoreOption func(*K8sDatastore)
type PodListerFactory func(*v1alpha1.InferencePool) *PodLister

// WithPods can be used in tests to override the pods.
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
func WithPodListerFactory(factory PodListerFactory) K8sDatastoreOption {
return func(store *K8sDatastore) {
store.pods = &sync.Map{}
for _, pod := range pods {
store.pods.Store(pod.Pod, true)
}
store.podListerFactory = factory
}
}

type PodLister struct {
Lister listersv1.PodLister
sharedInformer informers.SharedInformerFactory
ctx context.Context
}

func (l *PodLister) list(selector labels.Selector) ([]*corev1.Pod, error) {
return l.Lister.List(selector)

}

func (ds *K8sDatastore) SetClient(client kubernetes.Interface) {
ds.client = client
}

func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
ds.poolMu.Lock()
defer ds.poolMu.Unlock()

if ds.inferencePool != nil && cmp.Equal(ds.inferencePool.Spec.Selector, pool.Spec.Selector) {
// Pool updated, but the selector stayed the same, so no need to change the informer.
ds.inferencePool = pool
return
}

// New pool or selector updated.
ds.inferencePool = pool

if ds.podLister != nil && ds.podLister.sharedInformer != nil {
// Shutdown the old informer async since this takes a few seconds.
go func() {
ds.podLister.sharedInformer.Shutdown()
}()
}

if ds.podListerFactory != nil {
// Create a new informer with the new selector.
ds.podLister = ds.podListerFactory(ds.inferencePool)
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
ds.podLister.sharedInformer.Start(ds.podLister.ctx.Done())
ds.podLister.sharedInformer.WaitForCacheSync(ds.podLister.ctx.Done())
}
}
}

func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
Expand All @@ -59,13 +112,58 @@ func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
return ds.inferencePool, nil
}

func (ds *K8sDatastore) GetPodIPs() []string {
var ips []string
ds.pods.Range(func(name, pod any) bool {
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
return true
})
return ips
func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister {
if ds.client == nil {
return nil
}
klog.V(logutil.DEFAULT).Infof("Creating informer for pool %v", pool.Name)
selectorSet := make(map[string]string)
for k, v := range pool.Spec.Selector {
selectorSet[string(k)] = string(v)
}

newPodInformer := func(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, 0, nil, func(options *metav1.ListOptions) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Can you leave a comment on what the 0 and nil parameters mean?

options.LabelSelector = labels.SelectorFromSet(selectorSet).String()
})
err := informer.SetTransform(func(obj interface{}) (interface{}, error) {
// Remove unnecessary fields to improve memory footprint.
if accessor, err := meta.Accessor(obj); err == nil {
if accessor.GetManagedFields() != nil {
accessor.SetManagedFields(nil)
}
}
return obj, nil
})
if err != nil {
klog.Errorf("Failed to set pod transformer: %v", err)
}
return informer
}
sharedInformer := informers.NewSharedInformerFactory(ds.client, 0)
sharedInformer.InformerFor(&v1.Pod{}, newPodInformer)

return &PodLister{
Lister: sharedInformer.Core().V1().Pods().Lister(),
sharedInformer: sharedInformer,
ctx: context.Background(),
}
}

func (ds *K8sDatastore) getPods() []*corev1.Pod {
ds.poolMu.RLock()
defer ds.poolMu.RUnlock()
if ds.podLister == nil {
klog.V(logutil.DEFAULT).Info("InferencePool not yet initialized")
return []*corev1.Pod{}
}

pods, err := ds.podLister.list(labels.Everything())
if err != nil {
klog.Errorf("Failed to list pods for pool %s/%s: %v", ds.inferencePool.Namespace, ds.inferencePool.Name, err)
return []*corev1.Pod{}
}
return pods
}

func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.InferenceModel) {
Expand Down
109 changes: 0 additions & 109 deletions pkg/ext-proc/backend/endpointslice_reconciler.go

This file was deleted.

Loading