Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 30e6024

Browse files
committedFeb 3, 2025·
Replace EndpointSlice reconciler with pod list backed by informer
1 parent 5d32bf3 commit 30e6024

21 files changed

+421
-475
lines changed
 

‎.golangci.yml

-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ linters:
1414
- dupword
1515
- durationcheck
1616
- fatcontext
17-
- gci
1817
- ginkgolinter
1918
- gocritic
2019
- govet

‎pkg/ext-proc/backend/datastore.go

+115-17
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,36 @@
11
package backend
22

33
import (
4+
"context"
45
"errors"
56
"math/rand"
67
"sync"
8+
"time"
79

10+
"github.com/google/go-cmp/cmp"
811
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
912
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1013
corev1 "k8s.io/api/core/v1"
14+
v1 "k8s.io/api/core/v1"
15+
"k8s.io/apimachinery/pkg/api/meta"
16+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
"k8s.io/apimachinery/pkg/labels"
18+
"k8s.io/client-go/informers"
19+
informersv1 "k8s.io/client-go/informers/core/v1"
20+
"k8s.io/client-go/kubernetes"
21+
clientset "k8s.io/client-go/kubernetes"
22+
listersv1 "k8s.io/client-go/listers/core/v1"
23+
"k8s.io/client-go/tools/cache"
1124
"k8s.io/klog/v2"
1225
)
1326

1427
func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
1528
store := &K8sDatastore{
1629
poolMu: sync.RWMutex{},
1730
InferenceModels: &sync.Map{},
18-
pods: &sync.Map{},
1931
}
32+
33+
store.podListerFactory = store.createPodLister
2034
for _, opt := range options {
2135
opt(store)
2236
}
@@ -25,29 +39,68 @@ func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
2539

2640
// The datastore is a local cache of relevant data for the given InferencePool (currently all pulled from k8s-api)
2741
type K8sDatastore struct {
42+
client kubernetes.Interface
2843
// poolMu is used to synchronize access to the inferencePool.
29-
poolMu sync.RWMutex
30-
inferencePool *v1alpha1.InferencePool
31-
InferenceModels *sync.Map
32-
pods *sync.Map
44+
poolMu sync.RWMutex
45+
inferencePool *v1alpha1.InferencePool
46+
podListerFactory PodListerFactory
47+
podLister *PodLister
48+
InferenceModels *sync.Map
3349
}
3450

3551
type K8sDatastoreOption func(*K8sDatastore)
52+
type PodListerFactory func(*v1alpha1.InferencePool) *PodLister
3653

3754
// WithPods can be used in tests to override the pods.
38-
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
55+
func WithPodListerFactory(factory PodListerFactory) K8sDatastoreOption {
3956
return func(store *K8sDatastore) {
40-
store.pods = &sync.Map{}
41-
for _, pod := range pods {
42-
store.pods.Store(pod.Pod, true)
43-
}
57+
store.podListerFactory = factory
4458
}
4559
}
4660

61+
type PodLister struct {
62+
Lister listersv1.PodLister
63+
sharedInformer informers.SharedInformerFactory
64+
ctx context.Context
65+
}
66+
67+
func (l *PodLister) list(selector labels.Selector) ([]*corev1.Pod, error) {
68+
return l.Lister.List(selector)
69+
70+
}
71+
72+
func (ds *K8sDatastore) SetClient(client kubernetes.Interface) {
73+
ds.client = client
74+
}
75+
4776
func (ds *K8sDatastore) setInferencePool(pool *v1alpha1.InferencePool) {
4877
ds.poolMu.Lock()
4978
defer ds.poolMu.Unlock()
79+
80+
if ds.inferencePool != nil && cmp.Equal(ds.inferencePool.Spec.Selector, pool.Spec.Selector) {
81+
// Pool updated, but the selector stayed the same, so no need to change the informer.
82+
ds.inferencePool = pool
83+
return
84+
}
85+
86+
// New pool or selector updated.
5087
ds.inferencePool = pool
88+
89+
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
90+
// Shutdown the old informer async since this takes a few seconds.
91+
go func() {
92+
ds.podLister.sharedInformer.Shutdown()
93+
}()
94+
}
95+
96+
if ds.podListerFactory != nil {
97+
// Create a new informer with the new selector.
98+
ds.podLister = ds.podListerFactory(ds.inferencePool)
99+
if ds.podLister != nil && ds.podLister.sharedInformer != nil {
100+
ds.podLister.sharedInformer.Start(ds.podLister.ctx.Done())
101+
ds.podLister.sharedInformer.WaitForCacheSync(ds.podLister.ctx.Done())
102+
}
103+
}
51104
}
52105

53106
func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
@@ -59,13 +112,58 @@ func (ds *K8sDatastore) getInferencePool() (*v1alpha1.InferencePool, error) {
59112
return ds.inferencePool, nil
60113
}
61114

62-
func (ds *K8sDatastore) GetPodIPs() []string {
63-
var ips []string
64-
ds.pods.Range(func(name, pod any) bool {
65-
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
66-
return true
67-
})
68-
return ips
115+
func (ds *K8sDatastore) createPodLister(pool *v1alpha1.InferencePool) *PodLister {
116+
if ds.client == nil {
117+
return nil
118+
}
119+
klog.V(logutil.DEFAULT).Infof("Creating informer for pool %v", pool.Name)
120+
selectorSet := make(map[string]string)
121+
for k, v := range pool.Spec.Selector {
122+
selectorSet[string(k)] = string(v)
123+
}
124+
125+
newPodInformer := func(cs clientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
126+
informer := informersv1.NewFilteredPodInformer(cs, pool.Namespace, 0, nil, func(options *metav1.ListOptions) {
127+
options.LabelSelector = labels.SelectorFromSet(selectorSet).String()
128+
})
129+
err := informer.SetTransform(func(obj interface{}) (interface{}, error) {
130+
// Remove unnecessary fields to improve memory footprint.
131+
if accessor, err := meta.Accessor(obj); err == nil {
132+
if accessor.GetManagedFields() != nil {
133+
accessor.SetManagedFields(nil)
134+
}
135+
}
136+
return obj, nil
137+
})
138+
if err != nil {
139+
klog.Errorf("Failed to set pod transformer: %v", err)
140+
}
141+
return informer
142+
}
143+
sharedInformer := informers.NewSharedInformerFactory(ds.client, 0)
144+
sharedInformer.InformerFor(&v1.Pod{}, newPodInformer)
145+
146+
return &PodLister{
147+
Lister: sharedInformer.Core().V1().Pods().Lister(),
148+
sharedInformer: sharedInformer,
149+
ctx: context.Background(),
150+
}
151+
}
152+
153+
func (ds *K8sDatastore) getPods() []*corev1.Pod {
154+
ds.poolMu.RLock()
155+
defer ds.poolMu.RUnlock()
156+
if ds.podLister == nil {
157+
klog.V(logutil.DEFAULT).Info("InferencePool not yet initialized")
158+
return []*corev1.Pod{}
159+
}
160+
161+
pods, err := ds.podLister.list(labels.Everything())
162+
if err != nil {
163+
klog.Errorf("Failed to list pods for pool %s/%s: %v", ds.inferencePool.Namespace, ds.inferencePool.Name, err)
164+
return []*corev1.Pod{}
165+
}
166+
return pods
69167
}
70168

71169
func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.InferenceModel) {

‎pkg/ext-proc/backend/endpointslice_reconciler.go

-109
This file was deleted.

‎pkg/ext-proc/backend/endpointslice_reconcilier_test.go

-202
This file was deleted.

‎pkg/ext-proc/backend/fake.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@ import (
88
)
99

1010
type FakePodMetricsClient struct {
11-
Err map[Pod]error
12-
Res map[Pod]*PodMetrics
11+
Err map[string]error
12+
Res map[string]*PodMetrics
1313
}
1414

1515
func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error) {
16-
if err, ok := f.Err[pod]; ok {
16+
if err, ok := f.Err[pod.Name]; ok {
1717
return nil, err
1818
}
19-
klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
20-
return f.Res[pod], nil
19+
klog.V(1).Infof("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod.Name])
20+
return f.Res[pod.Name], nil
2121
}
2222

2323
type FakeDataStore struct {

‎pkg/ext-proc/backend/inferencemodel_reconciler_test.go

+21
Original file line numberDiff line numberDiff line change
@@ -146,3 +146,24 @@ func populateServiceMap(services ...*v1alpha1.InferenceModel) *sync.Map {
146146
}
147147
return returnVal
148148
}
149+
150+
func mapsEqual(map1, map2 *sync.Map) bool {
151+
equal := true
152+
153+
map1.Range(func(k, v any) bool {
154+
if _, ok := map2.Load(k); !ok {
155+
equal = false
156+
return false
157+
}
158+
return true
159+
})
160+
map2.Range(func(k, v any) bool {
161+
if _, ok := map1.Load(k); !ok {
162+
equal = false
163+
return false
164+
}
165+
return true
166+
})
167+
168+
return equal
169+
}

‎pkg/ext-proc/backend/inferencepool_reconciler.go

-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ type InferencePoolReconciler struct {
2121
Record record.EventRecorder
2222
PoolNamespacedName types.NamespacedName
2323
Datastore *K8sDatastore
24-
Zone string
2524
}
2625

2726
func (c *InferencePoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

‎pkg/ext-proc/backend/provider.go

+58-20
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package backend
33
import (
44
"context"
55
"fmt"
6+
"math/rand"
7+
"strconv"
68
"sync"
79
"time"
810

911
"go.uber.org/multierr"
1012
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
13+
corev1 "k8s.io/api/core/v1"
1114
klog "k8s.io/klog/v2"
1215
)
1316

@@ -47,11 +50,11 @@ func (p *Provider) AllPodMetrics() []*PodMetrics {
4750
}
4851

4952
func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
50-
p.podMetrics.Store(pod, pm)
53+
p.podMetrics.Store(pod.Name, pm)
5154
}
5255

5356
func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
54-
val, ok := p.podMetrics.Load(pod)
57+
val, ok := p.podMetrics.Load(pod.Name)
5558
if ok {
5659
return val.(*PodMetrics), true
5760
}
@@ -101,31 +104,66 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
101104
// refreshPodsOnce lists pods and updates keys in the podMetrics map.
102105
// Note this function doesn't update the PodMetrics value, it's done separately.
103106
func (p *Provider) refreshPodsOnce() {
104-
// merge new pods with cached ones.
105-
// add new pod to the map
106-
addNewPods := func(k, v any) bool {
107-
pod := k.(Pod)
108-
if _, ok := p.podMetrics.Load(pod); !ok {
109-
new := &PodMetrics{
110-
Pod: pod,
111-
Metrics: Metrics{
112-
ActiveModels: make(map[string]int),
113-
},
114-
}
115-
p.podMetrics.Store(pod, new)
107+
pool, err := p.datastore.getInferencePool()
108+
if err != nil {
109+
klog.V(logutil.DEFAULT).Infof("Pool not ready: %v", err)
110+
p.podMetrics.Clear()
111+
return
112+
}
113+
114+
pods := p.datastore.getPods()
115+
revision := rand.Int()
116+
ready := 0
117+
for _, pod := range pods {
118+
if !podIsReady(pod) {
119+
continue
116120
}
117-
return true
121+
// a ready pod
122+
ready++
123+
if val, ok := p.podMetrics.Load(pod.Name); ok {
124+
// pod already exists
125+
pm := val.(*PodMetrics)
126+
pm.revision = revision
127+
continue
128+
}
129+
// new pod, add to the store for probing
130+
new := &PodMetrics{
131+
Pod: Pod{
132+
Name: pod.Name,
133+
Address: pod.Status.PodIP + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber)),
134+
},
135+
Metrics: Metrics{
136+
ActiveModels: make(map[string]int),
137+
},
138+
revision: revision,
139+
}
140+
p.podMetrics.Store(pod.Name, new)
118141
}
142+
143+
klog.V(logutil.DEFAULT).Infof("Pods in pool %s/%s with selector %v: total=%v ready=%v",
144+
pool.Namespace, pool.Name, pool.Spec.Selector, len(pods), ready)
145+
119146
// remove pods that don't exist any more.
120147
mergeFn := func(k, v any) bool {
121-
pod := k.(Pod)
122-
if _, ok := p.datastore.pods.Load(pod); !ok {
123-
p.podMetrics.Delete(pod)
148+
pm := v.(*PodMetrics)
149+
if pm.revision != revision {
150+
p.podMetrics.Delete(pm.Pod.Name)
124151
}
125152
return true
126153
}
127154
p.podMetrics.Range(mergeFn)
128-
p.datastore.pods.Range(addNewPods)
155+
}
156+
157+
func podIsReady(pod *corev1.Pod) bool {
158+
if pod.DeletionTimestamp != nil {
159+
return false
160+
}
161+
for _, condition := range pod.Status.Conditions {
162+
if condition.Type == corev1.PodReady {
163+
return condition.Status == corev1.ConditionTrue
164+
}
165+
}
166+
return false
129167
}
130168

131169
func (p *Provider) refreshMetricsOnce() error {
@@ -141,8 +179,8 @@ func (p *Provider) refreshMetricsOnce() error {
141179
errCh := make(chan error)
142180
processOnePod := func(key, value any) bool {
143181
klog.V(logutil.TRACE).Infof("Processing pod %v and metric %v", key, value)
144-
pod := key.(Pod)
145182
existing := value.(*PodMetrics)
183+
pod := existing.Pod
146184
wg.Add(1)
147185
go func() {
148186
defer wg.Done()

‎pkg/ext-proc/backend/provider_test.go

+105-38
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,18 @@ package backend
22

33
import (
44
"errors"
5-
"sync"
65
"testing"
7-
"time"
86

97
"github.com/google/go-cmp/cmp"
108
"github.com/google/go-cmp/cmp/cmpopts"
9+
"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
10+
testingutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/testing"
11+
corev1 "k8s.io/api/core/v1"
1112
)
1213

1314
var (
1415
pod1 = &PodMetrics{
15-
Pod: Pod{Name: "pod1"},
16+
Pod: Pod{Name: "pod1", Address: "address1:9009"},
1617
Metrics: Metrics{
1718
WaitingQueueSize: 0,
1819
KVCacheUsagePercent: 0.2,
@@ -24,7 +25,7 @@ var (
2425
},
2526
}
2627
pod2 = &PodMetrics{
27-
Pod: Pod{Name: "pod2"},
28+
Pod: Pod{Name: "pod2", Address: "address2:9009"},
2829
Metrics: Metrics{
2930
WaitingQueueSize: 1,
3031
KVCacheUsagePercent: 0.2,
@@ -38,44 +39,67 @@ var (
3839
)
3940

4041
func TestProvider(t *testing.T) {
42+
allPodsLister := &testingutil.FakePodLister{
43+
PodsList: []*corev1.Pod{
44+
testingutil.MakePod(pod1.Pod.Name).SetReady().SetPodIP("address1").Obj(),
45+
testingutil.MakePod(pod2.Pod.Name).SetReady().SetPodIP("address2").Obj(),
46+
},
47+
}
48+
allPodsMetricsClient := &FakePodMetricsClient{
49+
Res: map[string]*PodMetrics{
50+
pod1.Pod.Name: pod1,
51+
pod2.Pod.Name: pod2,
52+
},
53+
}
54+
4155
tests := []struct {
42-
name string
43-
pmc PodMetricsClient
44-
datastore *K8sDatastore
45-
initErr bool
46-
want []*PodMetrics
56+
name string
57+
initPodMetrics []*PodMetrics
58+
lister *testingutil.FakePodLister
59+
pmc PodMetricsClient
60+
step func(*Provider)
61+
want []*PodMetrics
4762
}{
4863
{
49-
name: "Init success",
50-
datastore: &K8sDatastore{
51-
pods: populateMap(pod1.Pod, pod2.Pod),
64+
name: "Init without refreshing pods",
65+
initPodMetrics: []*PodMetrics{pod1, pod2},
66+
lister: allPodsLister,
67+
pmc: allPodsMetricsClient,
68+
step: func(p *Provider) {
69+
_ = p.refreshMetricsOnce()
5270
},
53-
pmc: &FakePodMetricsClient{
54-
Res: map[Pod]*PodMetrics{
55-
pod1.Pod: pod1,
56-
pod2.Pod: pod2,
57-
},
71+
want: []*PodMetrics{pod1, pod2},
72+
},
73+
{
74+
name: "Fetching all success",
75+
lister: allPodsLister,
76+
pmc: allPodsMetricsClient,
77+
step: func(p *Provider) {
78+
p.refreshPodsOnce()
79+
_ = p.refreshMetricsOnce()
5880
},
5981
want: []*PodMetrics{pod1, pod2},
6082
},
6183
{
62-
name: "Fetch metrics error",
84+
name: "Fetch metrics error",
85+
lister: allPodsLister,
6386
pmc: &FakePodMetricsClient{
64-
Err: map[Pod]error{
65-
pod2.Pod: errors.New("injected error"),
87+
Err: map[string]error{
88+
pod2.Pod.Name: errors.New("injected error"),
6689
},
67-
Res: map[Pod]*PodMetrics{
68-
pod1.Pod: pod1,
90+
Res: map[string]*PodMetrics{
91+
pod1.Pod.Name: pod1,
6992
},
7093
},
71-
datastore: &K8sDatastore{
72-
pods: populateMap(pod1.Pod, pod2.Pod),
94+
step: func(p *Provider) {
95+
p.refreshPodsOnce()
96+
_ = p.refreshMetricsOnce()
7397
},
7498
want: []*PodMetrics{
7599
pod1,
76100
// Failed to fetch pod2 metrics so it remains the default values.
77101
{
78-
Pod: Pod{Name: "pod2"},
102+
Pod: pod2.Pod,
79103
Metrics: Metrics{
80104
WaitingQueueSize: 0,
81105
KVCacheUsagePercent: 0,
@@ -85,30 +109,73 @@ func TestProvider(t *testing.T) {
85109
},
86110
},
87111
},
112+
{
113+
name: "A new pod added",
114+
initPodMetrics: []*PodMetrics{pod2},
115+
lister: allPodsLister,
116+
pmc: allPodsMetricsClient,
117+
step: func(p *Provider) {
118+
p.refreshPodsOnce()
119+
_ = p.refreshMetricsOnce()
120+
},
121+
want: []*PodMetrics{pod1, pod2},
122+
},
123+
{
124+
name: "A pod removed",
125+
initPodMetrics: []*PodMetrics{pod1, pod2},
126+
lister: &testingutil.FakePodLister{
127+
PodsList: []*corev1.Pod{
128+
testingutil.MakePod(pod2.Pod.Name).SetReady().SetPodIP("address2").Obj(),
129+
},
130+
},
131+
pmc: allPodsMetricsClient,
132+
step: func(p *Provider) {
133+
p.refreshPodsOnce()
134+
_ = p.refreshMetricsOnce()
135+
},
136+
want: []*PodMetrics{pod2},
137+
},
138+
{
139+
name: "A pod removed, another added",
140+
initPodMetrics: []*PodMetrics{pod1},
141+
lister: &testingutil.FakePodLister{
142+
PodsList: []*corev1.Pod{
143+
testingutil.MakePod(pod1.Pod.Name).SetReady().SetPodIP("address1").Obj(),
144+
},
145+
},
146+
pmc: allPodsMetricsClient,
147+
step: func(p *Provider) {
148+
p.refreshPodsOnce()
149+
_ = p.refreshMetricsOnce()
150+
},
151+
want: []*PodMetrics{pod1},
152+
},
88153
}
89154

90155
for _, test := range tests {
91156
t.Run(test.name, func(t *testing.T) {
92-
p := NewProvider(test.pmc, test.datastore)
93-
err := p.Init(time.Millisecond, time.Millisecond)
94-
if test.initErr != (err != nil) {
95-
t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
157+
datastore := NewK8sDataStore(WithPodListerFactory(
158+
func(pool *v1alpha1.InferencePool) *PodLister {
159+
return &PodLister{
160+
Lister: test.lister,
161+
}
162+
}))
163+
datastore.setInferencePool(&v1alpha1.InferencePool{
164+
Spec: v1alpha1.InferencePoolSpec{TargetPortNumber: 9009},
165+
})
166+
p := NewProvider(test.pmc, datastore)
167+
for _, m := range test.initPodMetrics {
168+
p.UpdatePodMetrics(m.Pod, m)
96169
}
170+
test.step(p)
97171
metrics := p.AllPodMetrics()
98172
lessFunc := func(a, b *PodMetrics) bool {
99173
return a.String() < b.String()
100174
}
101-
if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc)); diff != "" {
175+
if diff := cmp.Diff(test.want, metrics, cmpopts.SortSlices(lessFunc),
176+
cmpopts.IgnoreFields(PodMetrics{}, "revision")); diff != "" {
102177
t.Errorf("Unexpected output (-want +got): %v", diff)
103178
}
104179
})
105180
}
106181
}
107-
108-
func populateMap(pods ...Pod) *sync.Map {
109-
newMap := &sync.Map{}
110-
for _, pod := range pods {
111-
newMap.Store(pod, true)
112-
}
113-
return newMap
114-
}

‎pkg/ext-proc/backend/types.go

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ type Metrics struct {
2828
type PodMetrics struct {
2929
Pod
3030
Metrics
31+
revision int
3132
}
3233

3334
func (pm *PodMetrics) String() string {

‎pkg/ext-proc/health.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
healthPb "google.golang.org/grpc/health/grpc_health_v1"
88
"google.golang.org/grpc/status"
99
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
10+
logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1011
klog "k8s.io/klog/v2"
1112
)
1213

@@ -19,7 +20,7 @@ func (s *healthServer) Check(ctx context.Context, in *healthPb.HealthCheckReques
1920
klog.Infof("gRPC health check not serving: %s", in.String())
2021
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_NOT_SERVING}, nil
2122
}
22-
klog.Infof("gRPC health check serving: %s", in.String())
23+
klog.V(logutil.DEBUG).Infof("gRPC health check serving: %s", in.String())
2324
return &healthPb.HealthCheckResponse{Status: healthPb.HealthCheckResponse_SERVING}, nil
2425
}
2526

‎pkg/ext-proc/main.go

+8-18
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
1919
"k8s.io/apimachinery/pkg/runtime"
2020
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
21+
"k8s.io/client-go/kubernetes"
2122
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
2223
"k8s.io/client-go/rest"
2324
"k8s.io/component-base/metrics/legacyregistry"
@@ -53,14 +54,6 @@ var (
5354
"poolNamespace",
5455
runserver.DefaultPoolNamespace,
5556
"Namespace of the InferencePool this Endpoint Picker is associated with.")
56-
serviceName = flag.String(
57-
"serviceName",
58-
runserver.DefaultServiceName,
59-
"Name of the Service that will be used to read EndpointSlices from")
60-
zone = flag.String(
61-
"zone",
62-
runserver.DefaultZone,
63-
"The zone that this instance is created in. Will be passed to the corresponding endpointSlice. ")
6457
refreshPodsInterval = flag.Duration(
6558
"refreshPodsInterval",
6659
runserver.DefaultRefreshPodsInterval,
@@ -106,8 +99,6 @@ func main() {
10699
TargetPodHeader: *targetPodHeader,
107100
PoolName: *poolName,
108101
PoolNamespace: *poolNamespace,
109-
ServiceName: *serviceName,
110-
Zone: *zone,
111102
RefreshPodsInterval: *refreshPodsInterval,
112103
RefreshMetricsInterval: *refreshMetricsInterval,
113104
Scheme: scheme,
@@ -116,12 +107,15 @@ func main() {
116107
}
117108
serverRunner.Setup()
118109

110+
k8sClient, err := kubernetes.NewForConfigAndClient(cfg, serverRunner.Manager.GetHTTPClient())
111+
if err != nil {
112+
klog.Fatalf("Failed to create client: %v", err)
113+
}
114+
datastore.SetClient(k8sClient)
115+
119116
// Start health and ext-proc servers in goroutines
120117
healthSvr := startHealthServer(datastore, *grpcHealthPort)
121-
extProcSvr := serverRunner.Start(
122-
datastore,
123-
&vllm.PodMetricsClientImpl{},
124-
)
118+
extProcSvr := serverRunner.Start(&vllm.PodMetricsClientImpl{})
125119
// Start metrics handler
126120
metricsSvr := startMetricsHandler(*metricsPort, cfg)
127121

@@ -216,9 +210,5 @@ func validateFlags() error {
216210
return fmt.Errorf("required %q flag not set", "poolName")
217211
}
218212

219-
if *serviceName == "" {
220-
return fmt.Errorf("required %q flag not set", "serviceName")
221-
}
222-
223213
return nil
224214
}

‎pkg/ext-proc/scheduling/filter_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"testing"
66

77
"github.com/google/go-cmp/cmp"
8+
"github.com/google/go-cmp/cmp/cmpopts"
89
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
910
)
1011

@@ -206,7 +207,7 @@ func TestFilter(t *testing.T) {
206207
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
207208
}
208209

209-
if diff := cmp.Diff(test.output, got); diff != "" {
210+
if diff := cmp.Diff(test.output, got, cmpopts.IgnoreFields(backend.PodMetrics{}, "revision")); diff != "" {
210211
t.Errorf("Unexpected output (-want +got): %v", diff)
211212
}
212213
})
@@ -400,7 +401,7 @@ func TestFilterFunc(t *testing.T) {
400401
t.Errorf("Unexpected error, got %v, want %v", err, test.err)
401402
}
402403

403-
if diff := cmp.Diff(test.output, got); diff != "" {
404+
if diff := cmp.Diff(test.output, got, cmpopts.IgnoreFields(backend.PodMetrics{}, "revision")); diff != "" {
404405
t.Errorf("Unexpected output (-want +got): %v", diff)
405406
}
406407
})

‎pkg/ext-proc/server/runserver.go

+6-24
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@ type ExtProcServerRunner struct {
2323
TargetPodHeader string
2424
PoolName string
2525
PoolNamespace string
26-
ServiceName string
27-
Zone string
2826
RefreshPodsInterval time.Duration
2927
RefreshMetricsInterval time.Duration
3028
Scheme *runtime.Scheme
3129
Config *rest.Config
3230
Datastore *backend.K8sDatastore
33-
manager ctrl.Manager
31+
Manager ctrl.Manager
3432
}
3533

3634
// Default values for CLI flags in main
@@ -39,8 +37,6 @@ const (
3937
DefaultTargetPodHeader = "target-pod" // default for --targetPodHeader
4038
DefaultPoolName = "" // required but no default
4139
DefaultPoolNamespace = "default" // default for --poolNamespace
42-
DefaultServiceName = "" // required but no default
43-
DefaultZone = "" // default for --zone
4440
DefaultRefreshPodsInterval = 10 * time.Second // default for --refreshPodsInterval
4541
DefaultRefreshMetricsInterval = 50 * time.Millisecond // default for --refreshMetricsInterval
4642
)
@@ -51,22 +47,20 @@ func NewDefaultExtProcServerRunner() *ExtProcServerRunner {
5147
TargetPodHeader: DefaultTargetPodHeader,
5248
PoolName: DefaultPoolName,
5349
PoolNamespace: DefaultPoolNamespace,
54-
ServiceName: DefaultServiceName,
55-
Zone: DefaultZone,
5650
RefreshPodsInterval: DefaultRefreshPodsInterval,
5751
RefreshMetricsInterval: DefaultRefreshMetricsInterval,
5852
// Scheme, Config, and Datastore can be assigned later.
5953
}
6054
}
6155

62-
// Setup creates the reconcilers for pools, models, and endpointSlices and starts the manager.
56+
// Setup creates the reconcilers for pools and models and starts the manager.
6357
func (r *ExtProcServerRunner) Setup() {
6458
// Create a new manager to manage controllers
6559
mgr, err := ctrl.NewManager(r.Config, ctrl.Options{Scheme: r.Scheme})
6660
if err != nil {
6761
klog.Fatalf("Failed to create controller manager: %v", err)
6862
}
69-
r.manager = mgr
63+
r.Manager = mgr
7064

7165
// Create the controllers and register them with the manager
7266
if err := (&backend.InferencePoolReconciler{
@@ -94,22 +88,10 @@ func (r *ExtProcServerRunner) Setup() {
9488
}).SetupWithManager(mgr); err != nil {
9589
klog.Fatalf("Failed setting up InferenceModelReconciler: %v", err)
9690
}
97-
98-
if err := (&backend.EndpointSliceReconciler{
99-
Datastore: r.Datastore,
100-
Scheme: mgr.GetScheme(),
101-
Client: mgr.GetClient(),
102-
Record: mgr.GetEventRecorderFor("endpointslice"),
103-
ServiceName: r.ServiceName,
104-
Zone: r.Zone,
105-
}).SetupWithManager(mgr); err != nil {
106-
klog.Fatalf("Failed setting up EndpointSliceReconciler: %v", err)
107-
}
10891
}
10992

11093
// Start starts the Envoy external processor server in a goroutine.
11194
func (r *ExtProcServerRunner) Start(
112-
podDatastore *backend.K8sDatastore,
11395
podMetricsClient backend.PodMetricsClient,
11496
) *grpc.Server {
11597
svr := grpc.NewServer()
@@ -122,7 +104,7 @@ func (r *ExtProcServerRunner) Start(
122104
klog.Infof("Ext-proc server listening on port: %d", r.GrpcPort)
123105

124106
// Initialize backend provider
125-
pp := backend.NewProvider(podMetricsClient, podDatastore)
107+
pp := backend.NewProvider(podMetricsClient, r.Datastore)
126108
if err := pp.Init(r.RefreshPodsInterval, r.RefreshMetricsInterval); err != nil {
127109
klog.Fatalf("Failed to initialize backend provider: %v", err)
128110
}
@@ -143,12 +125,12 @@ func (r *ExtProcServerRunner) Start(
143125
}
144126

145127
func (r *ExtProcServerRunner) StartManager() {
146-
if r.manager == nil {
128+
if r.Manager == nil {
147129
klog.Fatalf("Runner has no manager setup to run: %v", r)
148130
}
149131
// Start the controller manager. Blocking and will return when shutdown is complete.
150132
klog.Infof("Starting controller manager")
151-
mgr := r.manager
133+
mgr := r.Manager
152134
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
153135
klog.Fatalf("Error starting controller manager: %v", err)
154136
}

‎pkg/ext-proc/test/utils.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@ import (
1818

1919
func StartExtProc(port int, refreshPodsInterval, refreshMetricsInterval time.Duration, pods []*backend.PodMetrics, models map[string]*v1alpha1.InferenceModel) *grpc.Server {
2020
ps := make(backend.PodSet)
21-
pms := make(map[backend.Pod]*backend.PodMetrics)
21+
pms := make(map[string]*backend.PodMetrics)
2222
for _, pod := range pods {
2323
ps[pod.Pod] = true
24-
pms[pod.Pod] = pod
24+
pms[pod.Pod.Name] = pod
2525
}
2626
pmc := &backend.FakePodMetricsClient{Res: pms}
27-
pp := backend.NewProvider(pmc, backend.NewK8sDataStore(backend.WithPods(pods)))
27+
pp := backend.NewProvider(pmc, backend.NewK8sDataStore()) // backend.WithPods(pods)
2828
if err := pp.Init(refreshPodsInterval, refreshMetricsInterval); err != nil {
2929
klog.Fatalf("failed to initialize: %v", err)
3030
}

‎pkg/ext-proc/util/testing/lister.go

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package testing
2+
3+
import (
4+
v1 "k8s.io/api/core/v1"
5+
"k8s.io/apimachinery/pkg/labels"
6+
listersv1 "k8s.io/client-go/listers/core/v1"
7+
)
8+
9+
type FakePodLister struct {
10+
PodsList []*v1.Pod
11+
}
12+
13+
func (l *FakePodLister) List(selector labels.Selector) (ret []*v1.Pod, err error) {
14+
return l.PodsList, nil
15+
}
16+
17+
func (l *FakePodLister) Pods(namespace string) listersv1.PodNamespaceLister {
18+
panic("not implemented")
19+
}

‎pkg/ext-proc/util/testing/wrappers.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package testing
2+
3+
import (
4+
corev1 "k8s.io/api/core/v1"
5+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
6+
)
7+
8+
// PodWrapper wraps a Pod inside.
9+
type PodWrapper struct{ corev1.Pod }
10+
11+
// MakePod creates a Pod wrapper.
12+
func MakePod(name string) *PodWrapper {
13+
return &PodWrapper{
14+
corev1.Pod{
15+
ObjectMeta: metav1.ObjectMeta{
16+
Name: name,
17+
},
18+
},
19+
}
20+
}
21+
22+
// Obj returns the inner Pod.
23+
func (p *PodWrapper) Obj() *corev1.Pod {
24+
return &p.Pod
25+
}
26+
27+
func (p *PodWrapper) SetReady() *PodWrapper {
28+
p.Status.Conditions = []corev1.PodCondition{{
29+
Type: corev1.PodReady,
30+
Status: corev1.ConditionTrue,
31+
}}
32+
return p
33+
}
34+
35+
func (p *PodWrapper) SetPodIP(podIP string) *PodWrapper {
36+
p.Status.PodIP = podIP
37+
return p
38+
}

‎pkg/manifests/ext_proc.yaml

-2
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ spec:
7777
- "vllm-llama2-7b-pool"
7878
- -v
7979
- "3"
80-
- -serviceName
81-
- "vllm-llama2-7b-pool"
8280
- -grpcPort
8381
- "9002"
8482
- -grpcHealthPort

‎pkg/manifests/vllm/deployment.yaml

+1-14
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,3 @@
1-
apiVersion: v1
2-
kind: Service
3-
metadata:
4-
name: vllm-llama2-7b-pool
5-
spec:
6-
selector:
7-
app: vllm-llama2-7b-pool
8-
ports:
9-
- protocol: TCP
10-
port: 8000
11-
targetPort: 8000
12-
type: ClusterIP
13-
---
141
apiVersion: apps/v1
152
kind: Deployment
163
metadata:
@@ -132,4 +119,4 @@ spec:
132119
emptyDir:
133120
medium: Memory
134121
- name: adapters
135-
emptyDir: {}
122+
emptyDir: {}

‎test/e2e/e2e_suite_test.go

-5
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,6 @@ func createModelServer(k8sClient client.Client, secretPath, deployPath string) {
245245

246246
// Wait for the deployment to be available.
247247
testutils.DeploymentAvailable(ctx, k8sClient, deploy, modelReadyTimeout, interval)
248-
249-
// Wait for the service to exist.
250-
testutils.EventuallyExists(ctx, func() error {
251-
return k8sClient.Get(ctx, types.NamespacedName{Namespace: nsName, Name: modelServerName}, &corev1.Service{})
252-
}, existsTimeout, interval)
253248
}
254249

255250
// createEnvoy creates the envoy proxy resources used for testing from the given filePath.

‎test/integration/hermetic_test.go

+36-13
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"log"
1313
"os"
1414
"path/filepath"
15+
"strconv"
1516
"testing"
1617
"time"
1718

@@ -25,6 +26,8 @@ import (
2526
"inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/backend"
2627
runserver "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/server"
2728
extprocutils "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/test"
29+
testingutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/testing"
30+
corev1 "k8s.io/api/core/v1"
2831
"k8s.io/apimachinery/pkg/runtime"
2932
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
3033
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
@@ -112,7 +115,7 @@ func SKIPTestHandleRequestBody(t *testing.T) {
112115
{
113116
Header: &configPb.HeaderValue{
114117
Key: runserver.DefaultTargetPodHeader,
115-
RawValue: []byte("address-1"),
118+
RawValue: []byte("pod-1:8000"),
116119
},
117120
},
118121
{
@@ -177,7 +180,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
177180
{
178181
Header: &configPb.HeaderValue{
179182
Key: runserver.DefaultTargetPodHeader,
180-
RawValue: []byte("address-1"),
183+
RawValue: []byte("pod-1:8000"),
181184
},
182185
},
183186
{
@@ -194,7 +197,6 @@ func TestKubeInferenceModelRequest(t *testing.T) {
194197

195198
pods := []*backend.PodMetrics{
196199
{
197-
Pod: extprocutils.FakePod(0),
198200
Metrics: backend.Metrics{
199201
WaitingQueueSize: 0,
200202
KVCacheUsagePercent: 0.2,
@@ -205,7 +207,6 @@ func TestKubeInferenceModelRequest(t *testing.T) {
205207
},
206208
},
207209
{
208-
Pod: extprocutils.FakePod(1),
209210
Metrics: backend.Metrics{
210211
WaitingQueueSize: 0,
211212
KVCacheUsagePercent: 0.1,
@@ -216,7 +217,6 @@ func TestKubeInferenceModelRequest(t *testing.T) {
216217
},
217218
},
218219
{
219-
Pod: extprocutils.FakePod(2),
220220
Metrics: backend.Metrics{
221221
WaitingQueueSize: 10,
222222
KVCacheUsagePercent: 0.2,
@@ -228,7 +228,7 @@ func TestKubeInferenceModelRequest(t *testing.T) {
228228
}
229229

230230
// Set up global k8sclient and extproc server runner with test environment config
231-
BeforeSuit()
231+
BeforeSuit(pods)
232232

233233
for _, test := range tests {
234234
t.Run(test.name, func(t *testing.T) {
@@ -312,8 +312,8 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
312312
}
313313
}
314314
}
315+
inferencePool := &v1alpha1.InferencePool{}
315316
for _, doc := range docs {
316-
inferencePool := &v1alpha1.InferencePool{}
317317
if err = yaml.Unmarshal(doc, inferencePool); err != nil {
318318
log.Fatalf("Can't unmarshal object: %v", doc)
319319
}
@@ -322,18 +322,19 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
322322
if err := k8sClient.Create(context.Background(), inferencePool); err != nil {
323323
log.Fatalf("unable to create inferencePool %v: %v", inferencePool.Name, err)
324324
}
325+
// expecting a single inferencepool
326+
break
325327
}
326328
}
327329

328330
ps := make(backend.PodSet)
329-
pms := make(map[backend.Pod]*backend.PodMetrics)
331+
pms := make(map[string]*backend.PodMetrics)
330332
for _, pod := range pods {
331333
ps[pod.Pod] = true
332-
pms[pod.Pod] = pod
334+
pms[pod.Pod.Name] = pod
333335
}
334336
pmc := &backend.FakePodMetricsClient{Res: pms}
335-
336-
server := serverRunner.Start(backend.NewK8sDataStore(backend.WithPods(pods)), pmc)
337+
server := serverRunner.Start(pmc)
337338
if err != nil {
338339
log.Fatalf("Ext-proc failed with the err: %v", err)
339340
}
@@ -361,7 +362,7 @@ func setUpHermeticServer(t *testing.T, pods []*backend.PodMetrics) (client extPr
361362
}
362363

363364
// Sets up a test environment and returns the runner struct
364-
func BeforeSuit() {
365+
func BeforeSuit(metrics []*backend.PodMetrics) {
365366
// Set up mock k8s API Client
366367
testEnv = &envtest.Environment{
367368
CRDDirectoryPaths: []string{filepath.Join("..", "..", "config", "crd", "bases")},
@@ -383,19 +384,41 @@ func BeforeSuit() {
383384
log.Fatalf("No error, but returned kubernetes client is nil, cfg: %v", cfg)
384385
}
385386

387+
fakeLister := &testingutil.FakePodLister{
388+
PodsList: []*corev1.Pod{},
389+
}
390+
for i, m := range metrics {
391+
podName := "pod-" + strconv.Itoa(i)
392+
pod := testingutil.MakePod(podName).SetReady().SetPodIP(podName).Obj()
393+
fakeLister.PodsList = append(fakeLister.PodsList, pod)
394+
m.Pod = backend.Pod{
395+
Name: pod.Name,
396+
Address: pod.Status.PodIP + ":8000",
397+
}
398+
}
399+
386400
serverRunner = runserver.NewDefaultExtProcServerRunner()
387401
// Adjust from defaults
388402
serverRunner.PoolName = "vllm-llama2-7b-pool"
389403
serverRunner.Scheme = scheme
390404
serverRunner.Config = cfg
391-
serverRunner.Datastore = backend.NewK8sDataStore()
405+
serverRunner.Datastore = backend.NewK8sDataStore(backend.WithPodListerFactory(
406+
func(pool *v1alpha1.InferencePool) *backend.PodLister {
407+
klog.V(1).Infof("Setting the fake lister %v", len(fakeLister.PodsList))
408+
return &backend.PodLister{
409+
Lister: fakeLister,
410+
}
411+
}))
392412

393413
serverRunner.Setup()
394414

395415
// Start the controller manager in go routine, not blocking
396416
go func() {
397417
serverRunner.StartManager()
398418
}()
419+
420+
// Wait the reconcilers to populate the datastore.
421+
time.Sleep(5 * time.Second)
399422
}
400423

401424
func sendRequest(t *testing.T, client extProcPb.ExternalProcessor_ProcessClient, req *extProcPb.ProcessingRequest) (*extProcPb.ProcessingResponse, error) {

0 commit comments

Comments
 (0)
Please sign in to comment.