Commit 7db0c98
Parent: 9270ff6

Implementing LLMServerPool controller and Pod controller

9 files changed: +586 −46 lines

Diff for: .gitignore (+1)

@@ -15,6 +15,7 @@ Dockerfile.cross
 
 # Go workspace file
 go.work
+go.work.sum
 
 # Kubernetes Generated files - skip generated files, except for vendored files
 !vendor/**/zz_generated.*

Diff for: pkg/ext-proc/Dockerfile renamed to Dockerfile (+3 −2)

@@ -1,11 +1,12 @@
 ## Multistage build
-FROM golang:1.22.5-alpine as build
+FROM golang:1.23-alpine as build
 ENV CGO_ENABLED=0
 ENV GOOS=linux
 ENV GOARCH=amd64
 
 WORKDIR /src
 COPY . .
+WORKDIR /src/pkg/ext-proc
 RUN go mod download
 RUN go build -o /ext-proc
 FROM alpine:latest
@@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
 WORKDIR /
 COPY --from=build /ext-proc /ext-proc
 
-ENTRYPOINT ["/ext-proc"]
+ENTRYPOINT ["/ext-proc"]

Diff for: examples/poc/ext-proc/go.mod (+1 −1)

@@ -1,4 +1,4 @@
-module ext-proc
+module ext-proc-poc
 
 go 1.21
 

Diff for: pkg/ext-proc/backend/datastore.go (+38)

@@ -0,0 +1,38 @@
+package backend
+
+import (
+	"sync"
+
+	"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/klog/v2"
+)
+
+type Datastore struct {
+	LLMServerPool *v1alpha1.LLMServerPool
+	Pods          sync.Map
+	Port          string
+}
+
+func (ds *Datastore) GetPodIPs() []string {
+	var ips []string
+	ds.Pods.Range(func(name, pod any) bool {
+		ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
+		return true
+	})
+	return ips
+}
+
+func (ds *Datastore) LabelsMatch(podLabels map[string]string) bool {
+	selector, err := metav1.LabelSelectorAsSelector(&ds.LLMServerPool.Spec.ModelServerSelector)
+	if err != nil {
+		klog.Error(err.Error())
+		return false
+	}
+	set := labels.Set(podLabels)
+	return selector.Matches(set)
+}
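
To make the selector matching concrete, here is a minimal, self-contained sketch of what LabelsMatch does with the pool's ModelServerSelector; the `app: vllm` selector and the pod labels are hypothetical values for illustration:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
)

func main() {
	// Hypothetical selector, standing in for LLMServerPool.Spec.ModelServerSelector.
	sel := metav1.LabelSelector{MatchLabels: map[string]string{"app": "vllm"}}

	// Mirrors LabelsMatch: convert the metav1.LabelSelector from the spec
	// into a labels.Selector, then match it against a pod's labels.
	selector, err := metav1.LabelSelectorAsSelector(&sel)
	if err != nil {
		panic(err) // LabelsMatch logs and returns false instead
	}

	podLabels := labels.Set{"app": "vllm", "pod-template-hash": "abc123"}
	fmt.Println(selector.Matches(podLabels)) // true
}

LabelsMatch performs exactly this conversion and match, logging and returning false when the selector in the spec is malformed.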

Diff for: pkg/ext-proc/backend/llmserverpool_controller.go (+199)

@@ -0,0 +1,199 @@
+package backend
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"golang.org/x/time/rate"
+
+	clientset "inference.networking.x-k8s.io/llm-instance-gateway/client-go/clientset/versioned"
+	"inference.networking.x-k8s.io/llm-instance-gateway/client-go/clientset/versioned/scheme"
+	informers "inference.networking.x-k8s.io/llm-instance-gateway/client-go/informers/externalversions/api/v1alpha1"
+	listers "inference.networking.x-k8s.io/llm-instance-gateway/client-go/listers/api/v1alpha1"
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/api/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+	typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/record"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+)
+
+const (
+	controllerNamePrefix = "instance-gateway-"
+)
+
+// LLMServerPoolController is the controller implementation for Instance Gateway resources.
+type LLMServerPoolController struct {
+	// kubeclientset is the standard kubernetes clientset
+	kubeclientset kubernetes.Interface
+	// clientset is the clientset for our own API group
+	clientset clientset.Interface
+
+	llmServerPoolLister  listers.LLMServerPoolLister
+	llmServerPoolsSynced cache.InformerSynced
+
+	// workqueue is a rate-limited work queue. This is used to queue work to be
+	// processed instead of performing it as soon as a change happens. This
+	// means we can ensure we only process a fixed amount of resources at a
+	// time, and makes it easy to ensure we are never processing the same item
+	// simultaneously in two different workers.
+	workqueue      workqueue.TypedRateLimitingInterface[cache.ObjectName]
+	controllerName string
+	recorder       record.EventRecorder
+	serverPoolName string
+	datastore      *Datastore
+}
+
+func NewLLMServerPoolController(
+	ctx context.Context,
+	llmServerPoolName string,
+	datastore *Datastore,
+	kubeclientset kubernetes.Interface,
+	llmServerPoolInformer informers.LLMServerPoolInformer) *LLMServerPoolController {
+
+	logger := klog.FromContext(ctx)
+	utilruntime.Must(scheme.AddToScheme(scheme.Scheme))
+	logger.V(4).Info("Creating event broadcaster")
+
+	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
+	eventBroadcaster.StartStructuredLogging(0)
+	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
+	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerNamePrefix + llmServerPoolName})
+	ratelimiter := workqueue.NewTypedMaxOfRateLimiter(
+		workqueue.NewTypedItemExponentialFailureRateLimiter[cache.ObjectName](5*time.Millisecond, 1000*time.Second),
+		&workqueue.TypedBucketRateLimiter[cache.ObjectName]{Limiter: rate.NewLimiter(rate.Limit(50), 300)},
+	)
+
+	controller := &LLMServerPoolController{
+		controllerName:       controllerNamePrefix + llmServerPoolName,
+		serverPoolName:       llmServerPoolName,
+		datastore:            datastore,
+		workqueue:            workqueue.NewTypedRateLimitingQueue(ratelimiter),
+		llmServerPoolsSynced: llmServerPoolInformer.Informer().HasSynced,
+		llmServerPoolLister:  llmServerPoolInformer.Lister(),
+		recorder:             recorder,
+	}
+
+	llmServerPoolInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc: controller.enqueueLLMServerPool,
+		UpdateFunc: func(old, new interface{}) {
+			controller.enqueueLLMServerPool(new)
+		},
+		DeleteFunc: controller.enqueueLLMServerPool,
+	})
+
+	return controller
+}
+
+// Run will set up the event handlers for types we are interested in, as well
+// as syncing informer caches and starting workers. It will block until the
+// context is cancelled, at which point it will shut down the workqueue and
+// wait for workers to finish processing their current work items.
+func (c *LLMServerPoolController) Run(ctx context.Context, workers int) error {
+	defer utilruntime.HandleCrash()
+	defer c.workqueue.ShutDown()
+	logger := klog.FromContext(ctx)
+
+	// Start the informer factories to begin populating the informer caches
+	logger.Info("Starting LLMServerPool controller", "name", c.controllerName)
+
+	// Wait for the caches to be synced before starting workers
+	logger.Info("Waiting for informer caches to sync")
+
+	if ok := cache.WaitForCacheSync(ctx.Done(), c.llmServerPoolsSynced); !ok {
+		return fmt.Errorf("failed to wait for caches to sync")
+	}
+
+	logger.Info("Starting workers", "count", workers)
+	// Launch workers to process LLMServerPool resources
+	for i := 0; i < workers; i++ {
+		go wait.UntilWithContext(ctx, c.runWorker, time.Second)
+	}
+
+	logger.Info("Started workers")
+	<-ctx.Done()
+	logger.Info("Shutting down workers")
+
+	return nil
+}
+
+// runWorker is a long-running function that will continually call the
+// processNextWorkItem function in order to read and process a message on the
+// workqueue.
+func (c *LLMServerPoolController) runWorker(ctx context.Context) {
+	for c.processNextWorkItem(ctx) {
+	}
+}
+
+// processNextWorkItem will read a single work item off the workqueue and
+// attempt to process it by calling updateDatastore.
+func (c *LLMServerPoolController) processNextWorkItem(ctx context.Context) bool {
+	logger := klog.FromContext(ctx)
+	objRef, shutdown := c.workqueue.Get()
+
+	if shutdown {
+		return false
+	}
+
+	// We call Done at the end of this func so the workqueue knows we have
+	// finished processing this item. We also must remember to call Forget
+	// if we do not want this work item being re-queued. For example, we do
+	// not call Forget if a transient error occurs; instead the item is
+	// put back on the workqueue and attempted again after a back-off
+	// period.
+	defer c.workqueue.Done(objRef)
+
+	// Do work and update the local datastore
+	err := c.updateDatastore(objRef)
+	if err == nil {
+		// If no error occurs then we Forget this item so it does not
+		// get queued again until another change happens.
+		c.workqueue.Forget(objRef)
+		logger.Info("Successfully synced", "objectName", objRef)
+		return true
+	}
+
+	// There was a failure, so be sure to report it. This method allows for
+	// pluggable error handling, which can be used for things like
+	// cluster-monitoring.
+	utilruntime.HandleErrorWithContext(ctx, err, "Error syncing; requeuing for later retry", "objectReference", objRef)
+	// Since we failed, we should requeue the item to work on later. This
+	// method will add a backoff to avoid hotlooping on particular items
+	// (they're probably still not going to work right away) and overall
+	// controller protection (everything I've done is broken, this controller
+	// needs to calm down or it can starve other useful work) cases.
+	c.workqueue.AddRateLimited(objRef)
+	return true
+}
+
+func (c *LLMServerPoolController) updateDatastore(objName cache.ObjectName) error {
+	serverPool, err := c.llmServerPoolLister.LLMServerPools(objName.Namespace).Get(objName.Name)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			klog.Info("The parent LLMServerPool cannot be found; if it was deleted, this controller should be shut down shortly.")
+			return nil
+		}
+		return err
+	}
+
+	if c.datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.datastore.LLMServerPool.ObjectMeta.ResourceVersion {
+		c.datastore.LLMServerPool = serverPool
+	}
+
+	return nil
+}
+
+func (c *LLMServerPoolController) enqueueLLMServerPool(obj interface{}) {
+	if objectRef, err := cache.ObjectToName(obj); err != nil {
+		utilruntime.HandleError(err)
+		return
+	} else if objectRef.Name == c.serverPoolName {
+		// Only add the relevant LLMServerPool to the queue
+		c.workqueue.Add(objectRef)
+	}
+}
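
For context, a hypothetical sketch of how this controller might be wired up from a main function. The backend import path, the factory accessor Api().V1alpha1() (inferred from the generated informer import path above), the pool name, the port, and the in-cluster config handling are all assumptions, not part of this commit:

package main

import (
	"context"
	"time"

	clientset "inference.networking.x-k8s.io/llm-instance-gateway/client-go/clientset/versioned"
	externalversions "inference.networking.x-k8s.io/llm-instance-gateway/client-go/informers/externalversions"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/klog/v2"

	"inference.networking.x-k8s.io/llm-instance-gateway/pkg/ext-proc/backend" // hypothetical import path
)

func main() {
	ctx := context.Background()

	// Assumes the gateway runs in-cluster (illustrative only).
	cfg, err := rest.InClusterConfig()
	if err != nil {
		klog.Fatal(err)
	}
	kubeClient := kubernetes.NewForConfigOrDie(cfg)
	poolClient := clientset.NewForConfigOrDie(cfg)

	// Shared informer factory for the generated LLMServerPool informer.
	// The Api().V1alpha1() accessor follows the informer import path in
	// the diff above; the generated name may differ.
	factory := externalversions.NewSharedInformerFactory(poolClient, 30*time.Second)
	poolInformer := factory.Api().V1alpha1().LLMServerPools()

	datastore := &backend.Datastore{Port: "8000"} // hypothetical port
	controller := backend.NewLLMServerPoolController(ctx, "my-pool", datastore, kubeClient, poolInformer)

	factory.Start(ctx.Done())
	if err := controller.Run(ctx, 2); err != nil {
		klog.Fatal(err)
	}
}

One design note on the rate limiter composed in NewLLMServerPoolController: each failing object backs off exponentially (5 ms, doubling, capped at 1000 s), while the token bucket caps overall requeue throughput at 50 per second with a burst of 300, so a single bad object cannot starve the rest of the queue.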
