diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache.go
index d5246df5b6b7..9f9be862ca7c 100644
--- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache.go
+++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache.go
@@ -93,7 +93,7 @@ type EventAggregatorMessageFunc func(event *v1.Event) string
 
 // EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
 func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
-	return "(events with common reason combined)"
+	return "(combined from similar events): " + event.Message
 }
 
 // EventAggregator identifies similar events and aggregates them into a single event
@@ -141,11 +141,22 @@ type aggregateRecord struct {
 	lastTimestamp metav1.Time
 }
 
-// EventAggregate identifies similar events and groups into a common event if required
-func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error) {
-	aggregateKey, localKey := e.keyFunc(newEvent)
+// EventAggregate checks if a similar event has been seen according to the
+// aggregation configuration (max events, max interval, etc) and returns:
+//
+// - The (potentially modified) event that should be created
+// - The cache key for the event, for correlation purposes. This will be set to
+//   the full key for normal events, and to the result of
+//   EventAggregatorMessageFunc for aggregate events.
+func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, string) {
 	now := metav1.NewTime(e.clock.Now())
-	record := aggregateRecord{localKeys: sets.NewString(), lastTimestamp: now}
+	var record aggregateRecord
+	// eventKey is the full cache key for this event
+	eventKey := getEventKey(newEvent)
+	// aggregateKey is for the aggregate event, if one is needed.
+	aggregateKey, localKey := e.keyFunc(newEvent)
+
+	// Do we have a record of similar events in our cache?
 	e.Lock()
 	defer e.Unlock()
 	value, found := e.cache.Get(aggregateKey)
@@ -153,24 +164,30 @@ func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error)
 		record = value.(aggregateRecord)
 	}
 
-	// if the last event was far enough in the past, it is not aggregated, and we must reset state
+	// Is the previous record too old? If so, make a fresh one. Note: if we didn't
+	// find a similar record, its lastTimestamp will be the zero value, so we
+	// create a new one in that case.
 	maxInterval := time.Duration(e.maxIntervalInSeconds) * time.Second
 	interval := now.Time.Sub(record.lastTimestamp.Time)
 	if interval > maxInterval {
 		record = aggregateRecord{localKeys: sets.NewString()}
 	}
+
+	// Write the new event into the aggregation record and put it on the cache
 	record.localKeys.Insert(localKey)
 	record.lastTimestamp = now
 	e.cache.Add(aggregateKey, record)
 
+	// If we are not yet over the threshold for unique events, don't correlate them
 	if record.localKeys.Len() < e.maxEvents {
-		return newEvent, nil
+		return newEvent, eventKey
 	}
 
 	// do not grow our local key set any larger than max
 	record.localKeys.PopAny()
 
-	// create a new aggregate event
+	// create a new aggregate event, and return the aggregateKey as the cache key
+	// (so that it can be overwritten.)
 	eventCopy := &v1.Event{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: fmt.Sprintf("%v.%x", newEvent.InvolvedObject.Name, now.UnixNano()),
@@ -185,7 +202,7 @@ func (e *EventAggregator) EventAggregate(newEvent *v1.Event) (*v1.Event, error)
 		Reason: newEvent.Reason,
 		Source: newEvent.Source,
 	}
-	return eventCopy, nil
+	return eventCopy, aggregateKey
 }
 
 // eventLog records data about when an event was observed
@@ -215,22 +232,22 @@ func newEventLogger(lruCacheEntries int, clock clock.Clock) *eventLogger {
 	return &eventLogger{cache: lru.New(lruCacheEntries), clock: clock}
 }
 
-// eventObserve records the event, and determines if its frequency should update
-func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error) {
+// eventObserve records an event, or updates an existing one if key is a cache hit
+func (e *eventLogger) eventObserve(newEvent *v1.Event, key string) (*v1.Event, []byte, error) {
 	var (
 		patch []byte
 		err   error
 	)
-	key := getEventKey(newEvent)
 	eventCopy := *newEvent
 	event := &eventCopy
 
 	e.Lock()
 	defer e.Unlock()
 
+	// Check if there is an existing event we should update
 	lastObservation := e.lastEventObservationFromCache(key)
 
-	// we have seen this event before, so we must prepare a patch
+	// If we found a result, prepare a patch
 	if lastObservation.count > 0 {
 		// update the event based on the last observation so patch will work as desired
 		event.Name = lastObservation.name
@@ -241,6 +258,7 @@ func (e *eventLogger) eventObserve(newEvent *v1.Event) (*v1.Event, []byte, error
 		eventCopy2 := *event
 		eventCopy2.Count = 0
 		eventCopy2.LastTimestamp = metav1.NewTime(time.Unix(0, 0))
+		eventCopy2.Message = ""
 
 		newData, _ := json.Marshal(event)
 		oldData, _ := json.Marshal(eventCopy2)
@@ -337,6 +355,7 @@ func NewEventCorrelator(clock clock.Clock) *EventCorrelator {
 			defaultAggregateMaxEvents,
 			defaultAggregateIntervalInSeconds,
 			clock),
+
 		logger: newEventLogger(cacheSize, clock),
 	}
 }
@@ -346,11 +365,8 @@ func (c *EventCorrelator) EventCorrelate(newEvent *v1.Event) (*EventCorrelateRes
 	if c.filterFunc(newEvent) {
 		return &EventCorrelateResult{Skip: true}, nil
 	}
-	aggregateEvent, err := c.aggregator.EventAggregate(newEvent)
-	if err != nil {
-		return &EventCorrelateResult{}, err
-	}
-	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent)
+	aggregateEvent, ckey := c.aggregator.EventAggregate(newEvent)
+	observedEvent, patch, err := c.logger.eventObserve(aggregateEvent, ckey)
 	return &EventCorrelateResult{Event: observedEvent, Patch: patch}, err
 }
 
diff --git a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache_test.go b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache_test.go
index b6b52881cd93..d45dd65cf6d4 100644
--- a/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache_test.go
+++ b/vendor/k8s.io/kubernetes/staging/src/k8s.io/client-go/tools/record/events_cache_test.go
@@ -157,10 +157,11 @@ func TestEventAggregatorByReasonFunc(t *testing.T) {
 
 // TestEventAggregatorByReasonMessageFunc validates the proper output for an aggregate message
 func TestEventAggregatorByReasonMessageFunc(t *testing.T) {
-	expected := "(events with common reason combined)"
+	expectedPrefix := "(combined from similar events): "
 	event1 := makeEvent("end-of-world", "it was fun", makeObjectReference("Pod", "pod1", "other"))
-	if actual := EventAggregatorByReasonMessageFunc(&event1); expected != actual {
-		t.Errorf("Expected %v got %v", expected, actual)
+	actual := EventAggregatorByReasonMessageFunc(&event1)
+	if !strings.HasPrefix(actual, expectedPrefix) {
+		t.Errorf("Expected %v to begin with prefix %v", actual, expectedPrefix)
 	}
 }
 
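
Not part of the patch above: a minimal sketch of how a caller consumes the correlator after this change, where EventAggregate hands its cache key to eventObserve through EventCorrelate and the caller only sees the resulting EventCorrelateResult. The canonical import paths (rather than the vendored ones) and the create/patch callbacks are assumptions for illustration, not code from this change.

package example

import (
	"k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

// SendEvent sketches the consumer side of EventCorrelate: skip filtered events,
// PATCH when the correlator matched a cached event (found via the cache key that
// EventAggregate now returns), and create a new object otherwise. The create and
// patch callbacks are hypothetical stand-ins for a real event sink.
func SendEvent(c *record.EventCorrelator, event *v1.Event,
	create func(*v1.Event) error, patch func(*v1.Event, []byte) error) error {

	result, err := c.EventCorrelate(event)
	if err != nil {
		return err
	}
	if result.Skip {
		// dropped by the correlator's filter function
		return nil
	}
	if result.Patch != nil {
		// an existing (possibly aggregated) event was matched; update it in place
		return patch(result.Event, result.Patch)
	}
	// first observation of this event (or of a new aggregate); create it
	return create(result.Event)
}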