1
1
package backend
2
2
3
3
import (
4
+ "context"
4
5
"errors"
5
6
"math/rand"
6
7
"sync"
8
+ "time"
7
9
10
+ "github.com/google/go-cmp/cmp"
8
11
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
9
12
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
10
13
corev1 "k8s.io/api/core/v1"
14
+ v1 "k8s.io/api/core/v1"
15
+ "k8s.io/apimachinery/pkg/api/meta"
16
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17
+ "k8s.io/apimachinery/pkg/labels"
18
+ "k8s.io/client-go/informers"
19
+ informersv1 "k8s.io/client-go/informers/core/v1"
20
+ "k8s.io/client-go/kubernetes"
21
+ clientset "k8s.io/client-go/kubernetes"
22
+ listersv1 "k8s.io/client-go/listers/core/v1"
23
+ "k8s.io/client-go/tools/cache"
11
24
"k8s.io/klog/v2"
12
25
)
13
26
14
27
func NewK8sDataStore (options ... K8sDatastoreOption ) * K8sDatastore {
15
28
store := & K8sDatastore {
16
29
poolMu : sync.RWMutex {},
17
30
InferenceModels : & sync.Map {},
18
- pods : & sync.Map {},
19
31
}
32
+
33
+ store .podListerFactory = store .createPodLister
20
34
for _ , opt := range options {
21
35
opt (store )
22
36
}
@@ -25,29 +39,68 @@ func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
25
39
26
40
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
27
41
type K8sDatastore struct {
42
+ client kubernetes.Interface
28
43
// poolMu is used to synchronize access to the inferencePool.
29
- poolMu sync.RWMutex
30
- inferencePool * v1alpha1.InferencePool
31
- InferenceModels * sync.Map
32
- pods * sync.Map
44
+ poolMu sync.RWMutex
45
+ inferencePool * v1alpha1.InferencePool
46
+ podListerFactory PodListerFactory
47
+ podLister * PodLister
48
+ InferenceModels * sync.Map
33
49
}
34
50
35
51
type K8sDatastoreOption func (* K8sDatastore )
52
+ type PodListerFactory func (* v1alpha1.InferencePool ) * PodLister
36
53
37
54
// WithPods can be used in tests to override the pods.
38
- func WithPods ( pods [] * PodMetrics ) K8sDatastoreOption {
55
+ func WithPodListerFactory ( factory PodListerFactory ) K8sDatastoreOption {
39
56
return func (store * K8sDatastore ) {
40
- store .pods = & sync.Map {}
41
- for _ , pod := range pods {
42
- store .pods .Store (pod .Pod , true )
43
- }
57
+ store .podListerFactory = factory
44
58
}
45
59
}
46
60
61
+ type PodLister struct {
62
+ Lister listersv1.PodLister
63
+ sharedInformer informers.SharedInformerFactory
64
+ ctx context.Context
65
+ }
66
+
67
+ func (l * PodLister ) list (selector labels.Selector ) ([]* corev1.Pod , error ) {
68
+ return l .Lister .List (selector )
69
+
70
+ }
71
+
72
+ func (ds * K8sDatastore ) SetClient (client kubernetes.Interface ) {
73
+ ds .client = client
74
+ }
75
+
47
76
func (ds * K8sDatastore ) setInferencePool (pool * v1alpha1.InferencePool ) {
48
77
ds .poolMu .Lock ()
49
78
defer ds .poolMu .Unlock ()
79
+
80
+ if ds .inferencePool != nil && cmp .Equal (ds .inferencePool .Spec .Selector , pool .Spec .Selector ) {
81
+ // Pool updated, but the selector stayed the same, so no need to change the informer.
82
+ ds .inferencePool = pool
83
+ return
84
+ }
85
+
86
+ // New pool or selector updated.
50
87
ds .inferencePool = pool
88
+
89
+ if ds .podLister != nil && ds .podLister .sharedInformer != nil {
90
+ // Shutdown the old informer async since this takes a few seconds.
91
+ go func () {
92
+ ds .podLister .sharedInformer .Shutdown ()
93
+ }()
94
+ }
95
+
96
+ if ds .podListerFactory != nil {
97
+ // Create a new informer with the new selector.
98
+ ds .podLister = ds .podListerFactory (ds .inferencePool )
99
+ if ds .podLister != nil && ds .podLister .sharedInformer != nil {
100
+ ds .podLister .sharedInformer .Start (ds .podLister .ctx .Done ())
101
+ ds .podLister .sharedInformer .WaitForCacheSync (ds .podLister .ctx .Done ())
102
+ }
103
+ }
51
104
}
52
105
53
106
func (ds * K8sDatastore ) getInferencePool () (* v1alpha1.InferencePool , error ) {
@@ -59,13 +112,58 @@ func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
59
112
return ds .inferencePool , nil
60
113
}
61
114
62
- func (ds * K8sDatastore ) GetPodIPs () []string {
63
- var ips []string
64
- ds .pods .Range (func (name , pod any ) bool {
65
- ips = append (ips , pod .(* corev1.Pod ).Status .PodIP )
66
- return true
67
- })
68
- return ips
115
+ func (ds * K8sDatastore ) createPodLister (pool * v1alpha1.InferencePool ) * PodLister {
116
+ if ds .client == nil {
117
+ return nil
118
+ }
119
+ klog .V (logutil .DEFAULT ).Infof ("Creating informer for pool %v" , pool .Name )
120
+ selectorSet := make (map [string ]string )
121
+ for k , v := range pool .Spec .Selector {
122
+ selectorSet [string (k )] = string (v )
123
+ }
124
+
125
+ newPodInformer := func (cs clientset.Interface , resyncPeriod time.Duration ) cache.SharedIndexInformer {
126
+ informer := informersv1 .NewFilteredPodInformer (cs , pool .Namespace , 0 , nil , func (options * metav1.ListOptions ) {
127
+ options .LabelSelector = labels .SelectorFromSet (selectorSet ).String ()
128
+ })
129
+ err := informer .SetTransform (func (obj interface {}) (interface {}, error ) {
130
+ // Remove unnecessary fields to improve memory footprint.
131
+ if accessor , err := meta .Accessor (obj ); err == nil {
132
+ if accessor .GetManagedFields () != nil {
133
+ accessor .SetManagedFields (nil )
134
+ }
135
+ }
136
+ return obj , nil
137
+ })
138
+ if err != nil {
139
+ klog .Errorf ("Failed to set pod transformer: %v" , err )
140
+ }
141
+ return informer
142
+ }
143
+ sharedInformer := informers .NewSharedInformerFactory (ds .client , 0 )
144
+ sharedInformer .InformerFor (& v1.Pod {}, newPodInformer )
145
+
146
+ return & PodLister {
147
+ Lister : sharedInformer .Core ().V1 ().Pods ().Lister (),
148
+ sharedInformer : sharedInformer ,
149
+ ctx : context .Background (),
150
+ }
151
+ }
152
+
153
+ func (ds * K8sDatastore ) getPods () []* corev1.Pod {
154
+ ds .poolMu .RLock ()
155
+ defer ds .poolMu .RUnlock ()
156
+ if ds .podLister == nil {
157
+ klog .V (logutil .DEFAULT ).Info ("InferencePool not yet initialized" )
158
+ return []* corev1.Pod {}
159
+ }
160
+
161
+ pods , err := ds .podLister .list (labels .Everything ())
162
+ if err != nil {
163
+ klog .Errorf ("Failed to list pods for pool %s/%s: %v" , ds .inferencePool .Namespace , ds .inferencePool .Name , err )
164
+ return []* corev1.Pod {}
165
+ }
166
+ return pods
69
167
}
70
168
71
169
func (s * K8sDatastore ) FetchModelData (modelName string ) (returnModel * v1alpha1.InferenceModel ) {
0 commit comments