Skip to content

Commit f82445b

Browse files
authored
Implementing LLMServerPool controller and Pod controller (#36)
1 parent 54ee6d7 commit f82445b

14 files changed

+623
-77
lines changed

Diff for: .gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Dockerfile.cross
1515

1616
# Go workspace file
1717
go.work
18+
go.work.sum
1819

1920
# Kubernetes Generated files - skip generated files, except for vendored files
2021
!vendor/**/zz_generated.*

Diff for: pkg/ext-proc/Dockerfile renamed to Dockerfile

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
## Multistage build
2-
FROM golang:1.22.5-alpine as build
2+
FROM golang:1.23-alpine as build
33
ENV CGO_ENABLED=0
44
ENV GOOS=linux
55
ENV GOARCH=amd64
66

77
WORKDIR /src
88
COPY . .
9+
WORKDIR /src/pkg/ext-proc
910
RUN go mod download
1011
RUN go build -o /ext-proc
1112
FROM alpine:latest
@@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
1617
WORKDIR /
1718
COPY --from=build /ext-proc /ext-proc
1819

19-
ENTRYPOINT ["/ext-proc"]
20+
ENTRYPOINT ["/ext-proc"]

Diff for: examples/poc/ext-proc/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module ext-proc
1+
module ext-proc-poc
22

33
go 1.21
44

Diff for: pkg/ext-proc/backend/datastore.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package backend
2+
3+
import (
4+
"sync"
5+
6+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
7+
corev1 "k8s.io/api/core/v1"
8+
)
9+
10+
// The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api)
11+
type K8sDatastore struct {
12+
LLMServerPool *v1alpha1.LLMServerPool
13+
Pods *sync.Map
14+
Port string
15+
}
16+
17+
func (ds *K8sDatastore) GetPodIPs() []string {
18+
var ips []string
19+
ds.Pods.Range(func(name, pod any) bool {
20+
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
21+
return true
22+
})
23+
return ips
24+
}

Diff for: pkg/ext-proc/backend/endpointslice_reconciler.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
discoveryv1 "k8s.io/api/discovery/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
"k8s.io/client-go/tools/record"
9+
"k8s.io/klog/v2"
10+
ctrl "sigs.k8s.io/controller-runtime"
11+
"sigs.k8s.io/controller-runtime/pkg/client"
12+
)
13+
14+
var (
15+
serviceOwnerLabel = "kubernetes.io/service-name"
16+
)
17+
18+
type EndpointSliceReconciler struct {
19+
client.Client
20+
Scheme *runtime.Scheme
21+
Record record.EventRecorder
22+
ServerPoolName string
23+
ServiceName string
24+
Zone string
25+
Namespace string
26+
Datastore *K8sDatastore
27+
}
28+
29+
func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
30+
klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName)
31+
32+
endpointSlice := &discoveryv1.EndpointSlice{}
33+
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
34+
klog.Error(err, "unable to get LLMServerPool")
35+
return ctrl.Result{}, err
36+
}
37+
38+
if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) {
39+
return ctrl.Result{}, nil
40+
}
41+
42+
c.updateDatastore(endpointSlice)
43+
44+
return ctrl.Result{}, nil
45+
}
46+
47+
func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSlice) {
48+
podMap := make(map[Pod]bool)
49+
for _, endpoint := range slice.Endpoints {
50+
klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
51+
if c.validPod(endpoint) {
52+
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + c.Datastore.Port}
53+
podMap[pod] = true
54+
c.Datastore.Pods.Store(pod, true)
55+
}
56+
}
57+
58+
removeOldPods := func(k, v any) bool {
59+
pod, ok := k.(Pod)
60+
if !ok {
61+
klog.Errorf("Unable to cast key to Pod: %v", k)
62+
return false
63+
}
64+
if _, ok := podMap[pod]; !ok {
65+
c.Datastore.Pods.Delete(pod)
66+
}
67+
return true
68+
}
69+
c.Datastore.Pods.Range(removeOldPods)
70+
}
71+
72+
func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
73+
return ctrl.NewControllerManagedBy(mgr).
74+
For(&discoveryv1.EndpointSlice{}).
75+
Complete(c)
76+
}
77+
78+
func (c *EndpointSliceReconciler) ownsEndPointSlice(labels map[string]string) bool {
79+
return labels[serviceOwnerLabel] == c.ServiceName
80+
}
81+
82+
func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
83+
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
84+
return validZone && *endpoint.Conditions.Ready == true
85+
86+
}
+196
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package backend
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
v1 "k8s.io/api/core/v1"
8+
discoveryv1 "k8s.io/api/discovery/v1"
9+
)
10+
11+
var (
12+
basePod1 = Pod{Name: "pod1"}
13+
basePod2 = Pod{Name: "pod2"}
14+
basePod3 = Pod{Name: "pod3"}
15+
)
16+
17+
func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
18+
tests := []struct {
19+
name string
20+
datastore K8sDatastore
21+
incomingSlice *discoveryv1.EndpointSlice
22+
want K8sDatastore
23+
}{
24+
{
25+
name: "Add new pod",
26+
datastore: K8sDatastore{
27+
Pods: populateMap(basePod1, basePod2),
28+
Port: "8000",
29+
},
30+
incomingSlice: &discoveryv1.EndpointSlice{
31+
Endpoints: []discoveryv1.Endpoint{
32+
{
33+
TargetRef: &v1.ObjectReference{
34+
Name: "pod1",
35+
},
36+
Zone: new(string),
37+
Conditions: discoveryv1.EndpointConditions{
38+
Ready: truePointer(),
39+
},
40+
Addresses: []string{"0.0.0.0"},
41+
},
42+
{
43+
TargetRef: &v1.ObjectReference{
44+
Name: "pod2",
45+
},
46+
Zone: new(string),
47+
Conditions: discoveryv1.EndpointConditions{
48+
Ready: truePointer(),
49+
},
50+
Addresses: []string{"0.0.0.0"},
51+
},
52+
{
53+
TargetRef: &v1.ObjectReference{
54+
Name: "pod3",
55+
},
56+
Zone: new(string),
57+
Conditions: discoveryv1.EndpointConditions{
58+
Ready: truePointer(),
59+
},
60+
Addresses: []string{"0.0.0.0"},
61+
},
62+
},
63+
},
64+
want: K8sDatastore{
65+
Pods: populateMap(basePod1, basePod2, basePod3),
66+
Port: "8000",
67+
},
68+
},
69+
{
70+
name: "New pod, but its not ready yet. Do not add.",
71+
datastore: K8sDatastore{
72+
Pods: populateMap(basePod1, basePod2),
73+
Port: "8000",
74+
},
75+
incomingSlice: &discoveryv1.EndpointSlice{
76+
Endpoints: []discoveryv1.Endpoint{
77+
{
78+
TargetRef: &v1.ObjectReference{
79+
Name: "pod1",
80+
},
81+
Zone: new(string),
82+
Conditions: discoveryv1.EndpointConditions{
83+
Ready: truePointer(),
84+
},
85+
Addresses: []string{"0.0.0.0"},
86+
},
87+
{
88+
TargetRef: &v1.ObjectReference{
89+
Name: "pod2",
90+
},
91+
Zone: new(string),
92+
Conditions: discoveryv1.EndpointConditions{
93+
Ready: truePointer(),
94+
},
95+
Addresses: []string{"0.0.0.0"},
96+
},
97+
{
98+
TargetRef: &v1.ObjectReference{
99+
Name: "pod3",
100+
},
101+
Zone: new(string),
102+
Conditions: discoveryv1.EndpointConditions{
103+
Ready: new(bool),
104+
},
105+
Addresses: []string{"0.0.0.0"},
106+
},
107+
},
108+
},
109+
want: K8sDatastore{
110+
Pods: populateMap(basePod1, basePod2),
111+
Port: "8000",
112+
},
113+
},
114+
{
115+
name: "Existing pod not ready, new pod added, and is ready",
116+
datastore: K8sDatastore{
117+
Pods: populateMap(basePod1, basePod2),
118+
Port: "8000",
119+
},
120+
incomingSlice: &discoveryv1.EndpointSlice{
121+
Endpoints: []discoveryv1.Endpoint{
122+
{
123+
TargetRef: &v1.ObjectReference{
124+
Name: "pod1",
125+
},
126+
Zone: new(string),
127+
Conditions: discoveryv1.EndpointConditions{
128+
Ready: new(bool),
129+
},
130+
Addresses: []string{"0.0.0.0"},
131+
},
132+
{
133+
TargetRef: &v1.ObjectReference{
134+
Name: "pod2",
135+
},
136+
Zone: new(string),
137+
Conditions: discoveryv1.EndpointConditions{
138+
Ready: truePointer(),
139+
},
140+
Addresses: []string{"0.0.0.0"},
141+
},
142+
{
143+
TargetRef: &v1.ObjectReference{
144+
Name: "pod3",
145+
},
146+
Zone: new(string),
147+
Conditions: discoveryv1.EndpointConditions{
148+
Ready: truePointer(),
149+
},
150+
Addresses: []string{"0.0.0.0"},
151+
},
152+
},
153+
},
154+
want: K8sDatastore{
155+
Pods: populateMap(basePod3, basePod2),
156+
Port: "8000",
157+
},
158+
},
159+
}
160+
for _, test := range tests {
161+
t.Run(test.name, func(t *testing.T) {
162+
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: &test.datastore, Zone: ""}
163+
endpointSliceReconciler.updateDatastore(test.incomingSlice)
164+
165+
if mapsEqual(endpointSliceReconciler.Datastore.Pods, test.want.Pods) {
166+
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.Pods, test.want.Pods)
167+
}
168+
})
169+
}
170+
}
171+
172+
// mapsEqual reports whether map1 and map2 hold exactly the same set of keys.
// Only keys are compared; the stored values are ignored.
func mapsEqual(map1, map2 *sync.Map) bool {
	// containsAllKeys reports whether every key of src is present in dst.
	containsAllKeys := func(src, dst *sync.Map) bool {
		allFound := true
		src.Range(func(key, _ any) bool {
			_, found := dst.Load(key)
			if !found {
				allFound = false
			}
			// Stop at the first missing key.
			return found
		})
		return allFound
	}

	forward := containsAllKeys(map1, map2)
	backward := containsAllKeys(map2, map1)
	return forward && backward
}
192+
193+
// truePointer returns a pointer to a freshly allocated bool set to true, for
// struct fields that are declared as *bool.
func truePointer() *bool {
	value := new(bool)
	*value = true
	return value
}

Diff for: pkg/ext-proc/backend/fake.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package backend
22

3-
import "context"
4-
5-
type FakePodLister struct {
6-
Err error
7-
Pods PodSet
8-
}
3+
import (
4+
"context"
5+
"fmt"
6+
)
97

108
type FakePodMetricsClient struct {
119
Err map[Pod]error
@@ -16,9 +14,6 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
1614
if err, ok := f.Err[pod]; ok {
1715
return nil, err
1816
}
17+
fmt.Printf("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
1918
return f.Res[pod], nil
2019
}
21-
22-
func (fpl *FakePodLister) List() (PodSet, error) {
23-
return fpl.Pods, fpl.Err
24-
}

0 commit comments

Comments
 (0)