diff --git a/.gitignore b/.gitignore
index ada68ff0..d633a1ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -15,6 +15,7 @@ Dockerfile.cross
 
 # Go workspace file
 go.work
+go.work.sum
 
 # Kubernetes Generated files - skip generated files, except for vendored files
 !vendor/**/zz_generated.*
diff --git a/pkg/ext-proc/Dockerfile b/Dockerfile
similarity index 77%
rename from pkg/ext-proc/Dockerfile
rename to Dockerfile
index 3a35c1cc..ef32d0e9 100644
--- a/pkg/ext-proc/Dockerfile
+++ b/Dockerfile
@@ -1,11 +1,12 @@
 ## Multistage build
-FROM golang:1.22.5-alpine as build
+FROM golang:1.23-alpine as build
 ENV CGO_ENABLED=0
 ENV GOOS=linux
 ENV GOARCH=amd64
 
 WORKDIR /src
 COPY . .
+WORKDIR /src/pkg/ext-proc
 RUN go mod download
 RUN go build -o /ext-proc
 FROM alpine:latest
@@ -16,4 +17,4 @@ FROM gcr.io/distroless/base-debian10
 WORKDIR /
 COPY --from=build /ext-proc /ext-proc
 
-ENTRYPOINT ["/ext-proc"]
+ENTRYPOINT ["/ext-proc"]
\ No newline at end of file
diff --git a/examples/poc/ext-proc/go.mod b/examples/poc/ext-proc/go.mod
index 31f844a7..36035781 100644
--- a/examples/poc/ext-proc/go.mod
+++ b/examples/poc/ext-proc/go.mod
@@ -1,4 +1,4 @@
-module ext-proc
+module ext-proc-poc
 
 go 1.21
 
diff --git a/pkg/ext-proc/backend/datastore.go b/pkg/ext-proc/backend/datastore.go
new file mode 100644
index 00000000..114e605d
--- /dev/null
+++ b/pkg/ext-proc/backend/datastore.go
@@ -0,0 +1,24 @@
+package backend
+
+import (
+    "sync"
+
+    "inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
+    corev1 "k8s.io/api/core/v1"
+)
+
+// K8sDatastore is a local cache of relevant data for the given LLMServerPool (currently all pulled from the k8s API).
+type K8sDatastore struct {
+    LLMServerPool *v1alpha1.LLMServerPool
+    Pods          *sync.Map
+    Port          string
+}
+
+func (ds *K8sDatastore) GetPodIPs() []string {
+    var ips []string
+    ds.Pods.Range(func(name, pod any) bool {
+        ips = append(ips, pod.(*corev1.Pod).Status.PodIP)
+        return true
+    })
+    return ips
+}
diff --git a/pkg/ext-proc/backend/endpointslice_reconciler.go b/pkg/ext-proc/backend/endpointslice_reconciler.go
new file mode 100644
index 00000000..cba53864
--- /dev/null
+++ b/pkg/ext-proc/backend/endpointslice_reconciler.go
@@ -0,0 +1,86 @@
+package backend
+
+import (
+    "context"
+
+    discoveryv1 "k8s.io/api/discovery/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/client-go/tools/record"
+    "k8s.io/klog/v2"
+    ctrl "sigs.k8s.io/controller-runtime"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var (
+    serviceOwnerLabel = "kubernetes.io/service-name"
+)
+
+type EndpointSliceReconciler struct {
+    client.Client
+    Scheme         *runtime.Scheme
+    Record         record.EventRecorder
+    ServerPoolName string
+    ServiceName    string
+    Zone           string
+    Namespace      string
+    Datastore      *K8sDatastore
+}
+
+func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+    klog.V(1).Info("reconciling EndpointSlice ", req.NamespacedName)
+
+    endpointSlice := &discoveryv1.EndpointSlice{}
+    if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
+        klog.Error(err, "unable to get EndpointSlice")
+        return ctrl.Result{}, err
+    }
+
+    if !c.ownsEndPointSlice(endpointSlice.ObjectMeta.Labels) {
+        return ctrl.Result{}, nil
+    }
+
+    c.updateDatastore(endpointSlice)
+
+    return ctrl.Result{}, nil
+}
+
+func (c *EndpointSliceReconciler) updateDatastore(slice *discoveryv1.EndpointSlice) {
+    podMap := make(map[Pod]bool)
+    for _, endpoint := range slice.Endpoints {
+        klog.V(4).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
+        if c.validPod(endpoint) {
+            pod := Pod{Name: endpoint.TargetRef.Name, Address: endpoint.Addresses[0] + ":" + c.Datastore.Port}
+            podMap[pod] = true
+            c.Datastore.Pods.Store(pod, true)
+        }
+    }
+
+    removeOldPods := func(k, v any) bool {
+        pod, ok := k.(Pod)
+        if !ok {
+            klog.Errorf("Unable to cast key to Pod: %v", k)
+            return false
+        }
+        if _, ok := podMap[pod]; !ok {
+            c.Datastore.Pods.Delete(pod)
+        }
+        return true
+    }
+    c.Datastore.Pods.Range(removeOldPods)
+}
+
+func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    return ctrl.NewControllerManagedBy(mgr).
+        For(&discoveryv1.EndpointSlice{}).
+        Complete(c)
+}
+
+func (c *EndpointSliceReconciler) ownsEndPointSlice(labels map[string]string) bool {
+    return labels[serviceOwnerLabel] == c.ServiceName
+}
+
+func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
+    validZone := c.Zone == "" || (endpoint.Zone != nil && *endpoint.Zone == c.Zone)
+    return validZone && endpoint.Conditions.Ready != nil && *endpoint.Conditions.Ready
+
+}
diff --git a/pkg/ext-proc/backend/endpointslice_reconcilier_test.go b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go
new file mode 100644
index 00000000..730d314f
--- /dev/null
+++ b/pkg/ext-proc/backend/endpointslice_reconcilier_test.go
@@ -0,0 +1,196 @@
+package backend
+
+import (
+    "sync"
+    "testing"
+
+    v1 "k8s.io/api/core/v1"
+    discoveryv1 "k8s.io/api/discovery/v1"
+)
+
+var (
+    basePod1 = Pod{Name: "pod1", Address: "0.0.0.0:8000"}
+    basePod2 = Pod{Name: "pod2", Address: "0.0.0.0:8000"}
+    basePod3 = Pod{Name: "pod3", Address: "0.0.0.0:8000"}
+)
+
+func TestUpdateDatastore_EndpointSliceReconciler(t *testing.T) {
+    tests := []struct {
+        name          string
+        datastore     K8sDatastore
+        incomingSlice *discoveryv1.EndpointSlice
+        want          K8sDatastore
+    }{
+        {
+            name: "Add new pod",
+            datastore: K8sDatastore{
+                Pods: populateMap(basePod1, basePod2),
+                Port: "8000",
+            },
+            incomingSlice: &discoveryv1.EndpointSlice{
+                Endpoints: []discoveryv1.Endpoint{
+                    {
+                        TargetRef: &v1.ObjectReference{
+                            Name: "pod1",
+                        },
+                        Zone: new(string),
+                        Conditions: discoveryv1.EndpointConditions{
+                            Ready: truePointer(),
+                        },
+                        Addresses: []string{"0.0.0.0"},
+                    },
+                    {
+                        TargetRef: &v1.ObjectReference{
+                            Name: "pod2",
+                        },
+                        Zone: new(string),
+                        Conditions: discoveryv1.EndpointConditions{
+                            Ready: truePointer(),
+                        },
+                        Addresses: []string{"0.0.0.0"},
+                    },
+                    {
+                        TargetRef: &v1.ObjectReference{
+                            Name: "pod3",
+                        },
+                        Zone: new(string),
+                        Conditions: discoveryv1.EndpointConditions{
+                            Ready: truePointer(),
+                        },
+                        Addresses: []string{"0.0.0.0"},
+                    },
+                },
+            },
+            want: K8sDatastore{
+                Pods: populateMap(basePod1, basePod2, basePod3),
+                Port: "8000",
+            },
+        },
+        {
+            name: "New pod, but it's not ready yet. Do not add.",
Do not add.", + datastore: K8sDatastore{ + Pods: populateMap(basePod1, basePod2), + Port: "8000", + }, + incomingSlice: &discoveryv1.EndpointSlice{ + Endpoints: []discoveryv1.Endpoint{ + { + TargetRef: &v1.ObjectReference{ + Name: "pod1", + }, + Zone: new(string), + Conditions: discoveryv1.EndpointConditions{ + Ready: truePointer(), + }, + Addresses: []string{"0.0.0.0"}, + }, + { + TargetRef: &v1.ObjectReference{ + Name: "pod2", + }, + Zone: new(string), + Conditions: discoveryv1.EndpointConditions{ + Ready: truePointer(), + }, + Addresses: []string{"0.0.0.0"}, + }, + { + TargetRef: &v1.ObjectReference{ + Name: "pod3", + }, + Zone: new(string), + Conditions: discoveryv1.EndpointConditions{ + Ready: new(bool), + }, + Addresses: []string{"0.0.0.0"}, + }, + }, + }, + want: K8sDatastore{ + Pods: populateMap(basePod1, basePod2), + Port: "8000", + }, + }, + { + name: "Existing pod not ready, new pod added, and is ready", + datastore: K8sDatastore{ + Pods: populateMap(basePod1, basePod2), + Port: "8000", + }, + incomingSlice: &discoveryv1.EndpointSlice{ + Endpoints: []discoveryv1.Endpoint{ + { + TargetRef: &v1.ObjectReference{ + Name: "pod1", + }, + Zone: new(string), + Conditions: discoveryv1.EndpointConditions{ + Ready: new(bool), + }, + Addresses: []string{"0.0.0.0"}, + }, + { + TargetRef: &v1.ObjectReference{ + Name: "pod2", + }, + Zone: new(string), + Conditions: discoveryv1.EndpointConditions{ + Ready: truePointer(), + }, + Addresses: []string{"0.0.0.0"}, + }, + { + TargetRef: &v1.ObjectReference{ + Name: "pod3", + }, + Zone: new(string), + Conditions: discoveryv1.EndpointConditions{ + Ready: truePointer(), + }, + Addresses: []string{"0.0.0.0"}, + }, + }, + }, + want: K8sDatastore{ + Pods: populateMap(basePod3, basePod2), + Port: "8000", + }, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + endpointSliceReconciler := &EndpointSliceReconciler{Datastore: &test.datastore, Zone: ""} + endpointSliceReconciler.updateDatastore(test.incomingSlice) + + if mapsEqual(endpointSliceReconciler.Datastore.Pods, test.want.Pods) { + t.Errorf("Unexpected output pod mismatch. 
+            }
+        })
+    }
+}
+
+func mapsEqual(map1, map2 *sync.Map) bool {
+    equal := true
+
+    map1.Range(func(k, v any) bool {
+        if _, ok := map2.Load(k); !ok {
+            equal = false
+            return false
+        }
+        return true
+    })
+    map2.Range(func(k, v any) bool {
+        if _, ok := map1.Load(k); !ok {
+            equal = false
+            return false
+        }
+        return true
+    })
+
+    return equal
+}
+
+func truePointer() *bool {
+    primitivePointersAreSilly := true
+    return &primitivePointersAreSilly
+}
diff --git a/pkg/ext-proc/backend/fake.go b/pkg/ext-proc/backend/fake.go
index 10fb3672..614a0b91 100644
--- a/pkg/ext-proc/backend/fake.go
+++ b/pkg/ext-proc/backend/fake.go
@@ -1,11 +1,9 @@
 package backend
 
-import "context"
-
-type FakePodLister struct {
-    Err  error
-    Pods PodSet
-}
+import (
+    "context"
+    "fmt"
+)
 
 type FakePodMetricsClient struct {
     Err map[Pod]error
@@ -16,9 +14,6 @@ func (f *FakePodMetricsClient) FetchMetrics(ctx context.Context, pod Pod, existi
     if err, ok := f.Err[pod]; ok {
         return nil, err
     }
+    fmt.Printf("pod: %+v\n existing: %+v \n new: %+v \n", pod, existing, f.Res[pod])
     return f.Res[pod], nil
 }
-
-func (fpl *FakePodLister) List() (PodSet, error) {
-    return fpl.Pods, fpl.Err
-}
diff --git a/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go b/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go
new file mode 100644
index 00000000..ec4716cb
--- /dev/null
+++ b/pkg/ext-proc/backend/llmlserverpool_reconciler_test.go
@@ -0,0 +1,109 @@
+package backend
+
+import (
+    "testing"
+
+    "github.com/google/go-cmp/cmp"
+    "inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+func TestUpdateDatastore_LLMServerPoolReconciler(t *testing.T) {
+    tests := []struct {
+        name               string
+        datastore          K8sDatastore
+        incomingServerPool *v1alpha1.LLMServerPool
+        want               K8sDatastore
+    }{
+        {
+            name: "Update to new, fresh LLMServerPool",
+            datastore: K8sDatastore{
+                LLMServerPool: &v1alpha1.LLMServerPool{
+                    Spec: v1alpha1.LLMServerPoolSpec{
+                        ModelServerSelector: metav1.LabelSelector{
+                            MatchLabels: map[string]string{"app": "vllm"},
+                        },
+                    },
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:            "test-pool",
+                        ResourceVersion: "Old and boring",
+                    },
+                },
+            },
+            incomingServerPool: &v1alpha1.LLMServerPool{
+                Spec: v1alpha1.LLMServerPoolSpec{
+                    ModelServerSelector: metav1.LabelSelector{
+                        MatchLabels: map[string]string{"app": "not-vllm"},
+                    },
+                },
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:            "test-pool",
+                    ResourceVersion: "New and fun",
+                },
+            },
+            want: K8sDatastore{
+                LLMServerPool: &v1alpha1.LLMServerPool{
+                    Spec: v1alpha1.LLMServerPoolSpec{
+                        ModelServerSelector: metav1.LabelSelector{
+                            MatchLabels: map[string]string{"app": "not-vllm"},
+                        },
+                    },
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:            "test-pool",
+                        ResourceVersion: "New and fun",
+                    },
+                },
+            },
+        },
+        {
+            name: "Do not update, resource version the same",
+            datastore: K8sDatastore{
+                LLMServerPool: &v1alpha1.LLMServerPool{
+                    Spec: v1alpha1.LLMServerPoolSpec{
+                        ModelServerSelector: metav1.LabelSelector{
+                            MatchLabels: map[string]string{"app": "vllm"},
+                        },
+                    },
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:            "test-pool",
+                        ResourceVersion: "Old and boring",
+                    },
+                },
+            },
+            incomingServerPool: &v1alpha1.LLMServerPool{
+                Spec: v1alpha1.LLMServerPoolSpec{
+                    ModelServerSelector: metav1.LabelSelector{
+                        MatchLabels: map[string]string{"technically": "this-should-never-happen"},
+                    },
+                },
+                ObjectMeta: metav1.ObjectMeta{
+                    Name:            "test-pool",
+                    ResourceVersion: "Old and boring",
+                },
+            },
+            want: K8sDatastore{
+                LLMServerPool: &v1alpha1.LLMServerPool{
+                    Spec: v1alpha1.LLMServerPoolSpec{
+                        ModelServerSelector: metav1.LabelSelector{
+                            MatchLabels: map[string]string{"app": "vllm"},
+                        },
+                    },
+                    ObjectMeta: metav1.ObjectMeta{
+                        Name:            "test-pool",
+                        ResourceVersion: "Old and boring",
+                    },
+                },
+            },
+        },
+    }
+    for _, test := range tests {
+        t.Run(test.name, func(t *testing.T) {
+            llmServerPoolReconciler := &LLMServerPoolReconciler{Datastore: &test.datastore}
+            llmServerPoolReconciler.updateDatastore(test.incomingServerPool)
+
+            if diff := cmp.Diff(test.want.LLMServerPool, llmServerPoolReconciler.Datastore.LLMServerPool); diff != "" {
+                t.Errorf("Unexpected output (-want +got): %v", diff)
+            }
+        })
+    }
+}
diff --git a/pkg/ext-proc/backend/llmserverpool_reconciler.go b/pkg/ext-proc/backend/llmserverpool_reconciler.go
new file mode 100644
index 00000000..37b4ed33
--- /dev/null
+++ b/pkg/ext-proc/backend/llmserverpool_reconciler.go
@@ -0,0 +1,60 @@
+package backend
+
+import (
+    "context"
+
+    "sigs.k8s.io/controller-runtime/pkg/client"
+
+    "inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/client-go/tools/record"
+    "k8s.io/klog/v2"
+    ctrl "sigs.k8s.io/controller-runtime"
+)
+
+const (
+    reconcilerNamePrefix = "instance-gateway-"
+)
+
+// LLMServerPoolReconciler utilizes the controller runtime to reconcile Instance Gateway resources.
+// This implementation is just used for reading & maintaining data sync. The Gateway implementation
+// will have the proper controller that will create/manage objects on behalf of the server pool.
+type LLMServerPoolReconciler struct {
+    client.Client
+    Scheme         *runtime.Scheme
+    Record         record.EventRecorder
+    ServerPoolName string
+    Namespace      string
+    Datastore      *K8sDatastore
+    Port           int
+    Zone           string
+}
+
+func (c *LLMServerPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
+    if req.NamespacedName.Name != c.ServerPoolName || req.NamespacedName.Namespace != c.Namespace {
+        return ctrl.Result{}, nil
+    }
+    klog.V(1).Info("reconciling LLMServerPool ", req.NamespacedName)
+
+    serverPool := &v1alpha1.LLMServerPool{}
+    if err := c.Get(ctx, req.NamespacedName, serverPool); err != nil {
+        klog.Error(err, "unable to get LLMServerPool")
+        return ctrl.Result{}, err
+    }
+
+    c.updateDatastore(serverPool)
+
+    return ctrl.Result{}, nil
+}
+
+func (c *LLMServerPoolReconciler) updateDatastore(serverPool *v1alpha1.LLMServerPool) {
+    if c.Datastore.LLMServerPool == nil || serverPool.ObjectMeta.ResourceVersion != c.Datastore.LLMServerPool.ObjectMeta.ResourceVersion {
+        c.Datastore.LLMServerPool = serverPool
+    }
+}
+
+func (c *LLMServerPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
+    return ctrl.NewControllerManagedBy(mgr).
+        For(&v1alpha1.LLMServerPool{}).
+        Complete(c)
+}
diff --git a/pkg/ext-proc/backend/provider.go b/pkg/ext-proc/backend/provider.go
index 7725cb0f..0c12f47b 100644
--- a/pkg/ext-proc/backend/provider.go
+++ b/pkg/ext-proc/backend/provider.go
@@ -14,11 +14,11 @@ const (
     fetchMetricsTimeout = 5 * time.Second
 )
 
-func NewProvider(pmc PodMetricsClient, pl PodLister) *Provider {
+func NewProvider(pmc PodMetricsClient, datastore *K8sDatastore) *Provider {
     p := &Provider{
         podMetrics: sync.Map{},
         pmc:        pmc,
-        pl:         pl,
+        datastore:  datastore,
     }
     return p
 }
@@ -28,17 +28,13 @@ type Provider struct {
     // key: Pod, value: *PodMetrics
     podMetrics sync.Map
     pmc        PodMetricsClient
-    pl         PodLister
+    datastore  *K8sDatastore
 }
 
 type PodMetricsClient interface {
     FetchMetrics(ctx context.Context, pod Pod, existing *PodMetrics) (*PodMetrics, error)
 }
 
-type PodLister interface {
-    List() (PodSet, error)
-}
-
 func (p *Provider) AllPodMetrics() []*PodMetrics {
     res := []*PodMetrics{}
     fn := func(k, v any) bool {
@@ -108,13 +104,10 @@ func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duratio
 // refreshPodsOnce lists pods and updates keys in the podMetrics map.
 // Note this function doesn't update the PodMetrics value, it's done separately.
 func (p *Provider) refreshPodsOnce() error {
-    pods, err := p.pl.List()
-    if err != nil {
-        return err
-    }
     // merge new pods with cached ones.
     // add new pod to the map
-    for pod := range pods {
+    addNewPods := func(k, v any) bool {
+        pod := k.(Pod)
         if _, ok := p.podMetrics.Load(pod); !ok {
             new := &PodMetrics{
                 Pod: pod,
@@ -124,16 +117,18 @@ func (p *Provider) refreshPodsOnce() error {
             }
             p.podMetrics.Store(pod, new)
         }
+        return true
+    }
     // remove pods that don't exist any more.
     mergeFn := func(k, v any) bool {
         pod := k.(Pod)
-        if _, ok := pods[pod]; !ok {
+        if _, ok := p.datastore.Pods.Load(pod); !ok {
             p.podMetrics.Delete(pod)
         }
         return true
     }
     p.podMetrics.Range(mergeFn)
+    p.datastore.Pods.Range(addNewPods)
     return nil
 }
diff --git a/pkg/ext-proc/backend/provider_test.go b/pkg/ext-proc/backend/provider_test.go
index 2698b0a2..0ae977bd 100644
--- a/pkg/ext-proc/backend/provider_test.go
+++ b/pkg/ext-proc/backend/provider_test.go
@@ -2,6 +2,7 @@ package backend
 
 import (
     "errors"
+    "sync"
     "testing"
     "time"
 
@@ -38,19 +39,16 @@ var (
 
 func TestProvider(t *testing.T) {
     tests := []struct {
-        name    string
-        pmc     PodMetricsClient
-        pl      PodLister
-        initErr bool
-        want    []*PodMetrics
+        name      string
+        pmc       PodMetricsClient
+        datastore *K8sDatastore
+        initErr   bool
+        want      []*PodMetrics
     }{
         {
             name: "Init success",
-            pl: &FakePodLister{
-                Pods: map[Pod]bool{
-                    pod1.Pod: true,
-                    pod2.Pod: true,
-                },
+            datastore: &K8sDatastore{
+                Pods: populateMap(pod1.Pod, pod2.Pod),
             },
             pmc: &FakePodMetricsClient{
                 Res: map[Pod]*PodMetrics{
@@ -62,12 +60,6 @@ func TestProvider(t *testing.T) {
         },
         {
             name: "Fetch metrics error",
-            pl: &FakePodLister{
-                Pods: map[Pod]bool{
-                    pod1.Pod: true,
-                    pod2.Pod: true,
-                },
-            },
             pmc: &FakePodMetricsClient{
                 Err: map[Pod]error{
                     pod2.Pod: errors.New("injected error"),
@@ -76,6 +68,9 @@ func TestProvider(t *testing.T) {
                     pod1.Pod: pod1,
                 },
             },
+            datastore: &K8sDatastore{
+                Pods: populateMap(pod1.Pod, pod2.Pod),
+            },
             initErr: true,
             want: []*PodMetrics{
                 pod1,
@@ -94,7 +89,7 @@ func TestProvider(t *testing.T) {
 
     for _, test := range tests {
         t.Run(test.name, func(t *testing.T) {
-            p := NewProvider(test.pmc, test.pl)
+            p := NewProvider(test.pmc, test.datastore)
             err := p.Init(time.Millisecond, time.Millisecond)
             if test.initErr != (err != nil) {
                 t.Fatalf("Unexpected error, got: %v, want: %v", err, test.initErr)
@@ -109,3 +104,11 @@ func TestProvider(t *testing.T) {
         })
     }
 }
+
+func populateMap(pods ...Pod) *sync.Map {
+    newMap := &sync.Map{}
+    for _, pod := range pods {
+        newMap.Store(pod, true)
+    }
+    return newMap
+}
diff --git a/pkg/ext-proc/backend/types.go b/pkg/ext-proc/backend/types.go
index c1e1113a..7e399fed 100644
--- a/pkg/ext-proc/backend/types.go
+++ b/pkg/ext-proc/backend/types.go
@@ -6,13 +6,12 @@ import "fmt"
 type PodSet map[Pod]bool
 
 type Pod struct {
-    Namespace string
-    Name      string
-    Address   string
+    Name    string
+    Address string
 }
 
 func (p Pod) String() string {
-    return p.Namespace + "/" + p.Name
+    return p.Name + ":" + p.Address
 }
 
 type Metrics struct {
diff --git a/pkg/ext-proc/go.mod b/pkg/ext-proc/go.mod
index cbb00816..ad224b3c 100644
--- a/pkg/ext-proc/go.mod
+++ b/pkg/ext-proc/go.mod
@@ -1,6 +1,6 @@
 module ext-proc
 
-go 1.21
+go 1.22.0
 
 require (
     github.com/bojand/ghz v0.120.0
@@ -9,12 +9,14 @@ require (
     github.com/jhump/protoreflect v1.15.1
     github.com/prometheus/client_model v0.6.1
     github.com/prometheus/common v0.55.0
-    go.uber.org/multierr v1.9.0
+    go.uber.org/multierr v1.11.0
     google.golang.org/grpc v1.67.0
    google.golang.org/protobuf v1.34.2
     k8s.io/klog/v2 v2.130.1
 )
 
+require sigs.k8s.io/controller-runtime v0.19.0 // indirect
+
 require (
     cel.dev/expr v0.16.0 // indirect
     cloud.google.com/go/compute/metadata v0.5.0 // indirect
@@ -27,31 +29,59 @@ require (
     github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect
     github.com/cespare/xxhash/v2 v2.3.0 // indirect
     github.com/cncf/xds/go v0.0.0-20240723142845-024c85f92f20 // indirect
+    github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
     github.com/dustin/go-humanize v1.0.1 // indirect
+    github.com/emicklei/go-restful/v3 v3.11.0 // indirect
     github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
-    github.com/go-logr/logr v1.4.1 // indirect
+    github.com/fxamacker/cbor/v2 v2.7.0 // indirect
+    github.com/go-logr/logr v1.4.2 // indirect
+    github.com/go-openapi/jsonpointer v0.19.6 // indirect
+    github.com/go-openapi/jsonreference v0.20.2 // indirect
+    github.com/go-openapi/swag v0.22.4 // indirect
     github.com/gogo/protobuf v1.3.2 // indirect
     github.com/golang/protobuf v1.5.4 // indirect
+    github.com/google/gnostic-models v0.6.8 // indirect
+    github.com/google/go-cmp v0.6.0 // indirect
+    github.com/google/gofuzz v1.2.0 // indirect
     github.com/google/uuid v1.6.0 // indirect
     github.com/huandu/xstrings v1.3.3 // indirect
     github.com/imdario/mergo v0.3.11 // indirect
     github.com/jinzhu/configor v1.2.1 // indirect
-    github.com/kr/text v0.2.0 // indirect
+    github.com/josharian/intern v1.0.0 // indirect
+    github.com/json-iterator/go v1.1.12 // indirect
+    github.com/mailru/easyjson v0.7.7 // indirect
     github.com/mitchellh/copystructure v1.0.0 // indirect
     github.com/mitchellh/reflectwalk v1.0.1 // indirect
+    github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
+    github.com/modern-go/reflect2 v1.0.2 // indirect
     github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
     github.com/pkg/errors v0.9.1 // indirect
     github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
     github.com/shopspring/decimal v1.2.0 // indirect
     github.com/spf13/cast v1.4.1 // indirect
-    go.uber.org/atomic v1.7.0 // indirect
+    github.com/x448/float16 v0.8.4 // indirect
     golang.org/x/crypto v0.26.0 // indirect
     golang.org/x/net v0.28.0 // indirect
     golang.org/x/oauth2 v0.22.0 // indirect
     golang.org/x/sync v0.8.0 // indirect
     golang.org/x/sys v0.24.0 // indirect
+    golang.org/x/term v0.23.0 // indirect
     golang.org/x/text v0.17.0 // indirect
+    golang.org/x/time v0.3.0 // indirect
     google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
     google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
+    gopkg.in/inf.v0 v0.9.1 // indirect
     gopkg.in/yaml.v2 v2.4.0 // indirect
+    gopkg.in/yaml.v3 v3.0.1 // indirect
+    inference.networking.x-k8s.io/llm-instance-gateway v0.0.0
+    k8s.io/api v0.31.1 // indirect
+    k8s.io/apimachinery v0.31.1
+    k8s.io/client-go v0.31.1
+    k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 // indirect
+    k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 // indirect
+    sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
+    sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
+    sigs.k8s.io/yaml v1.4.0 // indirect
 )
+
+replace inference.networking.x-k8s.io/llm-instance-gateway v0.0.0 => /src
\ No newline at end of file
diff --git a/pkg/ext-proc/main.go b/pkg/ext-proc/main.go
index f899c3b7..ce8dc425 100644
--- a/pkg/ext-proc/main.go
+++ b/pkg/ext-proc/main.go
@@ -7,7 +7,7 @@ import (
     "net"
     "os"
     "os/signal"
-    "strings"
+    "sync"
     "syscall"
     "time"
 
@@ -16,7 +16,12 @@ import (
     "google.golang.org/grpc/codes"
     healthPb "google.golang.org/grpc/health/grpc_health_v1"
     "google.golang.org/grpc/status"
+    "inference.networking.x-k8s.io/llm-instance-gateway/api/v1alpha1"
+    "k8s.io/apimachinery/pkg/runtime"
+    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+    clientgoscheme "k8s.io/client-go/kubernetes/scheme"
     klog "k8s.io/klog/v2"
+    ctrl "sigs.k8s.io/controller-runtime"
 
     "ext-proc/backend"
     "ext-proc/backend/vllm"
@@ -25,12 +30,16 @@ import (
 )
 
 var (
-    port            = flag.Int("port", 9002, "gRPC port")
-    targetPodHeader = flag.String("targetPodHeader", "target-pod", "the header key for the target pod address to instruct Envoy to send the request to. This must match Envoy configuration.")
-    podIPsFlag      = flag.String("podIPs", "", "Comma-separated list of pod IPs")
-
+    port            = flag.Int("port", 9002, "gRPC port")
+    targetPodHeader = flag.String("targetPodHeader", "target-pod", "the header key for the target pod address to instruct Envoy to send the request to. This must match Envoy configuration.")
+    serverPoolName  = flag.String("serverPoolName", "", "Name of the serverPool this ext-proc is associated with.")
+    serviceName     = flag.String("serviceName", "", "Name of the service that will be used to read the endpointslices from")
+    namespace       = flag.String("namespace", "default", "The Namespace that the server pool should exist in.")
+    zone            = flag.String("zone", "", "The zone that this instance is created in. Will be passed to the corresponding endpointSlice.")
+    desiredPort     = flag.String("desiredPort", "8000", "The port that the model server exposes")
     refreshPodsInterval    = flag.Duration("refreshPodsInterval", 10*time.Second, "interval to refresh pods")
     refreshMetricsInterval = flag.Duration("refreshMetricsInterval", 50*time.Millisecond, "interval to refresh metrics")
+    scheme                 = runtime.NewScheme()
 )
 
 type healthServer struct{}
@@ -44,26 +53,17 @@ func (s *healthServer) Watch(in *healthPb.HealthCheckRequest, srv healthPb.Healt
     return status.Error(codes.Unimplemented, "Watch is not implemented")
 }
 
+func init() {
+    utilruntime.Must(clientgoscheme.AddToScheme(scheme))
+    utilruntime.Must(v1alpha1.AddToScheme(scheme))
+}
+
 func main() {
+    klog.InitFlags(nil)
     flag.Parse()
 
-    // This is the list of addresses of backend pods.
-    // TODO (https://github.com/kubernetes-sigs/llm-instance-gateway/issues/12): Remove this once dynamic pod listing is implemented.
-    if *podIPsFlag == "" {
-        klog.Fatal("No pods or pod IPs provided. Use the -pods and -podIPs flags to specify comma-separated lists of pod addresses and pod IPs.")
-    }
-    podIPs := strings.Split(*podIPsFlag, ",")
-    klog.Infof("Pods: %v", podIPs)
-    pods := make(backend.PodSet)
-    for _, ip := range podIPs {
-        pod := backend.Pod{
-            Namespace: "default",
-            Name:      ip,
-            Address:   ip,
-        }
-        pods[pod] = true
-    }
+    ctrl.SetLogger(klog.TODO())
 
     klog.Infof("Listening on %q", fmt.Sprintf(":%d", *port))
     lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
@@ -71,9 +71,50 @@
         klog.Fatalf("failed to listen: %v", err)
     }
 
+    datastore := &backend.K8sDatastore{LLMServerPool: &v1alpha1.LLMServerPool{}, Pods: &sync.Map{}, Port: *desiredPort}
+
+    mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
+        Scheme: scheme,
+    })
+    if err != nil {
+        klog.Error(err, "unable to start manager")
+        os.Exit(1)
+    }
+
+    if err := (&backend.LLMServerPoolReconciler{
+        Datastore:      datastore,
+        Scheme:         mgr.GetScheme(),
+        Client:         mgr.GetClient(),
+        ServerPoolName: *serverPoolName,
+        Namespace:      *namespace,
+        Record:         mgr.GetEventRecorderFor("llmserverpool"),
+    }).SetupWithManager(mgr); err != nil {
+        klog.Error(err, "Error setting up LLMServerPoolReconciler")
+    }
+
+    if err := (&backend.EndpointSliceReconciler{
+        Datastore:      datastore,
+        Scheme:         mgr.GetScheme(),
+        Client:         mgr.GetClient(),
+        Record:         mgr.GetEventRecorderFor("endpointslice"),
+        ServiceName:    *serviceName,
+        Zone:           *zone,
+        ServerPoolName: *serverPoolName,
+    }).SetupWithManager(mgr); err != nil {
+        klog.Error(err, "Error setting up EndpointSliceReconciler")
+    }
+
+    errChan := make(chan error)
+    go func() {
+        if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
+            klog.Error(err, "Error running manager")
+            errChan <- err
+        }
+    }()
+
     s := grpc.NewServer()
-    pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, &backend.FakePodLister{Pods: pods})
+    pp := backend.NewProvider(&vllm.PodMetricsClientImpl{}, datastore)
     if err := pp.Init(*refreshPodsInterval, *refreshMetricsInterval); err != nil {
         klog.Fatalf("failed to initialize: %v", err)
     }
@@ -87,9 +128,15 @@ func main() {
     signal.Notify(gracefulStop, syscall.SIGTERM)
     signal.Notify(gracefulStop, syscall.SIGINT)
     go func() {
-        sig := <-gracefulStop
-        klog.Infof("caught sig: %+v", sig)
-        os.Exit(0)
+        select {
+        case sig := <-gracefulStop:
+            klog.Infof("caught sig: %+v", sig)
+            os.Exit(0)
+        case err := <-errChan:
+            klog.Errorf("caught error in controller: %+v", err)
+            os.Exit(1)
+        }
+
    }()
 
     s.Serve(lis)
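For reference, below is a minimal, self-contained sketch (not part of the change itself) of the data flow this PR introduces: the EndpointSlice reconciler writes backend.Pod entries into K8sDatastore.Pods, and Provider.refreshPodsOnce now mirrors that map instead of calling the removed PodLister. The struct shapes are copied from the files above; the pod name, IP, and port values are hypothetical.

package main

import (
    "fmt"
    "sync"
)

// Pod mirrors backend.Pod after this change: the Namespace field is gone
// and Address now carries "ip:port".
type Pod struct {
    Name    string
    Address string
}

func main() {
    // K8sDatastore.Pods is a *sync.Map keyed by Pod with bool values.
    pods := &sync.Map{}
    port := "8000" // corresponds to the -desiredPort flag

    // What EndpointSliceReconciler.updateDatastore does for a ready endpoint:
    pod := Pod{Name: "pod1", Address: "10.0.0.1:" + port}
    pods.Store(pod, true)

    // What Provider.refreshPodsOnce now iterates instead of PodLister.List():
    pods.Range(func(k, v any) bool {
        fmt.Printf("tracking %v\n", k.(Pod))
        return true
    })
}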