1
1
package backend
2
2
3
3
import (
4
+ "context"
4
5
"errors"
5
6
"math/rand"
6
7
"sync"
8
+ "time"
7
9
10
+ "github.com/google/go-cmp/cmp"
8
11
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
9
12
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
10
13
corev1 "k8s.io/api/core/v1"
14
+ v1 "k8s.io/api/core/v1"
15
+ "k8s.io/apimachinery/pkg/api/meta"
16
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
+ "k8s.io/apimachinery/pkg/labels"
18
+ "k8s.io/client-go/informers"
19
+ informersv1 "k8s.io/client-go/informers/core/v1"
20
+ "k8s.io/client-go/kubernetes"
21
+ clientset "k8s.io/client-go/kubernetes"
22
+ listersv1 "k8s.io/client-go/listers/core/v1"
23
+ "k8s.io/client-go/tools/cache"
11
24
"k8s.io/klog/v2"
12
25
)
13
26
14
27
func NewK8sDataStore (options ... K8sDatastoreOption ) * K8sDatastore {
15
28
store := & K8sDatastore {
16
29
poolMu : sync.RWMutex {},
17
30
InferenceModels : & sync.Map {},
18
- pods : & sync.Map {},
19
31
}
32
+
33
+ store .podListerFactory = store .createPodLister
20
34
for _ , opt := range options {
21
35
opt (store )
22
36
}
@@ -25,29 +39,68 @@ func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
25
39
26
40
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
27
41
type K8sDatastore struct {
42
+ client kubernetes.Interface
28
43
// poolMu is used to synchronize access to the inferencePool.
29
- poolMu sync.RWMutex
30
- inferencePool * v1alpha1.InferencePool
31
- InferenceModels * sync.Map
32
- pods * sync.Map
44
+ poolMu sync.RWMutex
45
+ inferencePool * v1alpha1.InferencePool
46
+ podListerFactory PodListerFactory
47
+ podLister * PodLister
48
+ InferenceModels * sync.Map
33
49
}
34
50
35
51
type K8sDatastoreOption func (* K8sDatastore )
52
+ type PodListerFactory func (* v1alpha1.InferencePool ) * PodLister
36
53
37
54
// WithPods can be used in tests to override the pods.
38
- func WithPods ( pods [] * PodMetrics ) K8sDatastoreOption {
55
+ func WithPodListerFactory ( factory PodListerFactory ) K8sDatastoreOption {
39
56
return func (store * K8sDatastore ) {
40
- store .pods = & sync.Map {}
41
- for _ , pod := range pods {
42
- store .pods .Store (pod .Pod , true )
43
- }
57
+ store .podListerFactory = factory
44
58
}
45
59
}
46
60
61
+ type PodLister struct {
62
+ Lister listersv1.PodLister
63
+ sharedInformer informers.SharedInformerFactory
64
+ }
65
+
66
+ func (l * PodLister ) listEverything () ([]* corev1.Pod , error ) {
67
+ return l .Lister .List (labels .Everything ())
68
+
69
+ }
70
+
71
+ func (ds * K8sDatastore ) SetClient (client kubernetes.Interface ) {
72
+ ds .client = client
73
+ }
74
+
47
75
func (ds * K8sDatastore ) setInferencePool (pool * v1alpha1.InferencePool ) {
48
76
ds .poolMu .Lock ()
49
77
defer ds .poolMu .Unlock ()
78
+
79
+ if ds .inferencePool != nil && cmp .Equal (ds .inferencePool .Spec .Selector , pool .Spec .Selector ) {
80
+ // Pool updated, but the selector stayed the same, so no need to change the informer.
81
+ ds .inferencePool = pool
82
+ return
83
+ }
84
+
85
+ // New pool or selector updated.
50
86
ds .inferencePool = pool
87
+
88
+ if ds .podLister != nil && ds .podLister .sharedInformer != nil {
89
+ // Shutdown the old informer async since this takes a few seconds.
90
+ go func () {
91
+ ds .podLister .sharedInformer .Shutdown ()
92
+ }()
93
+ }
94
+
95
+ if ds .podListerFactory != nil {
96
+ // Create a new informer with the new selector.
97
+ ds .podLister = ds .podListerFactory (ds .inferencePool )
98
+ if ds .podLister != nil && ds .podLister .sharedInformer != nil {
99
+ ctx := context .Background ()
100
+ ds .podLister .sharedInformer .Start (ctx .Done ())
101
+ ds .podLister .sharedInformer .WaitForCacheSync (ctx .Done ())
102
+ }
103
+ }
51
104
}
52
105
53
106
func (ds * K8sDatastore ) getInferencePool () (* v1alpha1.InferencePool , error ) {
@@ -59,13 +112,58 @@ func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
59
112
return ds .inferencePool , nil
60
113
}
61
114
62
- func (ds * K8sDatastore ) GetPodIPs () []string {
63
- var ips []string
64
- ds .pods .Range (func (name , pod any ) bool {
65
- ips = append (ips , pod .(* corev1.Pod ).Status .PodIP )
66
- return true
67
- })
68
- return ips
115
+ func (ds * K8sDatastore ) createPodLister (pool * v1alpha1.InferencePool ) * PodLister {
116
+ if ds .client == nil {
117
+ return nil
118
+ }
119
+ klog .V (logutil .DEFAULT ).Infof ("Creating informer for pool %v" , pool .Name )
120
+ selectorSet := make (map [string ]string )
121
+ for k , v := range pool .Spec .Selector {
122
+ selectorSet [string (k )] = string (v )
123
+ }
124
+
125
+ newPodInformer := func (cs clientset.Interface , resyncPeriod time.Duration ) cache.SharedIndexInformer {
126
+ informer := informersv1 .NewFilteredPodInformer (cs , pool .Namespace , resyncPeriod , cache.Indexers {}, func (options * metav1.ListOptions ) {
127
+ options .LabelSelector = labels .SelectorFromSet (selectorSet ).String ()
128
+ })
129
+ err := informer .SetTransform (func (obj interface {}) (interface {}, error ) {
130
+ // Remove unnecessary fields to improve memory footprint.
131
+ if accessor , err := meta .Accessor (obj ); err == nil {
132
+ if accessor .GetManagedFields () != nil {
133
+ accessor .SetManagedFields (nil )
134
+ }
135
+ }
136
+ return obj , nil
137
+ })
138
+ if err != nil {
139
+ klog .Errorf ("Failed to set pod transformer: %v" , err )
140
+ }
141
+ return informer
142
+ }
143
+ // 0 means we disable resyncing, it is not really useful to resync every hour (the controller-runtime default),
144
+ // if things go wrong in the watch, no one will wait for an hour for things to get fixed.
145
+ // As precedence, kube-scheduler also disables this since it is expensive to list all pods from the api-server regularly.
146
+ resyncPeriod := time .Duration (0 )
147
+ sharedInformer := informers .NewSharedInformerFactory (ds .client , resyncPeriod )
148
+ sharedInformer .InformerFor (& v1.Pod {}, newPodInformer )
149
+
150
+ return & PodLister {
151
+ Lister : sharedInformer .Core ().V1 ().Pods ().Lister (),
152
+ sharedInformer : sharedInformer ,
153
+ }
154
+ }
155
+
156
+ func (ds * K8sDatastore ) getPods () ([]* corev1.Pod , error ) {
157
+ ds .poolMu .RLock ()
158
+ defer ds .poolMu .RUnlock ()
159
+ if ! ds .HasSynced () {
160
+ return nil , errors .New ("InferencePool is not initialized in datastore" )
161
+ }
162
+ pods , err := ds .podLister .listEverything ()
163
+ if err != nil {
164
+ return nil , err
165
+ }
166
+ return pods , nil
69
167
}
70
168
71
169
func (s * K8sDatastore ) FetchModelData (modelName string ) (returnModel * v1alpha1.InferenceModel ) {
0 commit comments