Skip to content

Commit 583fd27

Browse files
committed
Implementing LLMServerPool controller and Pod controller
1 parent 9270ff6 commit 583fd27

15 files changed

+714
-74
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Dockerfile.cross
1515

1616
# Go workspace file
1717
go.work
18+
go.work.sum
1819

1920
# Kubernetes Generated files - skip generated files, except for vendored files
2021
!vendor/**/zz_generated.*

pkg/ext-proc/Dockerfile renamed to Dockerfile

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
## Multistage build
2-
FROM golang:1.22.5-alpine as build
2+
FROM golang:1.23-alpine as build
33
ENV CGO_ENABLED=0
44
ENV GOOS=linux
55
ENV GOARCH=amd64
66

77
WORKDIR /src
88
COPY . .
9+
WORKDIR /src/pkg/ext-proc
910
RUN go mod download
1011
RUN go build -o /ext-proc
1112
FROM alpine:latest
@@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
1617
WORKDIR /
1718
COPY --from=build /ext-proc /ext-proc
1819

19-
ENTRYPOINT ["/ext-proc"]
20+
ENTRYPOINT ["/ext-proc"]

examples/poc/ext-proc/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module ext-proc
1+
module ext-proc-poc
22

33
go 1.21
44

pkg/ext-proc/backend/datastore.go

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package backend
2+
3+
import (
4+
"sync"
5+
6+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
7+
corev1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/labels"
10+
"k8s.io/klog/v2"
11+
)
12+
13+
// The datastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from k8s-api)
14+
type K8sDatastore struct {
15+
LLMServerPool *v1alpha1.LLMServerPool
16+
Pods *sync.Map
17+
Port string
18+
}
19+
20+
func (ds *K8sDatastore) GetPodIPs() []string {
21+
var ips []string
22+
ds.Pods.Range(func(name, pod any) bool {
23+
24+
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
25+
return true
26+
})
27+
return ips
28+
}
29+
30+
func (ds *K8sDatastore) LabelsMatch(podLabels map[string]string) bool {
31+
selector, err := metav1.LabelSelectorAsSelector(&ds.LLMServerPool.Spec.ModelServerSelector)
32+
if err != nil {
33+
klog.Error(err.Error())
34+
return false
35+
}
36+
if selector == nil {
37+
return false
38+
}
39+
set := labels.Set(podLabels)
40+
return selector.Matches(set)
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
discoveryv1 "k8s.io/api/discovery/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
"k8s.io/apimachinery/pkg/types"
9+
"k8s.io/client-go/tools/record"
10+
"k8s.io/klog/v2"
11+
ctrl "sigs.k8s.io/controller-runtime"
12+
"sigs.k8s.io/controller-runtime/pkg/client"
13+
)
14+
15+
var (
16+
labelName = "kubernetes.io/service-name"
17+
)
18+
19+
type EndpointSliceController struct {
20+
client.Client
21+
Scheme *runtime.Scheme
22+
Record record.EventRecorder
23+
ServerPoolName string
24+
Zone string
25+
Namespace string
26+
Datastore *K8sDatastore
27+
}
28+
29+
func (c *EndpointSliceController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
30+
if !c.ownsEndPointSlice(req.NamespacedName) {
31+
return ctrl.Result{}, nil
32+
}
33+
34+
endpointSlice := &discoveryv1.EndpointSlice{}
35+
if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
36+
klog.Error(err, "unable to get LLMServerPool")
37+
return ctrl.Result{}, err
38+
}
39+
40+
c.updateDatastore(endpointSlice)
41+
42+
return ctrl.Result{}, nil
43+
}
44+
45+
func (c *EndpointSliceController) ownsEndPointSlice(fullName types.NamespacedName) bool {
46+
name := c.ServerPoolName
47+
if c.Zone != "" {
48+
name = c.ServerPoolName + "-" + c.Zone
49+
}
50+
51+
return types.NamespacedName{Name: name, Namespace: c.Namespace} == fullName
52+
}
53+
54+
func (c *EndpointSliceController) updateDatastore(slice *discoveryv1.EndpointSlice) {
55+
for _, endpoint := range slice.Endpoints {
56+
endpoint.Addresses
57+
}
58+
59+
}

pkg/ext-proc/backend/fake.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package backend
22

3-
import "context"
4-
5-
type FakePodLister struct {
6-
Err error
7-
Pods PodSet
8-
}
3+
import (
4+
"context"
5+
"fmt"
6+
)
97

108
type FakePodMetricsClient struct {
119
Err map[Pod]error
@@ -16,9 +14,6 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
1614
if err, ok := f.Err[pod]; ok {
1715
return nil, err
1816
}
17+
fmt.Printf("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
1918
return f.Res[pod], nil
2019
}
21-
22-
func (fpl *FakePodLister) List() (PodSet, error) {
23-
return fpl.Pods, fpl.Err
24-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package backend
2+
3+
import (
4+
"testing"
5+
6+
"github.com/google/go-cmp/cmp"
7+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
)
10+
11+
func TestUpdateDatastore_LLMServerPoolController(t *testing.T) {
12+
tests := []struct {
13+
name string
14+
datastore K8sDatastore
15+
incomingServerPool *v1alpha1.LLMServerPool
16+
want K8sDatastore
17+
}{
18+
{
19+
name: "Update to new, fresh LLMServerPool",
20+
datastore: K8sDatastore{
21+
LLMServerPool: &v1alpha1.LLMServerPool{
22+
Spec: v1alpha1.LLMServerPoolSpec{
23+
ModelServerSelector: metav1.LabelSelector{
24+
MatchLabels: map[string]string{"app": "vllm"},
25+
},
26+
},
27+
ObjectMeta: metav1.ObjectMeta{
28+
Name: "test-pool",
29+
ResourceVersion: "Old and boring",
30+
},
31+
},
32+
},
33+
incomingServerPool: &v1alpha1.LLMServerPool{
34+
Spec: v1alpha1.LLMServerPoolSpec{
35+
ModelServerSelector: metav1.LabelSelector{
36+
MatchLabels: map[string]string{"app": "not-vllm"},
37+
},
38+
},
39+
ObjectMeta: metav1.ObjectMeta{
40+
Name: "test-pool",
41+
ResourceVersion: "New and fun",
42+
},
43+
},
44+
want: K8sDatastore{
45+
LLMServerPool: &v1alpha1.LLMServerPool{
46+
Spec: v1alpha1.LLMServerPoolSpec{
47+
ModelServerSelector: metav1.LabelSelector{
48+
MatchLabels: map[string]string{"app": "not-vllm"},
49+
},
50+
},
51+
ObjectMeta: metav1.ObjectMeta{
52+
Name: "test-pool",
53+
ResourceVersion: "New and fun",
54+
},
55+
},
56+
},
57+
},
58+
{
59+
name: "Do not update, resource version the same",
60+
datastore: K8sDatastore{
61+
LLMServerPool: &v1alpha1.LLMServerPool{
62+
Spec: v1alpha1.LLMServerPoolSpec{
63+
ModelServerSelector: metav1.LabelSelector{
64+
MatchLabels: map[string]string{"app": "vllm"},
65+
},
66+
},
67+
ObjectMeta: metav1.ObjectMeta{
68+
Name: "test-pool",
69+
ResourceVersion: "Old and boring",
70+
},
71+
},
72+
},
73+
incomingServerPool: &v1alpha1.LLMServerPool{
74+
Spec: v1alpha1.LLMServerPoolSpec{
75+
ModelServerSelector: metav1.LabelSelector{
76+
MatchLabels: map[string]string{"technically": "this-should-never-happen"},
77+
},
78+
},
79+
ObjectMeta: metav1.ObjectMeta{
80+
Name: "test-pool",
81+
ResourceVersion: "Old and boring",
82+
},
83+
},
84+
want: K8sDatastore{
85+
LLMServerPool: &v1alpha1.LLMServerPool{
86+
Spec: v1alpha1.LLMServerPoolSpec{
87+
ModelServerSelector: metav1.LabelSelector{
88+
MatchLabels: map[string]string{"app": "vllm"},
89+
},
90+
},
91+
ObjectMeta: metav1.ObjectMeta{
92+
Name: "test-pool",
93+
ResourceVersion: "Old and boring",
94+
},
95+
},
96+
},
97+
},
98+
}
99+
for _, test := range tests {
100+
t.Run(test.name, func(t *testing.T) {
101+
llmServerPoolController := &LLMServerPoolController{Datastore: &test.datastore}
102+
llmServerPoolController.updateDatastore(test.incomingServerPool)
103+
104+
if diff := cmp.Diff(test.want.LLMServerPool, llmServerPoolController.Datastore.LLMServerPool); diff != "" {
105+
t.Errorf("Unexpected output (-want +got): %v", diff)
106+
}
107+
})
108+
}
109+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"sigs.k8s.io/controller-runtime/pkg/client"
8+
9+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
10+
v1 "k8s.io/api/core/v1"
11+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
12+
"k8s.io/apimachinery/pkg/runtime"
13+
"k8s.io/client-go/tools/record"
14+
"k8s.io/klog/v2"
15+
ctrl "sigs.k8s.io/controller-runtime"
16+
)
17+
18+
const (
19+
controllerNamePrefix = "instance-gateway-"
20+
)
21+
22+
// LLMServerPoolController is the controller implementation for Instance Gateway resources
23+
type LLMServerPoolController struct {
24+
client.Client
25+
Scheme *runtime.Scheme
26+
Record record.EventRecorder
27+
ServerPoolName string
28+
Namespace string
29+
Datastore *K8sDatastore
30+
Port int
31+
Zone string
32+
}
33+
34+
func (c *LLMServerPoolController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
35+
if req.NamespacedName.Name != c.ServerPoolName && req.NamespacedName.Namespace != c.Namespace {
36+
return ctrl.Result{}, nil
37+
}
38+
klog.V(1).Info("reconciling LLMServerPool", req.NamespacedName)
39+
40+
serverPool := &v1alpha1.LLMServerPool{}
41+
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
42+
klog.Error(err, "unable to get LLMServerPool")
43+
return ctrl.Result{}, err
44+
}
45+
46+
needNewService := true
47+
if needNewService {
48+
labels, err := metav1.LabelSelectorAsMap(&c.Datastore.LLMServerPool.Spec.ModelServerSelector)
49+
if err != nil {
50+
klog.Error(err, "error converting label selector to map.")
51+
}
52+
service := &v1.Service{
53+
ObjectMeta: metav1.ObjectMeta{
54+
Name: controllerNamePrefix + c.ServerPoolName,
55+
//figure out ownership labels
56+
},
57+
Spec: v1.ServiceSpec{
58+
Ports: []v1.ServicePort{
59+
{
60+
Port: int32(c.Port),
61+
},
62+
},
63+
Selector: labels,
64+
},
65+
}
66+
if err := c.Create(ctx, service); err != nil {
67+
klog.Error(err, "unable to create service")
68+
return ctrl.Result{RequeueAfter: time.Second * 5}, err
69+
}
70+
}
71+
72+
c.updateDatastore(serverPool)
73+
74+
return ctrl.Result{}, nil
75+
}
76+
77+
func (c *LLMServerPoolController) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
78+
if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
79+
c.Datastore.LLMServerPool = serverPool
80+
}
81+
}
82+
83+
func (c *LLMServerPoolController) SetupWithManager(mgr ctrl.Manager) error {
84+
return ctrl.NewControllerManagedBy(mgr).
85+
For(&v1alpha1.LLMServerPool{}).
86+
Complete(c)
87+
}

0 commit comments

Comments
 (0)