Skip to content

Commit 7056e73

Browse files
committed
Fix a race condition where LLMServerPool has not been initialized yet
1 parent 91f2055 commit 7056e73

12 files changed

+110
-229
lines changed

pkg/ext-proc/backend/datastore.go

+47-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package backend
22

33
import (
4+
"fmt"
45
"math/rand"
56
"sync"
67

@@ -9,24 +10,65 @@ import (
910
"k8s.io/klog/v2"
1011
)
1112

13+
func NewK8sDataStore(options ...K8sDatastoreOption) *K8sDatastore {
14+
store := &K8sDatastore{
15+
poolMu: sync.RWMutex{},
16+
llmServices: &sync.Map{},
17+
pods: &sync.Map{},
18+
}
19+
for _, opt := range options {
20+
opt(store)
21+
}
22+
return store
23+
}
24+
1225
// The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api)
1326
type K8sDatastore struct {
14-
LLMServerPool *v1alpha1.LLMServerPool
15-
LLMServices *sync.Map
16-
Pods *sync.Map
27+
// poolMu is used to synchronize access to the llmServerPool.
28+
poolMu sync.RWMutex
29+
llmServerPool *v1alpha1.LLMServerPool
30+
llmServices *sync.Map
31+
pods *sync.Map
32+
}
33+
34+
type K8sDatastoreOption func(*K8sDatastore)
35+
36+
// WithPods can be used in tests to override the pods.
37+
func WithPods(pods []*PodMetrics) K8sDatastoreOption {
38+
return func(store *K8sDatastore) {
39+
store.pods = &sync.Map{}
40+
for _, pod := range pods {
41+
store.pods.Store(pod.Pod, true)
42+
}
43+
}
44+
}
45+
46+
func (ds *K8sDatastore) SetLLMServerPool(pool *v1alpha1.LLMServerPool) {
47+
ds.poolMu.Lock()
48+
defer ds.poolMu.Unlock()
49+
ds.llmServerPool = pool
50+
}
51+
52+
func (ds *K8sDatastore) GetLLMServerPool() (*v1alpha1.LLMServerPool, error) {
53+
ds.poolMu.RLock()
54+
defer ds.poolMu.RUnlock()
55+
if ds.llmServerPool == nil {
56+
return nil, fmt.Errorf("LLMServerPool hasn't been initialized yet")
57+
}
58+
return ds.llmServerPool, nil
1759
}
1860

1961
func (ds *K8sDatastore) GetPodIPs() []string {
2062
var ips []string
21-
ds.Pods.Range(func(name, pod any) bool {
63+
ds.pods.Range(func(name, pod any) bool {
2264
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
2365
return true
2466
})
2567
return ips
2668
}
2769

2870
func (s *K8sDatastore) FetchModelData(modelName string) (returnModel *v1alpha1.Model) {
29-
s.LLMServices.Range(func(k, v any) bool {
71+
s.llmServices.Range(func(k, v any) bool {
3072
service := v.(*v1alpha1.LLMService)
3173
klog.V(3).Infof("Service name: %v", service.Name)
3274
for _, model := range service.Spec.Models {

pkg/ext-proc/backend/datastore_test.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,9 @@ var ()
1010

1111
func TestRandomWeightedDraw(t *testing.T) {
1212
tests := []struct {
13-
name string
14-
datastore K8sDatastore
15-
model *v1alpha1.Model
16-
want string
13+
name string
14+
model *v1alpha1.Model
15+
want string
1716
}{
1817
{
1918
name: "'random' distribution",

pkg/ext-proc/backend/endpointslice_reconciler.go

+13-5
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,9 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
4848
for _, endpoint := range slice.Endpoints {
4949
klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
5050
if c.validPod(endpoint) {
51-
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.LLMServerPool.Spec.TargetPort)}
51+
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + fmt.Sprint(c.Datastore.llmServerPool.Spec.TargetPort)}
5252
podMap[pod] = true
53-
c.Datastore.Pods.Store(pod, true)
53+
c.Datastore.pods.Store(pod, true)
5454
}
5555
}
5656

@@ -61,14 +61,22 @@ func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSli
6161
return false
6262
}
6363
if _, ok := podMap[pod]; !ok {
64-
c.Datastore.Pods.Delete(pod)
64+
c.Datastore.pods.Delete(pod)
6565
}
6666
return true
6767
}
68-
c.Datastore.Pods.Range(removeOldPods)
68+
c.Datastore.pods.Range(removeOldPods)
6969
}
7070

7171
func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
72+
llmServerPoolAvailable := func(object client.Object) bool {
73+
_, err := c.Datastore.GetLLMServerPool()
74+
if err != nil {
75+
klog.Warningf("Skipping reconciling EndpointSlice because LLMServerPool is not available yet: %v", err)
76+
}
77+
return err == nil
78+
}
79+
7280
ownsEndPointSlice := func(object client.Object) bool {
7381
// Check if the object is an EndpointSlice
7482
endpointSlice, ok := object.(*discoveryv1.EndpointSlice)
@@ -80,7 +88,7 @@ func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
8088
}
8189

8290
return ctrl.NewControllerManagedBy(mgr).
83-
For(&discoveryv1.EndpointSlice{}, builder.WithPredicates(predicate.NewPredicateFuncs(ownsEndPointSlice))).
91+
For(&discoveryv1.EndpointSlice{}, builder.WithPredicates(predicate.NewPredicateFuncs(llmServerPoolAvailable), predicate.NewPredicateFuncs(ownsEndPointSlice))).
8492
Complete(c)
8593
}
8694

pkg/ext-proc/backend/endpointslice_reconcilier_test.go

+17-23
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ var (
1818
func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
1919
tests := []struct {
2020
name string
21-
datastore K8sDatastore
21+
datastore *K8sDatastore
2222
incomingSlice *discoveryv1.EndpointSlice
23-
want K8sDatastore
23+
wantPods *sync.Map
2424
}{
2525
{
2626
name: "Add new pod",
27-
datastore: K8sDatastore{
28-
Pods: populateMap(basePod1, basePod2),
29-
LLMServerPool: &v1alpha1.LLMServerPool{
27+
datastore: &K8sDatastore{
28+
pods: populateMap(basePod1, basePod2),
29+
llmServerPool: &v1alpha1.LLMServerPool{
3030
Spec: v1alpha1.LLMServerPoolSpec{
3131
TargetPort: int32(8000),
3232
},
@@ -66,15 +66,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
6666
},
6767
},
6868
},
69-
want: K8sDatastore{
70-
Pods: populateMap(basePod1, basePod2, basePod3),
71-
},
69+
wantPods: populateMap(basePod1, basePod2, basePod3),
7270
},
7371
{
7472
name: "New pod, but its not ready yet. Do not add.",
75-
datastore: K8sDatastore{
76-
Pods: populateMap(basePod1, basePod2),
77-
LLMServerPool: &v1alpha1.LLMServerPool{
73+
datastore: &K8sDatastore{
74+
pods: populateMap(basePod1, basePod2),
75+
llmServerPool: &v1alpha1.LLMServerPool{
7876
Spec: v1alpha1.LLMServerPoolSpec{
7977
TargetPort: int32(8000),
8078
},
@@ -114,15 +112,13 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
114112
},
115113
},
116114
},
117-
want: K8sDatastore{
118-
Pods: populateMap(basePod1, basePod2),
119-
},
115+
wantPods: populateMap(basePod1, basePod2),
120116
},
121117
{
122118
name: "Existing pod not ready, new pod added, and is ready",
123-
datastore: K8sDatastore{
124-
Pods: populateMap(basePod1, basePod2),
125-
LLMServerPool: &v1alpha1.LLMServerPool{
119+
datastore: &K8sDatastore{
120+
pods: populateMap(basePod1, basePod2),
121+
llmServerPool: &v1alpha1.LLMServerPool{
126122
Spec: v1alpha1.LLMServerPoolSpec{
127123
TargetPort: int32(8000),
128124
},
@@ -162,18 +158,16 @@ func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
162158
},
163159
},
164160
},
165-
want: K8sDatastore{
166-
Pods: populateMap(basePod3, basePod2),
167-
},
161+
wantPods: populateMap(basePod3, basePod2),
168162
},
169163
}
170164
for _, test := range tests {
171165
t.Run(test.name, func(t *testing.T) {
172-
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: &test.datastore, Zone: ""}
166+
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: test.datastore, Zone: ""}
173167
endpointSliceReconciler.updateDatastore(test.incomingSlice)
174168

175-
if mapsEqual(endpointSliceReconciler.Datastore.Pods, test.want.Pods) {
176-
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.Pods, test.want.Pods)
169+
if mapsEqual(endpointSliceReconciler.Datastore.pods, test.wantPods) {
170+
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.pods, test.wantPods)
177171
}
178172
})
179173
}

pkg/ext-proc/backend/llmlserverpool_reconciler_test.go

-97
This file was deleted.

pkg/ext-proc/backend/llmserverpool_reconciler.go

+2-8
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,12 @@ func (c *LLMServerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Reques
4141
return ctrl.Result{}, err
4242
}
4343

44-
c.updateDatastore(serverPool)
44+
klog.V(2).Infof("Updated LLMServerPool: %+v", serverPool)
45+
c.Datastore.SetLLMServerPool(serverPool)
4546

4647
return ctrl.Result{}, nil
4748
}
4849

49-
func (c *LLMServerPoolReconciler) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
50-
if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
51-
klog.V(2).Infof("Updated LLMServerPool: %+v", serverPool)
52-
c.Datastore.LLMServerPool = serverPool
53-
}
54-
}
55-
5650
func (c *LLMServerPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
5751
return ctrl.NewControllerManagedBy(mgr).
5852
For(&v1alpha1.LLMServerPool{}).

pkg/ext-proc/backend/llmservice_reconciler.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@ func (c *LLMServiceReconciler) updateDatastore(service *v1alpha1.LLMService) {
4444
for _, ref := range service.Spec.PoolRef {
4545
if strings.Contains(strings.ToLower(ref.Kind), strings.ToLower("LLMServerPool")) && ref.Name == c.ServerPoolName {
4646
klog.V(2).Infof("Adding/Updating service: %+v", service)
47-
c.Datastore.LLMServices.Store(service.Name, service)
47+
c.Datastore.llmServices.Store(service.Name, service)
4848
return
4949
}
5050
}
5151
klog.V(2).Infof("Removing/Not adding service: %+v", service)
5252
// The LLMService may have changed to a different pool. Remove such services.
5353
// Otherwise this is a noop.
54-
c.Datastore.LLMServices.Delete(service.Name)
54+
c.Datastore.llmServices.Delete(service.Name)
5555
}

0 commit comments

Comments
 (0)