Skip to content

Commit 72bcadc

Browse files
committed
Implementing LLMServerPool controller and Pod controller
1 parent 300176b commit 72bcadc

13 files changed

+432
-77
lines changed

Diff for: .gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Dockerfile.cross
1515

1616
# Go workspace file
1717
go.work
18+
go.work.sum
1819

1920
# Kubernetes Generated files - skip generated files, except for vendored files
2021
!vendor/**/zz_generated.*

Diff for: pkg/ext-proc/Dockerfile renamed to Dockerfile

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
## Multistage build
2-
FROM golang:1.22.5-alpine as build
2+
FROM golang:1.23-alpine as build
33
ENV CGO_ENABLED=0
44
ENV GOOS=linux
55
ENV GOARCH=amd64
66

77
WORKDIR /src
88
COPY . .
9+
WORKDIR /src/pkg/ext-proc
910
RUN go mod download
1011
RUN go build -o /ext-proc
1112
FROM alpine:latest
@@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
1617
WORKDIR /
1718
COPY --from=build /ext-proc /ext-proc
1819

19-
ENTRYPOINT ["/ext-proc"]
20+
ENTRYPOINT ["/ext-proc"]

Diff for: examples/poc/ext-proc/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module ext-proc
1+
module ext-proc-poc
22

33
go 1.21
44

Diff for: pkg/ext-proc/backend/datastore.go

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package backend
2+
3+
import (
4+
"sync"
5+
6+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
7+
corev1 "k8s.io/api/core/v1"
8+
)
9+
10+
// The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api)
11+
type K8sDatastore struct {
12+
LLMServerPool *v1alpha1.LLMServerPool
13+
Pods *sync.Map
14+
Port string
15+
}
16+
17+
func (ds *K8sDatastore) GetPodIPs() []string {
18+
var ips []string
19+
ds.Pods.Range(func(name, pod any) bool {
20+
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
21+
return true
22+
})
23+
return ips
24+
}

Diff for: pkg/ext-proc/backend/endpointslice_controller.go

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
discoveryv1 "k8s.io/api/discovery/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
"k8s.io/client-go/tools/record"
9+
"k8s.io/klog/v2"
10+
ctrl "sigs.k8s.io/controller-runtime"
11+
"sigs.k8s.io/controller-runtime/pkg/client"
12+
)
13+
14+
var (
15+
serviceOwnerLabel = "kubernetes.io/service-name"
16+
)
17+
18+
type EndpointSliceController struct {
19+
client.Client
20+
Scheme *runtime.Scheme
21+
Record record.EventRecorder
22+
ServerPoolName string
23+
ServiceName string
24+
Zone string
25+
Namespace string
26+
Datastore *K8sDatastore
27+
}
28+
29+
func (c *EndpointSliceController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
30+
klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName)
31+
32+
endpointSlice := &discoveryv1.EndpointSlice{}
33+
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
34+
klog.Error(err, "unable to get LLMServerPool")
35+
return ctrl.Result{}, err
36+
}
37+
38+
if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) {
39+
return ctrl.Result{}, nil
40+
}
41+
42+
c.updateDatastore(endpointSlice)
43+
44+
return ctrl.Result{}, nil
45+
}
46+
47+
func (c *EndpointSliceController) updateDatastore(slice *discoveryv1.EndpointSlice) {
48+
podMap := make(map[Pod]bool)
49+
for _, endpoint := range slice.Endpoints {
50+
klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
51+
if c.validPod(endpoint) {
52+
pod := Pod{Name: *&endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + c.Datastore.Port}
53+
podMap[pod] = true
54+
c.Datastore.Pods.Store(pod, true)
55+
}
56+
}
57+
58+
removeOldPods := func(k, v any) bool {
59+
pod, ok := k.(Pod)
60+
if !ok {
61+
klog.Errorf("Unable to cast key to Pod: %v", k)
62+
return false
63+
}
64+
if _, ok := podMap[pod]; !ok {
65+
c.Datastore.Pods.Delete(pod)
66+
}
67+
return true
68+
}
69+
c.Datastore.Pods.Range(removeOldPods)
70+
}
71+
72+
func (c *EndpointSliceController) SetupWithManager(mgr ctrl.Manager) error {
73+
return ctrl.NewControllerManagedBy(mgr).
74+
For(&discoveryv1.EndpointSlice{}).
75+
Complete(c)
76+
}
77+
78+
func (c *EndpointSliceController) ownsEndPointSlice(labels map[string]string) bool {
79+
return labels[serviceOwnerLabel] == c.ServiceName
80+
}
81+
82+
func (c *EndpointSliceController) validPod(endpoint discoveryv1.Endpoint) bool {
83+
validZone := c.Zone == "" || c.Zone != "" && *endpoint.Zone == c.Zone
84+
return validZone && *endpoint.Conditions.Ready == true
85+
86+
}

Diff for: pkg/ext-proc/backend/fake.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package backend
22

3-
import "context"
4-
5-
type FakePodLister struct {
6-
Err error
7-
Pods PodSet
8-
}
3+
import (
4+
"context"
5+
"fmt"
6+
)
97

108
type FakePodMetricsClient struct {
119
Err map[Pod]error
@@ -16,9 +14,6 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
1614
if err, ok := f.Err[pod]; ok {
1715
return nil, err
1816
}
17+
fmt.Printf("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
1918
return f.Res[pod], nil
2019
}
21-
22-
func (fpl *FakePodLister) List() (PodSet, error) {
23-
return fpl.Pods, fpl.Err
24-
}
+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package backend
2+
3+
import (
4+
"testing"
5+
6+
"github.com/google/go-cmp/cmp"
7+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
)
10+
11+
func TestUpdateDatastore_LLMServerPoolController(t *testing.T) {
12+
tests := []struct {
13+
name string
14+
datastore K8sDatastore
15+
incomingServerPool *v1alpha1.LLMServerPool
16+
want K8sDatastore
17+
}{
18+
{
19+
name: "Update to new, fresh LLMServerPool",
20+
datastore: K8sDatastore{
21+
LLMServerPool: &v1alpha1.LLMServerPool{
22+
Spec: v1alpha1.LLMServerPoolSpec{
23+
ModelServerSelector: metav1.LabelSelector{
24+
MatchLabels: map[string]string{"app": "vllm"},
25+
},
26+
},
27+
ObjectMeta: metav1.ObjectMeta{
28+
Name: "test-pool",
29+
ResourceVersion: "Old and boring",
30+
},
31+
},
32+
},
33+
incomingServerPool: &v1alpha1.LLMServerPool{
34+
Spec: v1alpha1.LLMServerPoolSpec{
35+
ModelServerSelector: metav1.LabelSelector{
36+
MatchLabels: map[string]string{"app": "not-vllm"},
37+
},
38+
},
39+
ObjectMeta: metav1.ObjectMeta{
40+
Name: "test-pool",
41+
ResourceVersion: "New and fun",
42+
},
43+
},
44+
want: K8sDatastore{
45+
LLMServerPool: &v1alpha1.LLMServerPool{
46+
Spec: v1alpha1.LLMServerPoolSpec{
47+
ModelServerSelector: metav1.LabelSelector{
48+
MatchLabels: map[string]string{"app": "not-vllm"},
49+
},
50+
},
51+
ObjectMeta: metav1.ObjectMeta{
52+
Name: "test-pool",
53+
ResourceVersion: "New and fun",
54+
},
55+
},
56+
},
57+
},
58+
{
59+
name: "Do not update, resource version the same",
60+
datastore: K8sDatastore{
61+
LLMServerPool: &v1alpha1.LLMServerPool{
62+
Spec: v1alpha1.LLMServerPoolSpec{
63+
ModelServerSelector: metav1.LabelSelector{
64+
MatchLabels: map[string]string{"app": "vllm"},
65+
},
66+
},
67+
ObjectMeta: metav1.ObjectMeta{
68+
Name: "test-pool",
69+
ResourceVersion: "Old and boring",
70+
},
71+
},
72+
},
73+
incomingServerPool: &v1alpha1.LLMServerPool{
74+
Spec: v1alpha1.LLMServerPoolSpec{
75+
ModelServerSelector: metav1.LabelSelector{
76+
MatchLabels: map[string]string{"technically": "this-should-never-happen"},
77+
},
78+
},
79+
ObjectMeta: metav1.ObjectMeta{
80+
Name: "test-pool",
81+
ResourceVersion: "Old and boring",
82+
},
83+
},
84+
want: K8sDatastore{
85+
LLMServerPool: &v1alpha1.LLMServerPool{
86+
Spec: v1alpha1.LLMServerPoolSpec{
87+
ModelServerSelector: metav1.LabelSelector{
88+
MatchLabels: map[string]string{"app": "vllm"},
89+
},
90+
},
91+
ObjectMeta: metav1.ObjectMeta{
92+
Name: "test-pool",
93+
ResourceVersion: "Old and boring",
94+
},
95+
},
96+
},
97+
},
98+
}
99+
for _, test := range tests {
100+
t.Run(test.name, func(t *testing.T) {
101+
llmServerPoolController := &LLMServerPoolController{Datastore: &test.datastore}
102+
llmServerPoolController.updateDatastore(test.incomingServerPool)
103+
104+
if diff := cmp.Diff(test.want.LLMServerPool, llmServerPoolController.Datastore.LLMServerPool); diff != "" {
105+
t.Errorf("Unexpected output (-want +got): %v", diff)
106+
}
107+
})
108+
}
109+
}

Diff for: pkg/ext-proc/backend/llmserverpool_controller.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
"sigs.k8s.io/controller-runtime/pkg/client"
7+
8+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
9+
"k8s.io/apimachinery/pkg/runtime"
10+
"k8s.io/client-go/tools/record"
11+
"k8s.io/klog/v2"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
)
14+
15+
const (
16+
controllerNamePrefix = "instance-gateway-"
17+
)
18+
19+
// LLMServerPoolController is the controller implementation for Instance Gateway resources
20+
type LLMServerPoolController struct {
21+
client.Client
22+
Scheme *runtime.Scheme
23+
Record record.EventRecorder
24+
ServerPoolName string
25+
Namespace string
26+
Datastore *K8sDatastore
27+
Port int
28+
Zone string
29+
}
30+
31+
func (c *LLMServerPoolController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
32+
if req.NamespacedName.Name != c.ServerPoolName && req.NamespacedName.Namespace != c.Namespace {
33+
return ctrl.Result{}, nil
34+
}
35+
klog.V(1).Info("reconciling LLMServerPool", req.NamespacedName)
36+
37+
serverPool := &v1alpha1.LLMServerPool{}
38+
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
39+
klog.Error(err, "unable to get LLMServerPool")
40+
return ctrl.Result{}, err
41+
}
42+
43+
c.updateDatastore(serverPool)
44+
45+
return ctrl.Result{}, nil
46+
}
47+
48+
func (c *LLMServerPoolController) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
49+
if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
50+
c.Datastore.LLMServerPool = serverPool
51+
}
52+
}
53+
54+
func (c *LLMServerPoolController) SetupWithManager(mgr ctrl.Manager) error {
55+
return ctrl.NewControllerManagedBy(mgr).
56+
For(&v1alpha1.LLMServerPool{}).
57+
Complete(c)
58+
}

0 commit comments

Comments
 (0)