Skip to content

LLMServerPool Implementation #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ Dockerfile.cross

# Go workspace file
go.work
go.work.sum

# Kubernetes Generated files - skip generated files, except for vendored files
!vendor/**/zz_generated.*
Expand Down
5 changes: 3 additions & 2 deletions pkg/ext-proc/Dockerfile → Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
## Multistage build
FROM golang:1.22.5-alpine as build
FROM golang:1.23-alpine as build
ENV CGO_ENABLED=0
ENV GOOS=linux
ENV GOARCH=amd64

WORKDIR /src
COPY . .
WORKDIR /src/pkg/ext-proc
RUN go mod download
RUN go build -o /ext-proc
FROM alpine:latest
Expand All @@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /ext-proc /ext-proc

ENTRYPOINT ["/ext-proc"]
ENTRYPOINT ["/ext-proc"]
2 changes: 1 addition & 1 deletion examples/poc/ext-proc/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module ext-proc
module ext-proc-poc

go 1.21

Expand Down
24 changes: 24 additions & 0 deletions pkg/ext-proc/backend/datastore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package backend

import (
"sync"

"inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
corev1 "k8s.io/api/core/v1"
)

// K8sDatastore is a local cache of relevant data for the given LLMServerPool
// (currently all pulled from k8s-api).
type K8sDatastore struct {
// LLMServerPool is the cached pool object.
LLMServerPool *v1alpha1.LLMServerPool
// Pods is the concurrent map of known pods.
// NOTE(review): GetPodIPs reads the map values as *corev1.Pod, whereas
// EndpointSliceReconciler.updateDatastore stores Pod keys with `true`
// values — confirm which layout is intended.
Pods *sync.Map
// Port is appended after ":" to an endpoint address when
// EndpointSliceReconciler.updateDatastore builds a Pod's Address.
Port string
}

// GetPodIPs returns the Status.PodIP of every *corev1.Pod stored as a value in
// ds.Pods.
//
// Entries whose value is not a non-nil *corev1.Pod are skipped instead of
// triggering a type-assertion panic, because the map is untyped and other
// writers may store different value types. A nil receiver or a nil Pods map
// yields a nil slice.
func (ds *K8sDatastore) GetPodIPs() []string {
	if ds == nil || ds.Pods == nil {
		return nil
	}

	var ips []string
	ds.Pods.Range(func(_, value any) bool {
		pod, ok := value.(*corev1.Pod)
		if !ok || pod == nil {
			// Not a pod object; keep iterating rather than panicking.
			return true
		}
		ips = append(ips, pod.Status.PodIP)
		return true
	})
	return ips
}
86 changes: 86 additions & 0 deletions pkg/ext-proc/backend/endpointslice_reconciler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package backend

import (
"context"

discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var (
// serviceOwnerLabel is the EndpointSlice label key whose value is compared
// against EndpointSliceReconciler.ServiceName to decide slice ownership.
serviceOwnerLabel = "kubernetes.io/service-name"
)

// EndpointSliceReconciler reconciles EndpointSlice objects and mirrors the
// ready endpoints of the slices it owns into Datastore.Pods.
type EndpointSliceReconciler struct {
// Client is used by Reconcile to fetch the EndpointSlice named in the request.
client.Client
// Scheme and Record are not referenced by the methods visible in this file.
Scheme *runtime.Scheme
Record record.EventRecorder
// ServerPoolName is not referenced by the methods visible in this file.
ServerPoolName string
// ServiceName is matched against the slice's "kubernetes.io/service-name"
// label; slices with a different value are ignored.
ServiceName string
// Zone, when non-empty, restricts accepted endpoints to those whose zone
// equals it; an empty Zone disables zone filtering.
Zone string
// Namespace is not referenced by the methods visible in this file.
Namespace string
// Datastore receives the pods derived from the reconciled slice and
// supplies the port used to build each pod address.
Datastore *K8sDatastore
}

// Reconcile fetches the EndpointSlice named by req and, when the slice is owned
// by the configured service, synchronizes the datastore with its endpoints.
//
// A slice that no longer exists is not treated as an error: re-queuing the
// request cannot make the object reappear. Any other fetch error is logged and
// returned so that the request is retried.
func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName)

	endpointSlice := &discoveryv1.EndpointSlice{}
	if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
		if client.IgnoreNotFound(err) == nil {
			// NOTE(review): pods that came from a deleted slice are not
			// evicted here; they stay cached until another slice update
			// sweeps them out.
			klog.V(1).Infof("EndpointSlice %v not found, skipping", req.NamespacedName)
			return ctrl.Result{}, nil
		}
		klog.Errorf("unable to get EndpointSlice %v: %v", req.NamespacedName, err)
		return ctrl.Result{}, err
	}

	// Slices that belong to other services are none of our business.
	if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) {
		return ctrl.Result{}, nil
	}

	c.updateDatastore(endpointSlice)

	return ctrl.Result{}, nil
}

// updateDatastore makes Datastore.Pods reflect the valid endpoints of slice.
//
// Every endpoint accepted by validPod is stored as a Pod key whose Address is
// the endpoint's first address joined with the datastore port. Afterwards any
// Pod key that was not produced by this slice is deleted from the datastore.
//
// Endpoints without a TargetRef or without any address cannot be turned into a
// Pod and are skipped rather than dereferenced.
func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSlice) {
	current := make(map[Pod]bool, len(slice.Endpoints))
	for _, endpoint := range slice.Endpoints {
		klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
		if endpoint.TargetRef == nil || len(endpoint.Addresses) == 0 {
			continue
		}
		if !c.validPod(endpoint) {
			continue
		}
		pod := Pod{
			Name:    endpoint.TargetRef.Name,
			Address: endpoint.Addresses[0] + ":" + c.Datastore.Port,
		}
		current[pod] = true
		c.Datastore.Pods.Store(pod, true)
	}

	// Sweep out entries that the incoming slice no longer backs. sync.Map
	// allows Delete to be called from inside Range.
	c.Datastore.Pods.Range(func(k, _ any) bool {
		pod, ok := k.(Pod)
		if !ok {
			// Keep sweeping: one foreign key must not leave the remaining
			// stale pods in place.
			klog.Errorf("Unable to cast key to Pod: %v", k)
			return true
		}
		if !current[pod] {
			c.Datastore.Pods.Delete(pod)
		}
		return true
	})
}

// SetupWithManager registers this reconciler with mgr as the controller for
// EndpointSlice objects and returns any error from building the controller.
func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.For(&discoveryv1.EndpointSlice{})
	return bldr.Complete(c)
}

// ownsEndPointSlice reports whether the service-name label found in labels
// equals the reconciler's ServiceName. A missing label reads as the empty
// string.
func (c *EndpointSliceReconciler) ownsEndPointSlice(labels map[string]string) bool {
	owner := labels[serviceOwnerLabel]
	return owner == c.ServiceName
}

// validPod reports whether endpoint should be tracked in the datastore.
//
// An endpoint is accepted when it passes the zone filter and is ready:
//   - Zone filter: with an empty c.Zone every endpoint passes; otherwise the
//     endpoint must declare a zone equal to c.Zone. An endpoint without a zone
//     never matches a configured zone.
//   - Readiness: Conditions.Ready is an optional pointer. A nil value means
//     "unknown", which the Kubernetes API documentation tells consumers to
//     interpret as ready in most cases, so it is accepted here.
func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
	if c.Zone != "" && (endpoint.Zone == nil || *endpoint.Zone != c.Zone) {
		return false
	}
	ready := endpoint.Conditions.Ready
	return ready == nil || *ready
}
196 changes: 196 additions & 0 deletions pkg/ext-proc/backend/endpointslice_reconcilier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package backend

import (
"sync"
"testing"

v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
)

// Shared Pod fixtures for the tests in this file. Only Name is set; every
// other field keeps its zero value.
var (
basePod1 = Pod{Name: "pod1"}
basePod2 = Pod{Name: "pod2"}
basePod3 = Pod{Name: "pod3"}
)

func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
tests := []struct {
name string
datastore K8sDatastore
incomingSlice *discoveryv1.EndpointSlice
want K8sDatastore
}{
{
name: "Add new pod",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
Port: "8000",
},
incomingSlice: &discoveryv1.EndpointSlice{
Endpoints: []discoveryv1.Endpoint{
{
TargetRef: &v1.ObjectReference{
Name: "pod1",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: truePointer(),
},
Addresses: []string{"0.0.0.0"},
},
{
TargetRef: &v1.ObjectReference{
Name: "pod2",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: truePointer(),
},
Addresses: []string{"0.0.0.0"},
},
{
TargetRef: &v1.ObjectReference{
Name: "pod3",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: truePointer(),
},
Addresses: []string{"0.0.0.0"},
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod1, basePod2, basePod3),
Port: "8000",
},
},
{
name: "New pod, but its not ready yet. Do not add.",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
Port: "8000",
},
incomingSlice: &discoveryv1.EndpointSlice{
Endpoints: []discoveryv1.Endpoint{
{
TargetRef: &v1.ObjectReference{
Name: "pod1",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: truePointer(),
},
Addresses: []string{"0.0.0.0"},
},
{
TargetRef: &v1.ObjectReference{
Name: "pod2",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: truePointer(),
},
Addresses: []string{"0.0.0.0"},
},
{
TargetRef: &v1.ObjectReference{
Name: "pod3",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: new(bool),
},
Addresses: []string{"0.0.0.0"},
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
Port: "8000",
},
},
{
name: "Existing pod not ready, new pod added, and is ready",
datastore: K8sDatastore{
Pods: populateMap(basePod1, basePod2),
Port: "8000",
},
incomingSlice: &discoveryv1.EndpointSlice{
Endpoints: []discoveryv1.Endpoint{
{
TargetRef: &v1.ObjectReference{
Name: "pod1",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: new(bool),
},
Addresses: []string{"0.0.0.0"},
},
{
TargetRef: &v1.ObjectReference{
Name: "pod2",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: truePointer(),
},
Addresses: []string{"0.0.0.0"},
},
{
TargetRef: &v1.ObjectReference{
Name: "pod3",
},
Zone: new(string),
Conditions: discoveryv1.EndpointConditions{
Ready: truePointer(),
},
Addresses: []string{"0.0.0.0"},
},
},
},
want: K8sDatastore{
Pods: populateMap(basePod3, basePod2),
Port: "8000",
},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
endpointSliceReconciler := &EndpointSliceReconciler{Datastore: &test.datastore, Zone: ""}
endpointSliceReconciler.updateDatastore(test.incomingSlice)

if mapsEqual(endpointSliceReconciler.Datastore.Pods, test.want.Pods) {
t.Errorf("Unexpected output pod mismatch. \n Got %v \n Want: %v \n", endpointSliceReconciler.Datastore.Pods, test.want.Pods)
}
})
}
}

// mapsEqual reports whether map1 and map2 hold exactly the same set of keys.
// Values are not compared.
func mapsEqual(map1, map2 *sync.Map) bool {
	// hasAllKeys reports whether every key of src is also present in dst.
	hasAllKeys := func(src, dst *sync.Map) bool {
		complete := true
		src.Range(func(key, _ any) bool {
			_, found := dst.Load(key)
			complete = found
			// Stop at the first key that dst is missing.
			return found
		})
		return complete
	}

	return hasAllKeys(map1, map2) && hasAllKeys(map2, map1)
}

// truePointer returns a pointer to a freshly allocated bool set to true.
func truePointer() *bool {
	value := new(bool)
	*value = true
	return value
}
15 changes: 5 additions & 10 deletions pkg/ext-proc/backend/fake.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package backend

import "context"

// FakePodLister is a test double whose List method returns the preset Pods
// and Err fields.
type FakePodLister struct {
// Err is returned as the error result of List.
Err error
// Pods is returned as the PodSet result of List.
Pods PodSet
}
import (
"context"
"fmt"
)

type FakePodMetricsClient struct {
Err map[Pod]error
Expand All @@ -16,9 +14,6 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
if err, ok := f.Err[pod]; ok {
return nil, err
}
fmt.Printf("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
return f.Res[pod], nil
}

// List returns the preset Pods and Err without doing any work.
func (fpl *FakePodLister) List() (PodSet, error) {
return fpl.Pods, fpl.Err
}
Loading