Skip to content

feat: support metrics of events matched #172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func main() {
kubecfg.Burst = cfg.KubeBurst

metrics.Init(*addr, *tlsConf)
metricsStore := metrics.NewMetricsStore(cfg.MetricsNamePrefix)
metricsStore := metrics.NewMetricsStore(cfg.MetricsNamePrefix, cfg.Route.GetMatchNames())

engine := exporter.NewEngine(&cfg, &exporter.ChannelBasedReceiverRegistry{MetricsStore: metricsStore})
onEvent := engine.OnEvent
Expand Down
7 changes: 6 additions & 1 deletion pkg/exporter/channel_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@ type ChannelBasedReceiverRegistry struct {
MetricsStore *metrics.Store
}

func (r *ChannelBasedReceiverRegistry) SendEvent(name string, event *kube.EnhancedEvent) {
func (r *ChannelBasedReceiverRegistry) SendEvent(name string, event *kube.EnhancedEvent, ruleName string) {
ch := r.ch[name]
if ch == nil {
log.Error().Str("name", name).Msg("There is no channel")
}
if counter, ok := r.MetricsStore.EventsMatched[ruleName]; ok {
counter.Inc()
} else {
r.MetricsStore.EventsMatched[metrics.DefaultRuleName].Inc()
}

go func() {
ch <- *event
Expand Down
2 changes: 1 addition & 1 deletion pkg/exporter/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

// ReceiverRegistry registers a receiver with the appropriate sink
type ReceiverRegistry interface {
SendEvent(string, *kube.EnhancedEvent)
SendEvent(string, *kube.EnhancedEvent, string)
Register(string, sinks.Sink)
Close()
}
15 changes: 14 additions & 1 deletion pkg/exporter/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (r *Route) ProcessEvent(ev *kube.EnhancedEvent, registry ReceiverRegistry)
for _, rule := range r.Match {
if rule.MatchesEvent(ev) {
if rule.Receiver != "" {
registry.SendEvent(rule.Receiver, ev)
registry.SendEvent(rule.Receiver, ev, rule.Name)
// Send the event down the hole
}
} else {
Expand All @@ -39,3 +39,16 @@ func (r *Route) ProcessEvent(ev *kube.EnhancedEvent, registry ReceiverRegistry)
}
}
}

// get names of match rules
func (r *Route) GetMatchNames() []string {
var names []string
for _, route := range r.Routes {
for _, rule := range route.Match {
if rule.Name != "" {
names = append(names, rule.Name)
}
}
}
return names
}
1 change: 1 addition & 0 deletions pkg/exporter/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func matchString(pattern, s string) bool {

// Rule is for matching an event
type Rule struct {
Name string `yaml:"name"`
Labels map[string]string
Annotations map[string]string
Message string
Expand Down
32 changes: 31 additions & 1 deletion pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/rs/zerolog/log"
)

const DefaultRuleName = "default"

type Store struct {
EventsProcessed prometheus.Counter
EventsDiscarded prometheus.Counter
Expand All @@ -22,6 +24,7 @@ type Store struct {
BuildInfo prometheus.GaugeFunc
KubeApiReadCacheHits prometheus.Counter
KubeApiReadRequests prometheus.Counter
EventsMatched map[string]prometheus.Counter
}

// promLogger implements promhttp.Logger
Expand Down Expand Up @@ -89,7 +92,30 @@ func Init(addr string, tlsConf string) {
go web.ListenAndServe(&metricsServer, &metricsFlags, promLogger)
}

func NewMetricsStore(name_prefix string) *Store {
func NewMetricsStore(name_prefix string, matchRouteNames []string) *Store {
eventsMatched := make(map[string]prometheus.Counter)
hasDefault := false
for i := range matchRouteNames {
if matchRouteNames[i] == DefaultRuleName {
hasDefault = true
}
eventsMatched[matchRouteNames[i]] = promauto.NewCounter(prometheus.CounterOpts{
Name: name_prefix + "events_matched",
Help: "The total number of events matched by the route",
ConstLabels: prometheus.Labels{
"route": matchRouteNames[i],
},
})
}
if !hasDefault {
eventsMatched[DefaultRuleName] = promauto.NewCounter(prometheus.CounterOpts{
Name: name_prefix + "events_matched",
Help: "The total number of events matched by the route",
ConstLabels: prometheus.Labels{
"route": DefaultRuleName,
},
})
}
return &Store{
BuildInfo: promauto.NewGaugeFunc(
prometheus.GaugeOpts{
Expand Down Expand Up @@ -129,6 +155,7 @@ func NewMetricsStore(name_prefix string) *Store {
Name: name_prefix + "kube_api_read_cache_misses",
Help: "The total number of read requests served from kube-apiserver when looking up object metadata",
}),
EventsMatched: eventsMatched,
}
}

Expand All @@ -140,5 +167,8 @@ func DestroyMetricsStore(store *Store) {
prometheus.Unregister(store.BuildInfo)
prometheus.Unregister(store.KubeApiReadCacheHits)
prometheus.Unregister(store.KubeApiReadRequests)
for _, counter := range store.EventsMatched {
prometheus.Unregister(counter)
}
store = nil
}