Skip to content

Commit 46541d0

Browse files
authored
Replacing endpointSlice Reconciler with a direct Pod Reconciler (#300)
* reversion to pod reconciliation * adding ready check and unit tests * updating test * ablating unnecessary func * embedding ready status into update so non-ready pods are deleted * scrubbing serviceName & zone as they are obsolete * implementing pod cache flushing logic * Renaming file so merge confilcts can find the diffs easier * cleaning up messy merge conflict * nil checking short circuit * Listing fixes * feedback cleanup * log formatting and removing pods if not found * removing err to provent perma-reconciliation * removing dev image ref * cleaning up err logic
1 parent db21e9e commit 46541d0

10 files changed

+324
-344
lines changed

pkg/ext-proc/backend/datastore.go

+42
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,16 @@
11
package backend
22

33
import (
4+
"context"
45
"errors"
56
"math/rand"
7+
"strconv"
68
"sync"
79

810
corev1 "k8s.io/api/core/v1"
11+
"k8s.io/apimachinery/pkg/labels"
912
"k8s.io/klog/v2"
13+
"sigs.k8s.io/controller-runtime/pkg/client"
1014
"sigs.k8s.io/gateway-api-inference-extension/api/v1alpha1"
1115
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
1216
)
@@ -111,3 +115,41 @@ func IsCritical(model *v1alpha1.InferenceModel) bool {
111115
}
112116
return false
113117
}
118+
119+
func (ds *K8sDatastore) LabelsMatch(podLabels map[string]string) bool {
120+
poolSelector := selectorFromInferencePoolSelector(ds.inferencePool.Spec.Selector)
121+
podSet := labels.Set(podLabels)
122+
return poolSelector.Matches(podSet)
123+
}
124+
125+
func (ds *K8sDatastore) flushPodsAndRefetch(ctx context.Context, ctrlClient client.Client, newServerPool *v1alpha1.InferencePool) {
126+
podList := &corev1.PodList{}
127+
if err := ctrlClient.List(ctx, podList, &client.ListOptions{
128+
LabelSelector: selectorFromInferencePoolSelector(newServerPool.Spec.Selector),
129+
Namespace: newServerPool.Namespace,
130+
}); err != nil {
131+
klog.Error(err, "error listing clients")
132+
}
133+
ds.pods.Clear()
134+
135+
for _, k8sPod := range podList.Items {
136+
pod := Pod{
137+
Name: k8sPod.Name,
138+
Address: k8sPod.Status.PodIP + ":" + strconv.Itoa(int(newServerPool.Spec.TargetPortNumber)),
139+
}
140+
ds.pods.Store(pod, true)
141+
}
142+
143+
}
144+
145+
func selectorFromInferencePoolSelector(selector map[v1alpha1.LabelKey]v1alpha1.LabelValue) labels.Selector {
146+
return labels.SelectorFromSet(stripLabelKeyAliasFromLabelMap(selector))
147+
}
148+
149+
func stripLabelKeyAliasFromLabelMap(labels map[v1alpha1.LabelKey]v1alpha1.LabelValue) map[string]string {
150+
outMap := make(map[string]string)
151+
for k, v := range labels {
152+
outMap[string(k)] = string(v)
153+
}
154+
return outMap
155+
}

pkg/ext-proc/backend/endpointslice_reconciler.go

-109
This file was deleted.

pkg/ext-proc/backend/endpointslice_reconcilier_test.go

-202
This file was deleted.

pkg/ext-proc/backend/inferencemodel_reconciler_test.go

+21
Original file line numberDiff line numberDiff line change
@@ -309,3 +309,24 @@ func populateServiceMap(services ...*v1alpha1.InferenceModel) *sync.Map {
309309
}
310310
return returnVal
311311
}
312+
313+
func mapsEqual(map1, map2 *sync.Map) bool {
314+
equal := true
315+
316+
map1.Range(func(k, v any) bool {
317+
if _, ok := map2.Load(k); !ok {
318+
equal = false
319+
return false
320+
}
321+
return true
322+
})
323+
map2.Range(func(k, v any) bool {
324+
if _, ok := map1.Load(k); !ok {
325+
equal = false
326+
return false
327+
}
328+
return true
329+
})
330+
331+
return equal
332+
}

0 commit comments

Comments
 (0)