Skip to content

Commit f12a2ad

Browse files
committed
Address pod ip address updates
1 parent f3ed12f commit f12a2ad

File tree

8 files changed

+101
-49
lines changed

8 files changed

+101
-49
lines changed

Diff for: pkg/ext-proc/backend/datastore.go

+14-8
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type Datastore interface {
3030
ModelDelete(modelName string)
3131

3232
// PodMetrics operations
33-
PodAddIfNotExist(pod *corev1.Pod) bool
33+
PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool
3434
PodUpdateMetricsIfExist(pm *PodMetrics)
3535
PodGet(namespacedName types.NamespacedName) (*PodMetrics, bool)
3636
PodDelete(namespacedName types.NamespacedName)
@@ -148,21 +148,27 @@ func (ds *datastore) PodDelete(namespacedName types.NamespacedName) {
148148
ds.pods.Delete(namespacedName)
149149
}
150150

151-
func (ds *datastore) PodAddIfNotExist(pod *corev1.Pod) bool {
151+
func (ds *datastore) PodUpdateOrAddIfNotExist(pod *corev1.Pod) bool {
152152
new := &PodMetrics{
153-
NamespacedName: types.NamespacedName{
154-
Name: pod.Name,
155-
Namespace: pod.Namespace,
153+
Pod: Pod{
154+
NamespacedName: types.NamespacedName{
155+
Name: pod.Name,
156+
Namespace: pod.Namespace,
157+
},
158+
Address: pod.Status.PodIP,
156159
},
157-
Address: pod.Status.PodIP,
158160
Metrics: Metrics{
159161
ActiveModels: make(map[string]int),
160162
},
161163
}
162-
if _, ok := ds.pods.Load(new.NamespacedName); !ok {
164+
existing, ok := ds.pods.Load(new.NamespacedName)
165+
if !ok {
163166
ds.pods.Store(new.NamespacedName, new)
164167
return true
165168
}
169+
170+
// Update the pod status parts.
171+
existing.(*PodMetrics).Pod = new.Pod
166172
return false
167173
}
168174

@@ -182,7 +188,7 @@ func (ds *datastore) PodFlushAll(ctx context.Context, ctrlClient client.Client)
182188
for _, pod := range podList.Items {
183189
if podIsReady(&pod) {
184190
activePods[pod.Name] = true
185-
ds.PodAddIfNotExist(&pod)
191+
ds.PodUpdateOrAddIfNotExist(&pod)
186192
}
187193
}
188194

Diff for: pkg/ext-proc/backend/pod_reconciler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ func (c *PodReconciler) updateDatastore(logger logr.Logger, pod *corev1.Pod) {
6161
logger.V(logutil.DEFAULT).Info("Pod removed or not added", "name", namespacedName)
6262
c.Datastore.PodDelete(namespacedName)
6363
} else {
64-
if c.Datastore.PodAddIfNotExist(pod) {
64+
if c.Datastore.PodUpdateOrAddIfNotExist(pod) {
6565
logger.V(logutil.DEFAULT).Info("Pod added", "name", namespacedName)
6666
} else {
6767
logger.V(logutil.DEFAULT).Info("Pod already exists", "name", namespacedName)

Diff for: pkg/ext-proc/backend/pod_reconciler_test.go

+48-14
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ import (
1818
)
1919

2020
var (
21-
basePod1 = &PodMetrics{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: ":8000"}
22-
basePod2 = &PodMetrics{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: ":8000"}
23-
basePod3 = &PodMetrics{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: ":8000"}
21+
basePod1 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-1"}}
22+
basePod2 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod2"}, Address: "address-2"}}
23+
basePod3 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod3"}, Address: "address-3"}}
24+
basePod11 = &PodMetrics{Pod: Pod{NamespacedName: types.NamespacedName{Name: "pod1"}, Address: "address-11"}}
2425
)
2526

2627
func TestUpdateDatastore_PodReconciler(t *testing.T) {
@@ -29,7 +30,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
2930
name string
3031
datastore Datastore
3132
incomingPod *corev1.Pod
32-
wantPods []types.NamespacedName
33+
wantPods []Pod
3334
req *ctrl.Request
3435
}{
3536
{
@@ -47,12 +48,45 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
4748
},
4849
incomingPod: &corev1.Pod{
4950
ObjectMeta: metav1.ObjectMeta{
50-
Name: "pod3",
51+
Name: basePod3.NamespacedName.Name,
52+
Labels: map[string]string{
53+
"some-key": "some-val",
54+
},
55+
},
56+
Status: corev1.PodStatus{
57+
PodIP: basePod3.Address,
58+
Conditions: []corev1.PodCondition{
59+
{
60+
Type: corev1.PodReady,
61+
Status: corev1.ConditionTrue,
62+
},
63+
},
64+
},
65+
},
66+
wantPods: []Pod{basePod1.Pod, basePod2.Pod, basePod3.Pod},
67+
},
68+
{
69+
name: "Update pod1 address",
70+
datastore: &datastore{
71+
pods: populateMap(basePod1, basePod2),
72+
pool: &v1alpha1.InferencePool{
73+
Spec: v1alpha1.InferencePoolSpec{
74+
TargetPortNumber: int32(8000),
75+
Selector: map[v1alpha1.LabelKey]v1alpha1.LabelValue{
76+
"some-key": "some-val",
77+
},
78+
},
79+
},
80+
},
81+
incomingPod: &corev1.Pod{
82+
ObjectMeta: metav1.ObjectMeta{
83+
Name: basePod11.NamespacedName.Name,
5184
Labels: map[string]string{
5285
"some-key": "some-val",
5386
},
5487
},
5588
Status: corev1.PodStatus{
89+
PodIP: basePod11.Address,
5690
Conditions: []corev1.PodCondition{
5791
{
5892
Type: corev1.PodReady,
@@ -61,7 +95,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
6195
},
6296
},
6397
},
64-
wantPods: []types.NamespacedName{basePod1.NamespacedName, basePod2.NamespacedName, basePod3.NamespacedName},
98+
wantPods: []Pod{basePod11.Pod, basePod2.Pod},
6599
},
66100
{
67101
name: "Delete pod with DeletionTimestamp",
@@ -94,7 +128,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
94128
},
95129
},
96130
},
97-
wantPods: []types.NamespacedName{basePod2.NamespacedName},
131+
wantPods: []Pod{basePod2.Pod},
98132
},
99133
{
100134
name: "Delete notfound pod",
@@ -110,7 +144,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
110144
},
111145
},
112146
req: &ctrl.Request{NamespacedName: types.NamespacedName{Name: "pod1"}},
113-
wantPods: []types.NamespacedName{basePod2.NamespacedName},
147+
wantPods: []Pod{basePod2.Pod},
114148
},
115149
{
116150
name: "New pod, not ready, valid selector",
@@ -141,7 +175,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
141175
},
142176
},
143177
},
144-
wantPods: []types.NamespacedName{basePod1.NamespacedName, basePod2.NamespacedName},
178+
wantPods: []Pod{basePod1.Pod, basePod2.Pod},
145179
},
146180
{
147181
name: "Remove pod that does not match selector",
@@ -172,7 +206,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
172206
},
173207
},
174208
},
175-
wantPods: []types.NamespacedName{basePod2.NamespacedName},
209+
wantPods: []Pod{basePod2.Pod},
176210
},
177211
{
178212
name: "Remove pod that is not ready",
@@ -203,7 +237,7 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
203237
},
204238
},
205239
},
206-
wantPods: []types.NamespacedName{basePod2.NamespacedName},
240+
wantPods: []Pod{basePod2.Pod},
207241
},
208242
}
209243
for _, test := range tests {
@@ -229,15 +263,15 @@ func TestUpdateDatastore_PodReconciler(t *testing.T) {
229263
t.Errorf("Unexpected InferencePool reconcile error: %v", err)
230264
}
231265

232-
var gotPods []types.NamespacedName
266+
var gotPods []Pod
233267
test.datastore.PodRange(func(k, v any) bool {
234268
pod := v.(*PodMetrics)
235269
if v != nil {
236-
gotPods = append(gotPods, pod.NamespacedName)
270+
gotPods = append(gotPods, pod.Pod)
237271
}
238272
return true
239273
})
240-
if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b types.NamespacedName) bool { return a.String() < b.String() })) {
274+
if !cmp.Equal(gotPods, test.wantPods, cmpopts.SortSlices(func(a, b Pod) bool { return a.NamespacedName.String() < b.NamespacedName.String() })) {
241275
t.Errorf("got (%v) != want (%v);", gotPods, test.wantPods)
242276
}
243277
})

Diff for: pkg/ext-proc/backend/provider_test.go

+10-6
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ import (
1515

1616
var (
1717
pod1 = &PodMetrics{
18-
NamespacedName: types.NamespacedName{
19-
Name: "pod1",
18+
Pod: Pod{
19+
NamespacedName: types.NamespacedName{
20+
Name: "pod1",
21+
},
2022
},
2123
Metrics: Metrics{
2224
WaitingQueueSize: 0,
@@ -29,8 +31,10 @@ var (
2931
},
3032
}
3133
pod2 = &PodMetrics{
32-
NamespacedName: types.NamespacedName{
33-
Name: "pod2",
34+
Pod: Pod{
35+
NamespacedName: types.NamespacedName{
36+
Name: "pod2",
37+
},
3438
},
3539
Metrics: Metrics{
3640
WaitingQueueSize: 1,
@@ -99,7 +103,7 @@ func TestProvider(t *testing.T) {
99103
pod1,
100104
// Failed to fetch pod2 metrics so it remains the default values.
101105
{
102-
NamespacedName: pod2.NamespacedName,
106+
Pod: Pod{NamespacedName: pod2.NamespacedName},
103107
Metrics: Metrics{
104108
WaitingQueueSize: 0,
105109
KVCacheUsagePercent: 0,
@@ -130,7 +134,7 @@ func TestProvider(t *testing.T) {
130134
func populateMap(pods ...*PodMetrics) *sync.Map {
131135
newMap := &sync.Map{}
132136
for _, pod := range pods {
133-
newMap.Store(pod.NamespacedName, &PodMetrics{NamespacedName: pod.NamespacedName})
137+
newMap.Store(pod.NamespacedName, &PodMetrics{Pod: Pod{NamespacedName: pod.NamespacedName, Address: pod.Address}})
134138
}
135139
return newMap
136140
}

Diff for: pkg/ext-proc/backend/types.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,11 @@ import (
77
"k8s.io/apimachinery/pkg/types"
88
)
99

10+
type Pod struct {
11+
NamespacedName types.NamespacedName
12+
Address string
13+
}
14+
1015
type Metrics struct {
1116
// ActiveModels is a set of models(including LoRA adapters) that are currently cached to GPU.
1217
ActiveModels map[string]int
@@ -19,8 +24,7 @@ type Metrics struct {
1924
}
2025

2126
type PodMetrics struct {
22-
NamespacedName types.NamespacedName
23-
Address string
27+
Pod
2428
Metrics
2529
}
2630

@@ -34,8 +38,10 @@ func (pm *PodMetrics) Clone() *PodMetrics {
3438
cm[k] = v
3539
}
3640
clone := &PodMetrics{
37-
NamespacedName: pm.NamespacedName,
38-
Address: pm.Address,
41+
Pod: Pod{
42+
NamespacedName: pm.NamespacedName,
43+
Address: pm.Address,
44+
},
3945
Metrics: Metrics{
4046
ActiveModels: cm,
4147
RunningQueueSize: pm.RunningQueueSize,

Diff for: pkg/ext-proc/scheduling/filter_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestFilter(t *testing.T) {
4141
// model being active, and has low KV cache.
4242
input: []*backend.PodMetrics{
4343
{
44-
NamespacedName: types.NamespacedName{Name: "pod1"},
44+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}},
4545
Metrics: backend.Metrics{
4646
WaitingQueueSize: 0,
4747
KVCacheUsagePercent: 0.2,
@@ -53,7 +53,7 @@ func TestFilter(t *testing.T) {
5353
},
5454
},
5555
{
56-
NamespacedName: types.NamespacedName{Name: "pod2"},
56+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}},
5757
Metrics: backend.Metrics{
5858
WaitingQueueSize: 3,
5959
KVCacheUsagePercent: 0.1,
@@ -65,7 +65,7 @@ func TestFilter(t *testing.T) {
6565
},
6666
},
6767
{
68-
NamespacedName: types.NamespacedName{Name: "pod3"},
68+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}},
6969
Metrics: backend.Metrics{
7070
WaitingQueueSize: 10,
7171
KVCacheUsagePercent: 0.2,
@@ -78,7 +78,7 @@ func TestFilter(t *testing.T) {
7878
},
7979
output: []*backend.PodMetrics{
8080
{
81-
NamespacedName: types.NamespacedName{Name: "pod2"},
81+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}},
8282
Metrics: backend.Metrics{
8383
WaitingQueueSize: 3,
8484
KVCacheUsagePercent: 0.1,
@@ -102,7 +102,7 @@ func TestFilter(t *testing.T) {
102102
// pod1 will be picked because it has capacity for the sheddable request.
103103
input: []*backend.PodMetrics{
104104
{
105-
NamespacedName: types.NamespacedName{Name: "pod1"},
105+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}},
106106
Metrics: backend.Metrics{
107107
WaitingQueueSize: 0,
108108
KVCacheUsagePercent: 0.2,
@@ -114,7 +114,7 @@ func TestFilter(t *testing.T) {
114114
},
115115
},
116116
{
117-
NamespacedName: types.NamespacedName{Name: "pod2"},
117+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}},
118118
Metrics: backend.Metrics{
119119
WaitingQueueSize: 3,
120120
KVCacheUsagePercent: 0.1,
@@ -126,7 +126,7 @@ func TestFilter(t *testing.T) {
126126
},
127127
},
128128
{
129-
NamespacedName: types.NamespacedName{Name: "pod3"},
129+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}},
130130
Metrics: backend.Metrics{
131131
WaitingQueueSize: 10,
132132
KVCacheUsagePercent: 0.2,
@@ -139,7 +139,7 @@ func TestFilter(t *testing.T) {
139139
},
140140
output: []*backend.PodMetrics{
141141
{
142-
NamespacedName: types.NamespacedName{Name: "pod1"},
142+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}},
143143
Metrics: backend.Metrics{
144144
WaitingQueueSize: 0,
145145
KVCacheUsagePercent: 0.2,
@@ -164,7 +164,7 @@ func TestFilter(t *testing.T) {
164164
// dropped.
165165
input: []*backend.PodMetrics{
166166
{
167-
NamespacedName: types.NamespacedName{Name: "pod1"},
167+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod1"}},
168168
Metrics: backend.Metrics{
169169
WaitingQueueSize: 10,
170170
KVCacheUsagePercent: 0.9,
@@ -176,7 +176,7 @@ func TestFilter(t *testing.T) {
176176
},
177177
},
178178
{
179-
NamespacedName: types.NamespacedName{Name: "pod2"},
179+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod2"}},
180180
Metrics: backend.Metrics{
181181
WaitingQueueSize: 3,
182182
KVCacheUsagePercent: 0.85,
@@ -188,7 +188,7 @@ func TestFilter(t *testing.T) {
188188
},
189189
},
190190
{
191-
NamespacedName: types.NamespacedName{Name: "pod3"},
191+
Pod: backend.Pod{NamespacedName: types.NamespacedName{Name: "pod3"}},
192192
Metrics: backend.Metrics{
193193
WaitingQueueSize: 10,
194194
KVCacheUsagePercent: 0.85,

0 commit comments

Comments
 (0)