forked from kubernetes-sigs/gateway-api-inference-extension
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathendpointslice_reconciler.go
119 lines (103 loc) · 3.66 KB
/
endpointslice_reconciler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package backend
import (
	"context"
	"net"
	"strconv"

	"inference.networking.x-k8s.io/gateway-api-inference-extension/api/v1alpha1"
	logutil "inference.networking.x-k8s.io/gateway-api-inference-extension/pkg/ext-proc/util/logging"
	discoveryv1 "k8s.io/api/discovery/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/tools/record"
	klog "k8s.io/klog/v2"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)
// serviceOwnerLabel is the EndpointSlice label key whose value is compared
// against the reconciler's ServiceName to decide whether a slice is handled.
var serviceOwnerLabel = "kubernetes.io/service-name"
// EndpointSliceReconciler reconciles EndpointSlice objects and mirrors the
// endpoints it accepts into Datastore as pods.
type EndpointSliceReconciler struct {
// Client is used to fetch the EndpointSlice being reconciled.
client.Client
// Scheme is not referenced by the methods in this file.
// NOTE(review): presumably populated by the caller for controller wiring - confirm.
Scheme *runtime.Scheme
// Record is not referenced by the methods in this file.
Record record.EventRecorder
// ServiceName is the value an EndpointSlice's "kubernetes.io/service-name"
// label must carry for the slice to be reconciled.
ServiceName string
// Zone, when non-empty, restricts accepted endpoints to those reporting
// this zone; an empty Zone disables the zone filter.
Zone string
// Datastore supplies the current InferencePool and holds the pod set that
// reconciliation keeps up to date.
Datastore *K8sDatastore
}
// Reconcile fetches the EndpointSlice named by req and refreshes the pod set
// held in c.Datastore from its endpoints.
//
// A slice that no longer exists (for example when the reconcile was triggered
// by its deletion) is not reported as an error, because retrying the lookup
// cannot succeed; the datastore is left untouched in that case. Any other
// lookup failure, or a failure to obtain the InferencePool from the
// datastore, is returned to the caller.
func (c *EndpointSliceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	klog.V(logutil.DEFAULT).Info("Reconciling EndpointSlice ", req.NamespacedName)

	endpointSlice := &discoveryv1.EndpointSlice{}
	if err := c.Get(ctx, req.NamespacedName, endpointSlice); err != nil {
		// IgnoreNotFound returns nil only for NotFound errors.
		if client.IgnoreNotFound(err) == nil {
			klog.V(logutil.DEFAULT).Infof("EndpointSlice %v no longer exists, skipping", req.NamespacedName)
			return ctrl.Result{}, nil
		}
		klog.Errorf("Unable to get EndpointSlice: %v", err)
		return ctrl.Result{}, err
	}

	inferencePool, err := c.Datastore.getInferencePool()
	if err != nil {
		return ctrl.Result{}, err
	}

	c.updateDatastore(endpointSlice, inferencePool)

	return ctrl.Result{}, nil
}
// updateDatastore makes c.Datastore.pods match the endpoints of slice.
//
// Every endpoint accepted by validPod that also carries a target reference
// and at least one address is stored as a Pod whose Address is the endpoint's
// first address joined with the pool's target port. Afterwards every stored
// pod that was not produced from this slice is deleted.
//
// TODO: Support multiple endpointslices for a single service
func (c *EndpointSliceReconciler) updateDatastore(
	slice *discoveryv1.EndpointSlice,
	inferencePool *v1alpha1.InferencePool) {
	// The target port is identical for every endpoint, so format it once.
	port := strconv.Itoa(int(inferencePool.Spec.TargetPortNumber))

	// podMap is the set of pods derived from this slice.
	podMap := make(map[Pod]struct{}, len(slice.Endpoints))
	for _, endpoint := range slice.Endpoints {
		klog.V(logutil.DEFAULT).Infof("Zone: %v \n endpoint: %+v \n", c.Zone, endpoint)
		if !c.validPod(endpoint) {
			continue
		}
		// TargetRef is an optional pointer; skip endpoints that cannot be
		// turned into a Pod instead of panicking on them.
		if endpoint.TargetRef == nil || len(endpoint.Addresses) == 0 {
			klog.V(logutil.DEFAULT).Infof("Skipping endpoint without a target reference or address: %+v", endpoint)
			continue
		}
		pod := Pod{
			Name: endpoint.TargetRef.Name,
			// JoinHostPort brackets IPv6 literals so the result stays a
			// parseable host:port; IPv4 output is unchanged.
			Address: net.JoinHostPort(endpoint.Addresses[0], port),
		}
		podMap[pod] = struct{}{}
		klog.V(logutil.DEFAULT).Infof("Storing pod %v", pod)
		c.Datastore.pods.Store(pod, true)
	}

	// Drop every stored pod that this slice no longer lists.
	removeOldPods := func(k, v any) bool {
		pod, ok := k.(Pod)
		if !ok {
			klog.Errorf("Unable to cast key to Pod: %v", k)
			return false
		}
		if _, ok := podMap[pod]; !ok {
			klog.V(logutil.DEFAULT).Infof("Removing pod %v", pod)
			c.Datastore.pods.Delete(pod)
		}
		return true
	}
	c.Datastore.pods.Range(removeOldPods)
}
// SetupWithManager registers the reconciler with mgr as a controller for
// EndpointSlice objects, filtered by two predicates: the datastore must be
// able to return an InferencePool, and the slice's service-owner label must
// equal c.ServiceName.
func (c *EndpointSliceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// matchesService accepts only EndpointSlices labelled with our service.
	matchesService := func(obj client.Object) bool {
		slice, isSlice := obj.(*discoveryv1.EndpointSlice)
		if !isSlice {
			return false
		}

		got := slice.ObjectMeta.Labels[serviceOwnerLabel]
		want := c.ServiceName
		if got == want {
			return true
		}

		key := slice.ObjectMeta.Namespace + "/" + slice.ObjectMeta.Name
		klog.V(logutil.DEFAULT).Infof("Skipping EndpointSlice %v because its service owner label %v doesn't match the pool service name %v", key, got, want)
		return false
	}

	// poolReady rejects events while no InferencePool can be obtained.
	poolReady := func(_ client.Object) bool {
		if _, err := c.Datastore.getInferencePool(); err != nil {
			klog.V(logutil.DEFAULT).Infof("Skipping reconciling EndpointSlice because the InferencePool is not available yet: %v", err)
			return false
		}
		return true
	}

	// Keep the pool check ahead of the label check, as registered originally.
	slicePredicates := builder.WithPredicates(
		predicate.NewPredicateFuncs(poolReady),
		predicate.NewPredicateFuncs(matchesService),
	)

	return ctrl.NewControllerManagedBy(mgr).
		For(&discoveryv1.EndpointSlice{}, slicePredicates).
		Complete(c)
}
// validPod reports whether endpoint should be tracked as a pod.
//
// When c.Zone is set, the endpoint must report exactly that zone; an endpoint
// with no zone is rejected. An empty c.Zone disables the zone filter. The
// endpoint must also be ready; a nil Ready condition is treated as ready,
// following the EndpointSlice API guidance for the unknown state.
func (c *EndpointSliceReconciler) validPod(endpoint discoveryv1.Endpoint) bool {
	// Zone is an optional pointer: check for nil before dereferencing.
	if c.Zone != "" && (endpoint.Zone == nil || *endpoint.Zone != c.Zone) {
		return false
	}
	ready := endpoint.Conditions.Ready
	return ready == nil || *ready
}