Skip to content

Commit b7d7ff1

Browse files
authored
Unregister views to avoid slow oom issue during meter cleanup (#2005) (#2027)
* unregister views * add a test * fix string issue * fix * fixes * fix races in tests * fix exporter issue * stop meter in cleanup * fixes * typo * revert timeout change * fixes * change msg
1 parent 51c72e2 commit b7d7ff1

File tree

2 files changed

+79
-2
lines changed

2 files changed

+79
-2
lines changed

metrics/resource_view.go

+7-1
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,16 @@ func cleanup() {
8282
expiryCutoff := allMeters.clock.Now().Add(-1 * maxMeterExporterAge)
8383
allMeters.lock.Lock()
8484
defer allMeters.lock.Unlock()
85+
resourceViews.lock.Lock()
86+
defer resourceViews.lock.Unlock()
8587
for key, meter := range allMeters.meters {
8688
if key != "" && meter.t.Before(expiryCutoff) {
8789
flushGivenExporter(meter.e)
90+
// Make a copy of views to avoid data races
91+
viewsCopy := copyViews(resourceViews.views)
92+
meter.m.Unregister(viewsCopy...)
8893
delete(allMeters.meters, key)
94+
meter.m.Stop()
8995
}
9096
}
9197
}
@@ -139,7 +145,7 @@ func RegisterResourceView(views ...*view.View) error {
139145
return nil
140146
}
141147

142-
// UnregisterResourceView is similar to view.Unregiste(), except that it will
148+
// UnregisterResourceView is similar to view.Unregister(), except that it will
143149
// unregister the view across all Resources tracked by the system, rather than
144150
// simply the default view.
145151
func UnregisterResourceView(views ...*view.View) {

metrics/resource_view_test.go

+72-1
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"go.opencensus.io/stats"
2626
"go.opencensus.io/stats/view"
2727
"k8s.io/apimachinery/pkg/util/clock"
28+
"k8s.io/apimachinery/pkg/util/wait"
2829
)
2930

3031
var (
@@ -74,6 +75,8 @@ type testExporter struct {
7475
id string
7576
}
7677

78+
func (testExporter) ExportView(viewData *view.Data) {}
79+
7780
func TestSetFactory(t *testing.T) {
7881
var oldFactory ResourceExporterFactory
7982
func() {
@@ -170,7 +173,13 @@ func TestAllMetersExpiration(t *testing.T) {
170173

171174
// Expire the second entry
172175
fakeClock.Step(9 * time.Minute) // t+12m
173-
time.Sleep(time.Second) // Wait a second on the wallclock, so that the cleanup thread has time to finish a loop
176+
_ = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
177+
// Non-expiring defaultMeter should be available along with the non-expired entry
178+
allMeters.lock.Lock()
179+
defer allMeters.lock.Unlock()
180+
return len(allMeters.meters) == 2, nil
181+
})
182+
174183
allMeters.lock.Lock()
175184
if len(allMeters.meters) != 2 {
176185
t.Errorf("len(allMeters)=%d, want: 2", len(allMeters.meters))
@@ -189,6 +198,68 @@ func TestAllMetersExpiration(t *testing.T) {
189198
// (123=9m, 456=evicted, 789=0m)
190199
}
191200

201+
func TestIfAllMeterResourcesAreRemoved(t *testing.T) {
202+
allMeters.clock = clock.Clock(clock.NewFakeClock(time.Now()))
203+
var fakeClock *clock.FakeClock = allMeters.clock.(*clock.FakeClock)
204+
ClearMetersForTest() // t+0m
205+
// Register many resources at once
206+
for i := 1; i <= 100; i++ {
207+
res := resource.Resource{Labels: map[string]string{"foo": "bar"}}
208+
res.Labels["id"] = fmt.Sprint(i)
209+
if _, err := optionForResource(&res); err != nil {
210+
t.Error("Should succeed getting option, instead got error ", err)
211+
}
212+
m := stats.Int64("testView_sum", "", stats.UnitDimensionless)
213+
v := view.View{Name: fmt.Sprintf("testview-%d", i), Measure: m, Aggregation: view.Sum()}
214+
215+
err := RegisterResourceView(&v)
216+
if err != nil {
217+
t.Fatal("RegisterResourceView =", err)
218+
}
219+
}
220+
221+
func() {
222+
allMeters.lock.Lock()
223+
resourceViews.lock.Lock()
224+
defer allMeters.lock.Unlock()
225+
defer resourceViews.lock.Unlock()
226+
// Make a copy to test against as allMeters should be cleaned up
227+
copyAllMeters := make(map[string]*meterExporter, len(allMeters.meters))
228+
for k, v := range allMeters.meters {
229+
copyAllMeters[k] = v
230+
}
231+
232+
for _, meter := range copyAllMeters {
233+
for _, mView := range resourceViews.views {
234+
want, got := mView, meter.m.Find(mView.Name)
235+
if got == nil {
236+
t.Errorf("View %s is not available", want.Name)
237+
} else if want.Name != got.Name {
238+
t.Errorf("Want %v, got %v", want.Name, got.Name)
239+
}
240+
}
241+
}
242+
}()
243+
244+
// Expire all meters and views
245+
// We need to unlock before we move the clock ahead in time
246+
fakeClock.Step(12 * time.Minute) // t+12m
247+
_ = wait.PollImmediate(100*time.Millisecond, 5*time.Second, func() (bool, error) {
248+
// Non-expiring defaultMeter should be available
249+
allMeters.lock.Lock()
250+
defer allMeters.lock.Unlock()
251+
return len(allMeters.meters) == 1, nil
252+
})
253+
254+
allMeters.lock.Lock()
255+
defer allMeters.lock.Unlock()
256+
resourceViews.lock.Lock()
257+
defer resourceViews.lock.Unlock()
258+
if len(allMeters.meters) != 1 {
259+
t.Errorf("len(allMeters)=%d, want: 1", len(allMeters.meters))
260+
}
261+
}
262+
192263
func TestResourceAsString(t *testing.T) {
193264
r1 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k1": "v1", "k3": "v3", "k2": "v2"}}
194265
r2 := &resource.Resource{Type: "foobar", Labels: map[string]string{"k2": "v2", "k3": "v3", "k1": "v1"}}

0 commit comments

Comments
 (0)