Revert "Replace EndpointSlice reconciler with pod list backed by informers" #301

Merged · merged 1 commit on Feb 6, 2025
1 change: 1 addition & 0 deletions .golangci.yml
@@ -14,6 +14,7 @@ linters:
  - dupword
  - durationcheck
  - fatcontext
+ - gci
  - ginkgolinter
  - gocritic
  - govet
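The .golangci.yml change re-enables the gci linter, which enforces deterministic import grouping and ordering. A minimal sketch of the layout gci accepts under its default sections (standard library first, then third-party modules as a separate block); the package and describePod helper below are illustrative, not taken from this PR:

package backend

import (
    // Standard-library imports form the first gci section.
    "fmt"
    "os"

    // Third-party imports follow as their own section.
    corev1 "k8s.io/api/core/v1"
)

// describePod exists only so the imports above are load-bearing
// and the snippet compiles.
func describePod(p *corev1.Pod) {
    fmt.Fprintln(os.Stdout, p.Name)
}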
132 changes: 17 additions & 115 deletions pkg/ext-proc/backend/datastore.go
@@ -1,36 +1,22 @@
package backend

import (
"context"
"errors"
"math/rand"
"sync"
"time"

"github.com/google/go-cmp/cmp"
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)

func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
store := &K8sDatastore{
poolMu: sync.RWMutex{},
InferenceModels: &sync.Map{},
pods: &sync.Map{},
}

store.podListerFactory = store.createPodLister
for _, opt := range options {
opt(store)
}
@@ -39,68 +25,29 @@ func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {

// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
type K8sDatastore struct {
client kubernetes.Interface
// poolMu is used to synchronize access to the inferencePool.
poolMu sync.RWMutex
inferencePool *v1alpha1.InferencePool
podListerFactory PodListerFactory
podLister *PodLister
InferenceModels *sync.Map
poolMu sync.RWMutex
inferencePool *v1alpha1.InferencePool
InferenceModels *sync.Map
pods *sync.Map
}

type K8sDatastoreOption func(*K8sDatastore)
type PodListerFactory func(*v1alpha1.InferencePool) *PodLister

// WithPods can be used in tests to override the pods.
func WithPodListerFactory(factory PodListerFactory) K8sDatastoreOption {
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
return func(store *K8sDatastore) {
store.podListerFactory = factory
store.pods = &sync.Map{}
for _, pod := range pods {
store.pods.Store(pod.Pod, true)
}
}
}

type PodLister struct {
Lister listersv1.PodLister
sharedInformer informers.SharedInformerFactory
}

func (l *PodLister) listEverything() ([]*corev1.Pod, error) {
return l.Lister.List(labels.Everything())

}

func (ds *K8sDatastore) SetClient(client kubernetes.Interface) {
ds.client = client
}

func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
ds.poolMu.Lock()
defer ds.poolMu.Unlock()

if ds.inferencePool != nil && cmp.Equal(ds.inferencePool.Spec.Selector, pool.Spec.Selector) {
// Pool updated, but the selector stayed the same, so no need to change the informer.
ds.inferencePool = pool
return
}

// New pool or selector updated.
ds.inferencePool = pool

if ds.podLister != nil && ds.podLister.sharedInformer != nil {
// Shutdown the old informer async since this takes a few seconds.
go func() {
ds.podLister.sharedInformer.Shutdown()
}()
}

if ds.podListerFactory != nil {
// Create a new informer with the new selector.
ds.podLister = ds.podListerFactory(ds.inferencePool)
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
ctx := context.Background()
ds.podLister.sharedInformer.Start(ctx.Done())
ds.podLister.sharedInformer.WaitForCacheSync(ctx.Done())
}
}
}

func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
@@ -112,58 +59,13 @@ func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
return ds.inferencePool, nil
}

func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister {
if ds.client == nil {
return nil
}
klog.V(logutil.DEFAULT).Infof("Creating informer for pool %v", pool.Name)
selectorSet := make(map[string]string)
for k, v := range pool.Spec.Selector {
selectorSet[string(k)] = string(v)
}

newPodInformer := func(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, resyncPeriod, cache.Indexers{}, func(options *metav1.ListOptions) {
options.LabelSelector = labels.SelectorFromSet(selectorSet).String()
})
err := informer.SetTransform(func(obj interface{}) (interface{}, error) {
// Remove unnecessary fields to improve memory footprint.
if accessor, err := meta.Accessor(obj); err == nil {
if accessor.GetManagedFields() != nil {
accessor.SetManagedFields(nil)
}
}
return obj, nil
})
if err != nil {
klog.Errorf("Failed to set pod transformer: %v", err)
}
return informer
}
// 0 means we disable resyncing, it is not really useful to resync every hour (the controller-runtime default),
// if things go wrong in the watch, no one will wait for an hour for things to get fixed.
// As precedence, kube-scheduler also disables this since it is expensive to list all pods from the api-server regularly.
resyncPeriod := time.Duration(0)
sharedInformer := informers.NewSharedInformerFactory(ds.client, resyncPeriod)
sharedInformer.InformerFor(&v1.Pod{}, newPodInformer)

return &PodLister{
Lister: sharedInformer.Core().V1().Pods().Lister(),
sharedInformer: sharedInformer,
}
}

func (ds *K8sDatastore) getPods() ([]*corev1.Pod, error) {
ds.poolMu.RLock()
defer ds.poolMu.RUnlock()
if !ds.HasSynced() {
return nil, errors.New("InferencePool is not initialized in datastore")
}
pods, err := ds.podLister.listEverything()
if err != nil {
return nil, err
}
return pods, nil
func (ds *K8sDatastore) GetPodIPs() []string {
var ips []string
ds.pods.Range(func(name, pod any) bool {
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
return true
})
return ips
}

func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.InferenceModel) {
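The restored datastore tracks inference pods in a sync.Map rather than querying an informer-backed lister, so reconcilers can add and remove entries concurrently without holding the pool mutex that guards the InferencePool. A minimal, self-contained sketch of that pattern, with illustrative types and helper names rather than the repo's exact API:

package main

import (
    "fmt"
    "sync"
)

// Pod mirrors the comparable key type the datastore uses: a name plus
// a ready-to-dial "ip:port" address.
type Pod struct {
    Name    string
    Address string
}

// datastore is a trimmed stand-in for K8sDatastore.
type datastore struct {
    pods sync.Map // Pod -> bool, used as a concurrent set
}

// addPod records an endpoint; storing under a comparable struct key
// makes repeated reconciles idempotent.
func (d *datastore) addPod(p Pod) { d.pods.Store(p, true) }

// podAddresses snapshots the current endpoints via Range, which is
// safe to call while other goroutines mutate the map.
func (d *datastore) podAddresses() []string {
    var out []string
    d.pods.Range(func(key, _ any) bool {
        out = append(out, key.(Pod).Address)
        return true // keep iterating
    })
    return out
}

func main() {
    ds := &datastore{}
    ds.addPod(Pod{Name: "vllm-0", Address: "10.0.0.1:8000"})
    ds.addPod(Pod{Name: "vllm-0", Address: "10.0.0.1:8000"}) // deduplicated by key
    fmt.Println(ds.podAddresses())                           // [10.0.0.1:8000]
}

Compared with the informer path this drops the shared-informer lifecycle (start, cache sync, async shutdown on selector change), at the cost of the map having to be fed by a controller, which is what the restored EndpointSlice reconciler below does.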
109 changes: 109 additions & 0 deletions pkg/ext-proc/backend/endpointslice_reconciler.go
@@ -0,0 +1,109 @@
package backend

import (
"context"
"strconv"
"time"

"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
klog "k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

var (
serviceOwnerLabel = "kubernetes.io/service-name"
)

type EndpointSliceReconciler struct {
client.Client
Scheme *runtime.Scheme
Record record.EventRecorder
ServiceName string
Zone string
Datastore *K8sDatastore
}

func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
inferencePool, err := c.Datastore.getInferencePool()
if err != nil {
klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil
}

klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName)

endpointSlice := &discoveryv1.EndpointSlice{}
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
klog.Errorf("Unable to get EndpointSlice: %v", err)
return ctrl.Result{}, err
}
c.updateDatastore(endpointSlice, inferencePool)

return ctrl.Result{}, nil
}

// TODO: Support multiple endpointslices for a single service
func (c *EndpointSliceReconciler) updateDatastore(
slice *discoveryv1.EndpointSlice,
inferencePool *v1alpha1.InferencePool) {
podMap := make(map[Pod]bool)

for _, endpoint := range slice.Endpoints {
klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
if c.validPod(endpoint) {
pod := Pod{
Name: endpoint.TargetRef.Name,
Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)),
}
podMap[pod] = true
klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod)
c.Datastore.pods.Store(pod, true)
}
}

removeOldPods := func(k, v any) bool {
pod, ok := k.(Pod)
if !ok {
klog.Errorf("Unable to cast key to Pod: %v", k)
return false
}
if _, ok := podMap[pod]; !ok {
klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod)
c.Datastore.pods.Delete(pod)
}
return true
}
c.Datastore.pods.Range(removeOldPods)
}

func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
ownsEndPointSlice := func(object client.Object) bool {
// Check if the object is an EndpointSlice
endpointSlice, ok := object.(*discoveryv1.EndpointSlice)
if !ok {
return false
}

gotLabel := endpointSlice.ObjectMeta.Labels[serviceOwnerLabel]
wantLabel := c.ServiceName
return gotLabel == wantLabel
}

return ctrl.NewControllerManagedBy(mgr).
For(&discoveryv1.EndpointSlice{},
builder.WithPredicates(predicate.NewPredicateFuncs(ownsEndPointSlice))).
Complete(c)
}

func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
return validZone && *endpoint.Conditions.Ready

}
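updateDatastore is a small set reconciliation: build the desired pod set from the slice's ready endpoints, upsert each member into the map, then sweep the map and delete anything absent from the desired set. A standalone sketch of that sweep using the same Pod/sync.Map shapes as above (again with illustrative names, not the repo's exact signatures):

package main

import (
    "fmt"
    "sync"
)

type Pod struct {
    Name    string
    Address string
}

// reconcilePods makes the live set match desired: upsert every desired
// pod, then delete stale entries in a single Range sweep, mirroring the
// removeOldPods closure in the reconciler above.
func reconcilePods(live *sync.Map, desired []Pod) {
    want := make(map[Pod]bool, len(desired))
    for _, p := range desired {
        want[p] = true
        live.Store(p, true) // idempotent under a comparable struct key
    }
    live.Range(func(key, _ any) bool {
        if p, ok := key.(Pod); ok && !want[p] {
            live.Delete(p) // sync.Map permits deletion during Range
        }
        return true
    })
}

func main() {
    var live sync.Map
    live.Store(Pod{Name: "stale", Address: "10.0.0.9:8000"}, true)
    reconcilePods(&live, []Pod{{Name: "fresh", Address: "10.0.0.1:8000"}})
    live.Range(func(key, _ any) bool { fmt.Println(key); return true }) // only "fresh" remains
}

One caveat the TODO in the reconciler already flags: with multiple EndpointSlices per service, a per-slice sweep like this would delete pods owned by the other slices, so the desired set would first need to be merged across all slices for the service.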