Skip to content

Commit 741da3d

Browse files
committed
Implementing LLMServerPool controller and Pod controller
1 parent 9270ff6 commit 741da3d

File tree

9 files changed

+259
-47
lines changed

9 files changed

+259
-47
lines changed

Diff for: .gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ Dockerfile.cross
1515

1616
# Go workspace file
1717
go.work
18+
go.work.sum
1819

1920
# Kubernetes Generated files - skip generated files, except for vendored files
2021
!vendor/**/zz_generated.*

Diff for: pkg/ext-proc/Dockerfile renamed to Dockerfile

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
## Multistage build
2-
FROM golang:1.22.5-alpine as build
2+
FROM golang:1.23-alpine as build
33
ENV CGO_ENABLED=0
44
ENV GOOS=linux
55
ENV GOARCH=amd64
66

77
WORKDIR /src
88
COPY . .
9+
WORKDIR /src/pkg/ext-proc
910
RUN go mod download
1011
RUN go build -o /ext-proc
1112
FROM alpine:latest
@@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
1617
WORKDIR /
1718
COPY --from=build /ext-proc /ext-proc
1819

19-
ENTRYPOINT ["/ext-proc"]
20+
ENTRYPOINT ["/ext-proc"]

Diff for: examples/poc/ext-proc/go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
module ext-proc
1+
module ext-proc-poc
22

33
go 1.21
44

Diff for: pkg/ext-proc/backend/datastore.go

+38
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package backend
2+
3+
import (
4+
"sync"
5+
6+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
7+
corev1 "k8s.io/api/core/v1"
8+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9+
"k8s.io/apimachinery/pkg/labels"
10+
"k8s.io/klog/v2"
11+
)
12+
13+
type Datastore struct {
14+
LLMServerPool *v1alpha1.LLMServerPool
15+
Pods sync.Map
16+
Port string
17+
}
18+
19+
func (ds *Datastore) GetPodIPs() []string {
20+
var ips []string
21+
ds.Pods.Range(func(name, pod any) bool {
22+
23+
ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
24+
return true
25+
})
26+
return ips
27+
}
28+
29+
func (ds *Datastore) LabelsMatch(podLabels map[string]string) bool {
30+
selector, err := metav1.LabelSelectorAsSelector(&ds.LLMServerPool.Spec.ModelServerSelector)
31+
if err != nil {
32+
klog.Error(err.Error())
33+
return false
34+
}
35+
set := labels.Set(podLabels)
36+
return selector.Matches(set)
37+
38+
}

Diff for: pkg/ext-proc/backend/llmserverpool_controller.go

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
"sigs.k8s.io/controller-runtime/pkg/client"
7+
8+
"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
9+
"k8s.io/apimachinery/pkg/runtime"
10+
"k8s.io/client-go/tools/record"
11+
"k8s.io/klog/v2"
12+
ctrl "sigs.k8s.io/controller-runtime"
13+
)
14+
15+
const (
16+
controllerNamePrefix = "instance-gateway-"
17+
)
18+
19+
// LLMServerPoolController is the controller implementation for Instance Gateway resources
20+
type LLMServerPoolController struct {
21+
client.Client
22+
Scheme *runtime.Scheme
23+
Record record.EventRecorder
24+
ServerPoolName string
25+
Datastore *Datastore
26+
}
27+
28+
func (c *LLMServerPoolController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
29+
logger := klog.FromContext(ctx)
30+
logger.V(1).Info("reconciling LLMServerPool")
31+
32+
var serverPool *v1alpha1.LLMServerPool
33+
if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
34+
logger.Error(err, "unable to get LLMServerPool")
35+
return ctrl.Result{}, err
36+
}
37+
38+
c.updateDatastore(serverPool)
39+
40+
return ctrl.Result{}, nil
41+
}
42+
43+
func (c *LLMServerPoolController) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
44+
if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
45+
c.Datastore.LLMServerPool = serverPool
46+
}
47+
}
48+
49+
func (c *LLMServerPoolController) SetupWithManager(mgr ctrl.Manager) error {
50+
return ctrl.NewControllerManagedBy(mgr).
51+
For(&v1alpha1.LLMServerPool{}).
52+
Complete(c)
53+
}

Diff for: pkg/ext-proc/backend/pod_controller.go

+54
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package backend
2+
3+
import (
4+
"context"
5+
6+
v1 "k8s.io/api/core/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
ctrl "sigs.k8s.io/controller-runtime"
9+
"sigs.k8s.io/controller-runtime/pkg/client"
10+
11+
"k8s.io/client-go/tools/record"
12+
"k8s.io/klog/v2"
13+
)
14+
15+
type PodController struct {
16+
client.Client
17+
Scheme *runtime.Scheme
18+
Record record.EventRecorder
19+
Datastore *Datastore
20+
}
21+
22+
func (c *PodController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
23+
logger := klog.FromContext(ctx)
24+
logger.V(1).Info("reconciling Pod")
25+
26+
var pod *v1.Pod
27+
if err := c.Get(ctx, req.NamespacedName, pod); err != nil {
28+
logger.Error(err, "unable to get Pod")
29+
return ctrl.Result{}, err
30+
}
31+
32+
c.updateDatastore(pod)
33+
34+
return ctrl.Result{}, nil
35+
}
36+
37+
func (c *PodController) updateDatastore(pod *v1.Pod) {
38+
if pod.ObjectMeta.DeletionTimestamp != nil {
39+
klog.Info("Kellen; Pod deletion timestamp is not nil %v", pod.ObjectMeta.DeletionTimestamp)
40+
}
41+
42+
// if labels don't match or pod is scheduled to be deleted, remove in case the pod was part of this pool, otherwise, add.
43+
if !c.Datastore.LabelsMatch(pod.ObjectMeta.Labels) || pod.ObjectMeta.DeletionTimestamp != nil {
44+
c.Datastore.Pods.Delete(pod.Name)
45+
} else {
46+
c.Datastore.Pods.Store(pod.Name, pod)
47+
}
48+
}
49+
50+
func (c *PodController) SetupWithManager(mgr ctrl.Manager) error {
51+
return ctrl.NewControllerManagedBy(mgr).
52+
For(&v1.Pod{}).
53+
Complete(c)
54+
}

Diff for: pkg/ext-proc/backend/provider.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -7,18 +7,19 @@ import (
77
"time"
88

99
"go.uber.org/multierr"
10+
corev1 "k8s.io/api/core/v1"
1011
klog "k8s.io/klog/v2"
1112
)
1213

1314
const (
1415
fetchMetricsTimeout = 5 * time.Second
1516
)
1617

17-
func NewProvider(pmc PodMetricsClient, pl PodLister) *Provider {
18+
func NewProvider(pmc PodMetricsClient, datastore *Datastore) *Provider {
1819
p := &Provider{
1920
podMetrics: sync.Map{},
2021
pmc: pmc,
21-
pl: pl,
22+
datastore: datastore,
2223
}
2324
return p
2425
}
@@ -28,17 +29,13 @@ type Provider struct {
2829
// key: Pod, value: *PodMetrics
2930
podMetrics sync.Map
3031
pmc PodMetricsClient
31-
pl PodLister
32+
datastore *Datastore
3233
}
3334

3435
type PodMetricsClient interface {
3536
FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
3637
}
3738

38-
type PodLister interface {
39-
List() (PodSet, error)
40-
}
41-
4239
func (p *Provider) AllPodMetrics() []*PodMetrics {
4340
res := []*PodMetrics{}
4441
fn := func(k, v any) bool {
@@ -108,13 +105,11 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
108105
// refreshPodsOnce lists pods and updates keys in the podMetrics map.
109106
// Note this function doesn't update the PodMetrics value, it's done separately.
110107
func (p *Provider) refreshPodsOnce() error {
111-
pods, err := p.pl.List()
112-
if err != nil {
113-
return err
114-
}
115108
// merge new pods with cached ones.
116109
// add new pod to the map
117-
for pod := range pods {
110+
addNewPods := func(k, v any) bool {
111+
k8sPod := v.(*corev1.Pod)
112+
pod := Pod{Name: k8sPod.Name, Namespace: k8sPod.Namespace, Address: k8sPod.Status.PodIP + ":" + p.datastore.Port}
118113
if _, ok := p.podMetrics.Load(pod); !ok {
119114
new := &PodMetrics{
120115
Pod: pod,
@@ -124,16 +119,18 @@ func (p *Provider) refreshPodsOnce() error {
124119
}
125120
p.podMetrics.Store(pod, new)
126121
}
122+
return true
127123
}
128124
// remove pods that don't exist any more.
129125
mergeFn := func(k, v any) bool {
130126
pod := k.(Pod)
131-
if _, ok := pods[pod]; !ok {
127+
if _, ok := p.datastore.Pods.Load(pod.Name); !ok {
132128
p.podMetrics.Delete(pod)
133129
}
134130
return true
135131
}
136132
p.podMetrics.Range(mergeFn)
133+
p.datastore.Pods.Range(addNewPods)
137134
return nil
138135
}
139136

Diff for: pkg/ext-proc/go.mod

+36-5
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module ext-proc
22

3-
go 1.21
3+
go 1.22.0
44

55
require (
66
github.com/bojand/ghz v0.120.0
@@ -9,12 +9,14 @@ require (
99
github.com/jhump/protoreflect v1.15.1
1010
github.com/prometheus/client_model v0.6.1
1111
github.com/prometheus/common v0.55.0
12-
go.uber.org/multierr v1.9.0
12+
go.uber.org/multierr v1.11.0
1313
google.golang.org/grpc v1.67.0
1414
google.golang.org/protobuf v1.34.2
1515
k8s.io/klog/v2 v2.130.1
1616
)
1717

18+
require sigs.k8s.io/controller-runtime v0.19.0
19+
1820
require (
1921
cel.dev/expr v0.16.0 // indirect
2022
cloud.google.com/go/compute/metadata v0.5.0 // indirect
@@ -27,31 +29,60 @@ require (
2729
github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
2830
github.com/cespare/xxhash/v2 v2.3.0 // indirect
2931
github.com/cncf/xds/go v0.0.0-20240723142845-024c85f92f20 // indirect
32+
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
3033
github.com/dustin/go-humanize v1.0.1 // indirect
34+
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
3135
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
32-
github.com/go-logr/logr v1.4.1 // indirect
36+
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
37+
github.com/go-logr/logr v1.4.2 // indirect
38+
github.com/go-openapi/jsonpointer v0.19.6 // indirect
39+
github.com/go-openapi/jsonreference v0.20.2 // indirect
40+
github.com/go-openapi/swag v0.22.4 // indirect
3341
github.com/gogo/protobuf v1.3.2 // indirect
3442
github.com/golang/protobuf v1.5.4 // indirect
43+
github.com/google/gnostic-models v0.6.8 // indirect
44+
github.com/google/go-cmp v0.6.0 // indirect
45+
github.com/google/gofuzz v1.2.0 // indirect
3546
github.com/google/uuid v1.6.0 // indirect
3647
github.com/huandu/xstrings v1.3.3 // indirect
3748
github.com/imdario/mergo v0.3.11 // indirect
3849
github.com/jinzhu/configor v1.2.1 // indirect
39-
github.com/kr/text v0.2.0 // indirect
50+
github.com/josharian/intern v1.0.0 // indirect
51+
github.com/json-iterator/go v1.1.12 // indirect
52+
github.com/mailru/easyjson v0.7.7 // indirect
4053
github.com/mitchellh/copystructure v1.0.0 // indirect
4154
github.com/mitchellh/reflectwalk v1.0.1 // indirect
55+
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
56+
github.com/modern-go/reflect2 v1.0.2 // indirect
4257
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
4358
github.com/pkg/errors v0.9.1 // indirect
4459
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
4560
github.com/shopspring/decimal v1.2.0 // indirect
4661
github.com/spf13/cast v1.4.1 // indirect
47-
go.uber.org/atomic v1.7.0 // indirect
62+
github.com/x448/float16 v0.8.4 // indirect
4863
golang.org/x/crypto v0.26.0 // indirect
4964
golang.org/x/net v0.28.0 // indirect
5065
golang.org/x/oauth2 v0.22.0 // indirect
5166
golang.org/x/sync v0.8.0 // indirect
5267
golang.org/x/sys v0.24.0 // indirect
68+
golang.org/x/term v0.23.0 // indirect
5369
golang.org/x/text v0.17.0 // indirect
70+
golang.org/x/time v0.3.0 // indirect
5471
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
5572
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
73+
gopkg.in/inf.v0 v0.9.1 // indirect
5674
gopkg.in/yaml.v2 v2.4.0 // indirect
75+
gopkg.in/yaml.v3 v3.0.1 // indirect
76+
inference.networking.x-k8s.io/llm-instance-gateway v0.0.0
77+
k8s.io/api v0.31.1
78+
k8s.io/apimachinery v0.31.1
79+
k8s.io/client-go v0.31.1
80+
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
81+
k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
82+
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
83+
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
84+
sigs.k8s.io/yaml v1.4.0 // indirect
5785
)
86+
87+
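// For builds outside Docker, point this replace at a local checkout of the repository root, for example:
//replace inference.networking.x-k8s.io/llm-instance-gateway v0.0.0 => /usr/local/google/home/kfswain/repos/kubernetes/llm-instance-gateway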
88+
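// /src is the directory the Dockerfile copies the repository root into (WORKDIR /src; COPY . .), so this path is expected to resolve only inside the image build.
replace inference.networking.x-k8s.io/llm-instance-gateway v0.0.0 => /src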

0 commit comments

Comments
 (0)