diff --git a/metrics/resource_view.go b/metrics/resource_view.go index 7135af70aa..236fd588ba 100644 --- a/metrics/resource_view.go +++ b/metrics/resource_view.go @@ -82,10 +82,16 @@ func cleanup() { expiryCutoff := allMeters.clock.Now().Add(-1 * maxMeterExporterAge) allMeters.lock.Lock() defer allMeters.lock.Unlock() + resourceViews.lock.Lock() + defer resourceViews.lock.Unlock() for key, meter := range allMeters.meters { if key != "" && meter.t.Before(expiryCutoff) { flushGivenExporter(meter.e) + // Make a copy of views to avoid data races + viewsCopy := copyViews(resourceViews.views) + meter.m.Unregister(viewsCopy...) delete(allMeters.meters, key) + meter.m.Stop() } } } @@ -139,7 +145,7 @@ func RegisterResourceView(views ...*view.View) error { return nil } -// UnregisterResourceView is similar to view.Unregiste(), except that it will +// UnregisterResourceView is similar to view.Unregister(), except that it will // unregister the view across all Resources tracked byt he system, rather than // simply the default view. func UnregisterResourceView(views ...*view.View) { diff --git a/metrics/resource_view_test.go b/metrics/resource_view_test.go index 8814447aa8..37766239ad 100644 --- a/metrics/resource_view_test.go +++ b/metrics/resource_view_test.go @@ -25,6 +25,7 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/wait" ) var ( @@ -74,6 +75,8 @@ type testExporter struct { id string } +func (testExporter) ExportView(viewData *view.Data) {} + func TestSetFactory(t *testing.T) { var oldFactory ResourceExporterFactory func() { @@ -170,7 +173,13 @@ func TestAllMetersExpiration(t *testing.T) { // Expire the second entry fakeClock.Step(9 * time.Minute) // t+12m - time.Sleep(time.Second) // Wait a second on the wallclock, so that the cleanup thread has time to finish a loop + _ = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) { + // Non-expiring defaultMeter should be available along with the non-expired entry + allMeters.lock.Lock() + defer allMeters.lock.Unlock() + return len(allMeters.meters) == 2, nil + }) + allMeters.lock.Lock() if len(allMeters.meters) != 2 { t.Errorf("len(allMeters)=%d, want: 2", len(allMeters.meters)) @@ -189,6 +198,68 @@ func TestAllMetersExpiration(t *testing.T) { // (123=9m, 456=evicted, 789=0m) } +func TestIfAllMeterResourcesAreRemoved(t *testing.T) { + allMeters.clock = clock.Clock(clock.NewFakeClock(time.Now())) + var fakeClock *clock.FakeClock = allMeters.clock.(*clock.FakeClock) + ClearMetersForTest() // t+0m + // Register many resources at once + for i := 1; i <= 100; i++ { + res := resource.Resource{Labels: map[string]string{"foo": "bar"}} + res.Labels["id"] = fmt.Sprint(i) + if _, err := optionForResource(&res); err != nil { + t.Error("Should succeed getting option, instead got error ", err) + } + m := stats.Int64("testView_sum", "", stats.UnitDimensionless) + v := view.View{Name: fmt.Sprintf("testview-%d", i), Measure: m, Aggregation: view.Sum()} + + err := RegisterResourceView(&v) + if err != nil { + t.Fatal("RegisterResourceView =", err) + } + } + + func() { + allMeters.lock.Lock() + resourceViews.lock.Lock() + defer allMeters.lock.Unlock() + defer resourceViews.lock.Unlock() + // Make a copy to test against as allMeters should be cleaned up + copyAllMeters := make(map[string]*meterExporter, len(allMeters.meters)) + for k, v := range allMeters.meters { + copyAllMeters[k] = v + } + + for _, meter := range copyAllMeters { + for _, mView := range resourceViews.views { + want, got := mView, meter.m.Find(mView.Name) + if got == nil { + t.Errorf("View %s is not available", want.Name) + } else if want.Name != got.Name { + t.Errorf("Want %v, got %v", want.Name, got.Name) + } + } + } + }() + + // Expire all meters and views + // We need to unlock before we move the clock ahead in time + fakeClock.Step(12 * time.Minute) // t+12m + _ = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) { + // Non-expiring defaultMeter should be available + allMeters.lock.Lock() + defer allMeters.lock.Unlock() + return len(allMeters.meters) == 1, nil + }) + + allMeters.lock.Lock() + defer allMeters.lock.Unlock() + resourceViews.lock.Lock() + defer resourceViews.lock.Unlock() + if len(allMeters.meters) != 1 { + t.Errorf("len(allMeters)=%d, want: 1", len(allMeters.meters)) + } +} + func TestResourceAsString(t *testing.T) { r1 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k1": "v1", "k3": "v3", "k2": "v2"}} r2 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k2": "v2", "k3": "v3", "k1": "v1"}}