forked from kubernetes-sigs/gateway-api-inference-extension
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprovider.go
122 lines (107 loc) · 2.6 KB
/
provider.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package backend
import (
"fmt"
"sync"
"time"
dto "github.com/prometheus/client_model/go"
klog "k8s.io/klog/v2"
)
// NewProvider returns a Provider that fetches pod metrics through pmc and
// discovers pods through pl. The pod-metrics cache starts out empty (the
// zero value of sync.Map is ready to use); call Init to populate it and to
// start the periodic refresh goroutines.
func NewProvider(pmc PodMetricsClient, pl PodLister) *Provider {
	return &Provider{
		pmc: pmc,
		pl:  pl,
	}
}
type (
	// Provider provides backend pods and information such as metrics.
	// Its pod-metrics cache is a sync.Map, so the background refresh
	// goroutines started by Init and concurrent readers can share it without
	// additional locking.
	Provider struct {
		// podMetrics caches per-pod metrics. key: Pod, value: *PodMetrics.
		podMetrics sync.Map
		// pmc fetches Prometheus metric families from an individual pod.
		pmc PodMetricsClient
		// pl lists the pods that currently back this provider.
		pl PodLister
	}

	// PodMetricsClient fetches the Prometheus metric families exposed by a
	// pod. The map key is presumably the metric family name — confirm against
	// implementations.
	PodMetricsClient interface {
		FetchMetrics(pod Pod) (map[string]*dto.MetricFamily, error)
	}

	// PodLister returns the current set of backend pods.
	PodLister interface {
		List() (PodSet, error)
	}
)
// AllPodMetrics returns the *PodMetrics pointers currently held in the cache.
// The order follows sync.Map iteration and is therefore unspecified. The
// result is never nil: an empty cache yields an empty, non-nil slice.
func (p *Provider) AllPodMetrics() []*PodMetrics {
	out := make([]*PodMetrics, 0)
	p.podMetrics.Range(func(_, value any) bool {
		out = append(out, value.(*PodMetrics))
		return true // keep iterating over every entry
	})
	return out
}
// UpdatePodMetrics records pm as the cached metrics for pod. It inserts the
// entry when pod is not yet cached and replaces it otherwise.
func (p *Provider) UpdatePodMetrics(pod Pod, pm *PodMetrics) {
	cache := &p.podMetrics
	cache.Store(pod, pm)
}
// GetPodMetrics looks up the cached metrics for pod. The boolean reports
// whether pod has an entry in the cache; when it is false the returned
// pointer is nil.
func (p *Provider) GetPodMetrics(pod Pod) (*PodMetrics, bool) {
	cached, found := p.podMetrics.Load(pod)
	if !found {
		return nil, false
	}
	return cached.(*PodMetrics), true
}
// Init synchronously refreshes the pod list and then the pod metrics once.
// It then starts two background goroutines that repeat those refreshes,
// sleeping refreshPodsInterval and refreshMetricsInterval respectively
// between runs.
//
// If either initial refresh fails, Init returns a wrapped error (the cause is
// reachable via errors.Is/errors.As) and starts no goroutines. Failures in
// the background refreshes are only logged.
//
// NOTE(review): the background goroutines never exit; there is no way to
// stop them. A non-positive interval makes the corresponding loop spin
// without pausing.
func (p *Provider) Init(refreshPodsInterval, refreshMetricsInterval time.Duration) error {
	if err := p.refreshPodsOnce(); err != nil {
		return fmt.Errorf("failed to init pods: %w", err)
	}
	if err := p.refreshMetricsOnce(); err != nil {
		return fmt.Errorf("failed to init metrics: %w", err)
	}
	klog.V(2).Infof("Initialized pods and metrics: %+v", p.AllPodMetrics())

	// Periodically refresh pods. Sleeping before each run (rather than using a
	// Ticker) guarantees a full interval between the end of one refresh and
	// the start of the next, even when a refresh is slow.
	go func() {
		for {
			time.Sleep(refreshPodsInterval)
			if err := p.refreshPodsOnce(); err != nil {
				klog.V(1).Infof("Failed to refresh pods: %v", err)
			}
		}
	}()

	// Periodically refresh metrics, with the same sleep-then-refresh cadence.
	go func() {
		for {
			time.Sleep(refreshMetricsInterval)
			if err := p.refreshMetricsOnce(); err != nil {
				klog.V(1).Infof("Failed to refresh metrics: %v", err)
			}
		}
	}()
	return nil
}
// refreshPodsOnce lists the current pods and reconciles the keys of the
// podMetrics cache with that list:
//   - a listed pod that is not cached gets a new *PodMetrics entry whose
//     CachedModels map is empty but non-nil;
//   - a cached pod that is no longer listed is removed.
//
// Existing entries are never replaced, so metrics already stored for a pod
// survive the refresh. This function does not populate PodMetrics values;
// that is done separately. If listing fails, the cache is left untouched and
// the error is returned.
func (p *Provider) refreshPodsOnce() error {
	pods, err := p.pl.List()
	if err != nil {
		return err
	}

	// Merge new pods with cached ones: add every listed pod that is missing.
	for pod := range pods {
		// The Load fast path avoids allocating a placeholder for pods that are
		// already cached. LoadOrStore then makes the insert atomic, so an
		// entry stored concurrently (e.g. via UpdatePodMetrics) between the
		// Load and the insert is kept instead of being overwritten with empty
		// metrics.
		if _, ok := p.podMetrics.Load(pod); ok {
			continue
		}
		p.podMetrics.LoadOrStore(pod, &PodMetrics{
			Pod: pod,
			Metrics: Metrics{
				CachedModels: make(map[string]int),
			},
		})
	}

	// Remove pods that don't exist any more. sync.Map permits calling Delete
	// from inside the Range callback.
	p.podMetrics.Range(func(key, _ any) bool {
		pod := key.(Pod)
		if _, ok := pods[pod]; !ok {
			p.podMetrics.Delete(pod)
		}
		return true
	})
	return nil
}