Skip to content

Commit 40054b5

Browse files
committed
Revert "Replace EndpointSlice reconciler with pod list backed by informer (kubernetes-sigs#271)"
This reverts commit 9298849.
1 parent 056adfd commit 40054b5

21 files changed

+499
-448
lines changed

.golangci.yml

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ linters:
1414
- dupword
1515
- durationcheck
1616
- fatcontext
17+
- gci
1718
- ginkgolinter
1819
- gocritic
1920
- govet

pkg/ext-proc/backend/datastore.go

+17-115
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,22 @@
11
package backend
22

33
import (
4-
"context"
54
"errors"
65
"math/rand"
76
"sync"
8-
"time"
97

10-
"github.com/google/go-cmp/cmp"
118
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
129
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1310
corev1 "k8s.io/api/core/v1"
14-
v1 "k8s.io/api/core/v1"
15-
"k8s.io/apimachinery/pkg/api/meta"
16-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17-
"k8s.io/apimachinery/pkg/labels"
18-
"k8s.io/client-go/informers"
19-
informersv1 "k8s.io/client-go/informers/core/v1"
20-
"k8s.io/client-go/kubernetes"
21-
clientset "k8s.io/client-go/kubernetes"
22-
listersv1 "k8s.io/client-go/listers/core/v1"
23-
"k8s.io/client-go/tools/cache"
2411
"k8s.io/klog/v2"
2512
)
2613

2714
func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
2815
store := &K8sDatastore{
2916
poolMu: sync.RWMutex{},
3017
InferenceModels: &sync.Map{},
18+
pods: &sync.Map{},
3119
}
32-
33-
store.podListerFactory = store.createPodLister
3420
for _, opt := range options {
3521
opt(store)
3622
}
@@ -39,68 +25,29 @@ func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
3925

4026
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
4127
type K8sDatastore struct {
42-
client kubernetes.Interface
4328
// poolMu is used to synchronize access to the inferencePool.
44-
poolMu sync.RWMutex
45-
inferencePool *v1alpha1.InferencePool
46-
podListerFactory PodListerFactory
47-
podLister *PodLister
48-
InferenceModels *sync.Map
29+
poolMu sync.RWMutex
30+
inferencePool *v1alpha1.InferencePool
31+
InferenceModels *sync.Map
32+
pods *sync.Map
4933
}
5034

5135
type K8sDatastoreOption func(*K8sDatastore)
52-
type PodListerFactory func(*v1alpha1.InferencePool) *PodLister
5336

5437
// WithPods can be used in tests to override the pods.
55-
func WithPodListerFactory(factory PodListerFactory) K8sDatastoreOption {
38+
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
5639
return func(store *K8sDatastore) {
57-
store.podListerFactory = factory
40+
store.pods = &sync.Map{}
41+
for _, pod := range pods {
42+
store.pods.Store(pod.Pod, true)
43+
}
5844
}
5945
}
6046

61-
type PodLister struct {
62-
Lister listersv1.PodLister
63-
sharedInformer informers.SharedInformerFactory
64-
}
65-
66-
func (l *PodLister) listEverything() ([]*corev1.Pod, error) {
67-
return l.Lister.List(labels.Everything())
68-
69-
}
70-
71-
func (ds *K8sDatastore) SetClient(client kubernetes.Interface) {
72-
ds.client = client
73-
}
74-
7547
func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
7648
ds.poolMu.Lock()
7749
defer ds.poolMu.Unlock()
78-
79-
if ds.inferencePool != nil && cmp.Equal(ds.inferencePool.Spec.Selector, pool.Spec.Selector) {
80-
// Pool updated, but the selector stayed the same, so no need to change the informer.
81-
ds.inferencePool = pool
82-
return
83-
}
84-
85-
// New pool or selector updated.
8650
ds.inferencePool = pool
87-
88-
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
89-
// Shutdown the old informer async since this takes a few seconds.
90-
go func() {
91-
ds.podLister.sharedInformer.Shutdown()
92-
}()
93-
}
94-
95-
if ds.podListerFactory != nil {
96-
// Create a new informer with the new selector.
97-
ds.podLister = ds.podListerFactory(ds.inferencePool)
98-
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
99-
ctx := context.Background()
100-
ds.podLister.sharedInformer.Start(ctx.Done())
101-
ds.podLister.sharedInformer.WaitForCacheSync(ctx.Done())
102-
}
103-
}
10451
}
10552

10653
func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
@@ -112,58 +59,13 @@ func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
11259
return ds.inferencePool, nil
11360
}
11461

115-
func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister {
116-
if ds.client == nil {
117-
return nil
118-
}
119-
klog.V(logutil.DEFAULT).Infof("Creating informer for pool %v", pool.Name)
120-
selectorSet := make(map[string]string)
121-
for k, v := range pool.Spec.Selector {
122-
selectorSet[string(k)] = string(v)
123-
}
124-
125-
newPodInformer := func(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
126-
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, resyncPeriod, cache.Indexers{}, func(options *metav1.ListOptions) {
127-
options.LabelSelector = labels.SelectorFromSet(selectorSet).String()
128-
})
129-
err := informer.SetTransform(func(obj interface{}) (interface{}, error) {
130-
// Remove unnecessary fields to improve memory footprint.
131-
if accessor, err := meta.Accessor(obj); err == nil {
132-
if accessor.GetManagedFields() != nil {
133-
accessor.SetManagedFields(nil)
134-
}
135-
}
136-
return obj, nil
137-
})
138-
if err != nil {
139-
klog.Errorf("Failed to set pod transformer: %v", err)
140-
}
141-
return informer
142-
}
143-
// 0 means we disable resyncing, it is not really useful to resync every hour (the controller-runtime default),
144-
// if things go wrong in the watch, no one will wait for an hour for things to get fixed.
145-
// As precedent, kube-scheduler also disables this since it is expensive to list all pods from the api-server regularly.
146-
resyncPeriod := time.Duration(0)
147-
sharedInformer := informers.NewSharedInformerFactory(ds.client, resyncPeriod)
148-
sharedInformer.InformerFor(&v1.Pod{}, newPodInformer)
149-
150-
return &PodLister{
151-
Lister: sharedInformer.Core().V1().Pods().Lister(),
152-
sharedInformer: sharedInformer,
153-
}
154-
}
155-
156-
func (ds *K8sDatastore) getPods() ([]*corev1.Pod, error) {
157-
ds.poolMu.RLock()
158-
defer ds.poolMu.RUnlock()
159-
if !ds.HasSynced() {
160-
return nil, errors.New("InferencePool is not initialized in datastore")
161-
}
162-
pods, err := ds.podLister.listEverything()
163-
if err != nil {
164-
return nil, err
165-
}
166-
return pods, nil
62+
func (ds *K8sDatastore) GetPodIPs() []string {
63+
var ips []string
64+
ds.pods.Range(func(name, pod any) bool {
65+
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
66+
return true
67+
})
68+
return ips
16769
}
16870

16971
func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.InferenceModel) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
"strconv"
6+
"time"
7+
8+
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
9+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
10+
discoveryv1 "k8s.io/api/discovery/v1"
11+
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/client-go/tools/record"
13+
klog "k8s.io/klog/v2"
14+
ctrl "sigs.k8s.io/controller-runtime"
15+
"sigs.k8s.io/controller-runtime/pkg/builder"
16+
"sigs.k8s.io/controller-runtime/pkg/client"
17+
"sigs.k8s.io/controller-runtime/pkg/predicate"
18+
)
19+
20+
// serviceOwnerLabel is the EndpointSlice label key whose value is compared
// with the reconciler's ServiceName to decide which slices are reconciled.
// It is a constant because the key is fixed and must not be reassigned.
const serviceOwnerLabel = "kubernetes.io/service-name"
23+
24+
type EndpointSliceReconciler struct {
25+
client.Client
26+
Scheme *runtime.Scheme
27+
Record record.EventRecorder
28+
ServiceName string
29+
Zone string
30+
Datastore *K8sDatastore
31+
}
32+
33+
func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
34+
inferencePool, err := c.Datastore.getInferencePool()
35+
if err != nil {
36+
klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
37+
return ctrl.Result{Requeue: true, RequeueAfter: time.Second}, nil
38+
}
39+
40+
klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName)
41+
42+
endpointSlice := &discoveryv1.EndpointSlice{}
43+
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
44+
klog.Errorf("Unable to get EndpointSlice: %v", err)
45+
return ctrl.Result{}, err
46+
}
47+
c.updateDatastore(endpointSlice, inferencePool)
48+
49+
return ctrl.Result{}, nil
50+
}
51+
52+
// TODO: Support multiple endpointslices for a single service
53+
func (c *EndpointSliceReconciler) updateDatastore(
54+
slice *discoveryv1.EndpointSlice,
55+
inferencePool *v1alpha1.InferencePool) {
56+
podMap := make(map[Pod]bool)
57+
58+
for _, endpoint := range slice.Endpoints {
59+
klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
60+
if c.validPod(endpoint) {
61+
pod := Pod{
62+
Name: endpoint.TargetRef.Name,
63+
Address: endpoint.Addresses[0] + ":" + strconv.Itoa(int(inferencePool.Spec.TargetPortNumber)),
64+
}
65+
podMap[pod] = true
66+
klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod)
67+
c.Datastore.pods.Store(pod, true)
68+
}
69+
}
70+
71+
removeOldPods := func(k, v any) bool {
72+
pod, ok := k.(Pod)
73+
if !ok {
74+
klog.Errorf("Unable to cast key to Pod: %v", k)
75+
return false
76+
}
77+
if _, ok := podMap[pod]; !ok {
78+
klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod)
79+
c.Datastore.pods.Delete(pod)
80+
}
81+
return true
82+
}
83+
c.Datastore.pods.Range(removeOldPods)
84+
}
85+
86+
func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
87+
ownsEndPointSlice := func(object client.Object) bool {
88+
// Check if the object is an EndpointSlice
89+
endpointSlice, ok := object.(*discoveryv1.EndpointSlice)
90+
if !ok {
91+
return false
92+
}
93+
94+
gotLabel := endpointSlice.ObjectMeta.Labels[serviceOwnerLabel]
95+
wantLabel := c.ServiceName
96+
return gotLabel == wantLabel
97+
}
98+
99+
return ctrl.NewControllerManagedBy(mgr).
100+
For(&discoveryv1.EndpointSlice{},
101+
builder.WithPredicates(predicate.NewPredicateFuncs(ownsEndPointSlice))).
102+
Complete(c)
103+
}
104+
105+
func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
106+
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
107+
return validZone && *endpoint.Conditions.Ready
108+
109+
}

0 commit comments

Comments
 (0)