Skip to content

Commit e3da7c3

Browse files
imorphdhartunian
authored andcommitted
Add: exponential backoff for CAS operations on floats (prometheus#1661)
* add: exponential backoff for CAS operations of floats Signed-off-by: Ivan Goncharov <[email protected]> * add: some more benchmark use cases (higher contention) Signed-off-by: Ivan Goncharov <[email protected]> * fmt: fumpted some files Signed-off-by: Ivan Goncharov <[email protected]> * add: license header Signed-off-by: Ivan Goncharov <[email protected]> * add: comment explaining origin of backoff constants Signed-off-by: Ivan Goncharov <[email protected]> --------- Signed-off-by: Ivan Goncharov <[email protected]>
1 parent 2d4b7d3 commit e3da7c3

File tree

6 files changed

+236
-36
lines changed

6 files changed

+236
-36
lines changed

Diff for: prometheus/atomic_update.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2014 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package prometheus
15+
16+
import (
17+
"math"
18+
"sync/atomic"
19+
"time"
20+
)
21+
22+
// atomicUpdateFloat atomically updates the float64 value pointed to by bits
23+
// using the provided updateFunc, with an exponential backoff on contention.
24+
func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) {
25+
const (
26+
// both numbers are derived from empirical observations
27+
// documented in this PR: https://github.com/prometheus/client_golang/pull/1661
28+
maxBackoff = 320 * time.Millisecond
29+
initialBackoff = 10 * time.Millisecond
30+
)
31+
backoff := initialBackoff
32+
33+
for {
34+
loadedBits := atomic.LoadUint64(bits)
35+
oldFloat := math.Float64frombits(loadedBits)
36+
newFloat := updateFunc(oldFloat)
37+
newBits := math.Float64bits(newFloat)
38+
39+
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
40+
break
41+
} else {
42+
// Exponential backoff with sleep and cap to avoid infinite wait
43+
time.Sleep(backoff)
44+
backoff *= 2
45+
if backoff > maxBackoff {
46+
backoff = maxBackoff
47+
}
48+
}
49+
}
50+
}

Diff for: prometheus/atomic_update_test.go

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2014 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package prometheus
15+
16+
import (
17+
"math"
18+
"sync"
19+
"sync/atomic"
20+
"testing"
21+
"unsafe"
22+
)
23+
24+
var output float64
25+
26+
func TestAtomicUpdateFloat(t *testing.T) {
27+
var val float64 = 0.0
28+
bits := (*uint64)(unsafe.Pointer(&val))
29+
var wg sync.WaitGroup
30+
numGoroutines := 100000
31+
increment := 1.0
32+
33+
for i := 0; i < numGoroutines; i++ {
34+
wg.Add(1)
35+
go func() {
36+
defer wg.Done()
37+
atomicUpdateFloat(bits, func(f float64) float64 {
38+
return f + increment
39+
})
40+
}()
41+
}
42+
43+
wg.Wait()
44+
expected := float64(numGoroutines) * increment
45+
if val != expected {
46+
t.Errorf("Expected %f, got %f", expected, val)
47+
}
48+
}
49+
50+
// Benchmark for atomicUpdateFloat with single goroutine (no contention).
51+
func BenchmarkAtomicUpdateFloat_SingleGoroutine(b *testing.B) {
52+
var val float64 = 0.0
53+
bits := (*uint64)(unsafe.Pointer(&val))
54+
55+
for i := 0; i < b.N; i++ {
56+
atomicUpdateFloat(bits, func(f float64) float64 {
57+
return f + 1.0
58+
})
59+
}
60+
61+
output = val
62+
}
63+
64+
// Benchmark for old implementation with single goroutine (no contention) -> to check overhead of backoff
65+
func BenchmarkAtomicNoBackoff_SingleGoroutine(b *testing.B) {
66+
var val float64 = 0.0
67+
bits := (*uint64)(unsafe.Pointer(&val))
68+
69+
for i := 0; i < b.N; i++ {
70+
for {
71+
loadedBits := atomic.LoadUint64(bits)
72+
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
73+
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
74+
break
75+
}
76+
}
77+
}
78+
79+
output = val
80+
}
81+
82+
// Benchmark varying the number of goroutines.
83+
func benchmarkAtomicUpdateFloatConcurrency(b *testing.B, numGoroutines int) {
84+
var val float64 = 0.0
85+
bits := (*uint64)(unsafe.Pointer(&val))
86+
b.SetParallelism(numGoroutines)
87+
88+
b.ResetTimer()
89+
b.RunParallel(func(pb *testing.PB) {
90+
for pb.Next() {
91+
atomicUpdateFloat(bits, func(f float64) float64 {
92+
return f + 1.0
93+
})
94+
}
95+
})
96+
97+
output = val
98+
}
99+
100+
func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) {
101+
var val float64 = 0.0
102+
bits := (*uint64)(unsafe.Pointer(&val))
103+
b.SetParallelism(numGoroutines)
104+
105+
b.ResetTimer()
106+
b.RunParallel(func(pb *testing.PB) {
107+
for pb.Next() {
108+
for {
109+
loadedBits := atomic.LoadUint64(bits)
110+
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
111+
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
112+
break
113+
}
114+
}
115+
}
116+
})
117+
118+
output = val
119+
}
120+
121+
func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) {
122+
benchmarkAtomicUpdateFloatConcurrency(b, 1)
123+
}
124+
125+
func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) {
126+
benchmarkAtomicNoBackoffFloatConcurrency(b, 1)
127+
}
128+
129+
func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) {
130+
benchmarkAtomicUpdateFloatConcurrency(b, 2)
131+
}
132+
133+
func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) {
134+
benchmarkAtomicNoBackoffFloatConcurrency(b, 2)
135+
}
136+
137+
func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) {
138+
benchmarkAtomicUpdateFloatConcurrency(b, 4)
139+
}
140+
141+
func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) {
142+
benchmarkAtomicNoBackoffFloatConcurrency(b, 4)
143+
}
144+
145+
func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) {
146+
benchmarkAtomicUpdateFloatConcurrency(b, 8)
147+
}
148+
149+
func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) {
150+
benchmarkAtomicNoBackoffFloatConcurrency(b, 8)
151+
}
152+
153+
func BenchmarkAtomicUpdateFloat_16Goroutines(b *testing.B) {
154+
benchmarkAtomicUpdateFloatConcurrency(b, 16)
155+
}
156+
157+
func BenchmarkAtomicNoBackoff_16Goroutines(b *testing.B) {
158+
benchmarkAtomicNoBackoffFloatConcurrency(b, 16)
159+
}
160+
161+
func BenchmarkAtomicUpdateFloat_32Goroutines(b *testing.B) {
162+
benchmarkAtomicUpdateFloatConcurrency(b, 32)
163+
}
164+
165+
func BenchmarkAtomicNoBackoff_32Goroutines(b *testing.B) {
166+
benchmarkAtomicNoBackoffFloatConcurrency(b, 32)
167+
}

Diff for: prometheus/counter.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,9 @@ func (c *counter) Add(v float64) {
127127
return
128128
}
129129

130-
for {
131-
oldBits := atomic.LoadUint64(&c.valBits)
132-
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
133-
if atomic.CompareAndSwapUint64(&c.valBits, oldBits, newBits) {
134-
return
135-
}
136-
}
130+
atomicUpdateFloat(&c.valBits, func(oldVal float64) float64 {
131+
return oldVal + v
132+
})
137133
}
138134

139135
func (c *counter) AddWithExemplar(v float64, e Labels) {

Diff for: prometheus/gauge.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,9 @@ func (g *gauge) Dec() {
120120
}
121121

122122
func (g *gauge) Add(val float64) {
123-
for {
124-
oldBits := atomic.LoadUint64(&g.valBits)
125-
newBits := math.Float64bits(math.Float64frombits(oldBits) + val)
126-
if atomic.CompareAndSwapUint64(&g.valBits, oldBits, newBits) {
127-
return
128-
}
129-
}
123+
atomicUpdateFloat(&g.valBits, func(oldVal float64) float64 {
124+
return oldVal + val
125+
})
130126
}
131127

132128
func (g *gauge) Sub(val float64) {

Diff for: prometheus/histogram.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -1493,13 +1493,9 @@ func waitForCooldown(count uint64, counts *histogramCounts) {
14931493
// atomicAddFloat adds the provided float atomically to another float
14941494
// represented by the bit pattern the bits pointer is pointing to.
14951495
func atomicAddFloat(bits *uint64, v float64) {
1496-
for {
1497-
loadedBits := atomic.LoadUint64(bits)
1498-
newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
1499-
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
1500-
break
1501-
}
1502-
}
1496+
atomicUpdateFloat(bits, func(oldVal float64) float64 {
1497+
return oldVal + v
1498+
})
15031499
}
15041500

15051501
// atomicDecUint32 atomically decrements the uint32 p points to. See

Diff for: prometheus/summary.go

+10-15
Original file line numberDiff line numberDiff line change
@@ -453,13 +453,9 @@ func (s *noObjectivesSummary) Observe(v float64) {
453453
n := atomic.AddUint64(&s.countAndHotIdx, 1)
454454
hotCounts := s.counts[n>>63]
455455

456-
for {
457-
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
458-
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
459-
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
460-
break
461-
}
462-
}
456+
atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
457+
return oldVal + v
458+
})
463459
// Increment count last as we take it as a signal that the observation
464460
// is complete.
465461
atomic.AddUint64(&hotCounts.count, 1)
@@ -500,14 +496,13 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error {
500496
// Finally add all the cold counts to the new hot counts and reset the cold counts.
501497
atomic.AddUint64(&hotCounts.count, count)
502498
atomic.StoreUint64(&coldCounts.count, 0)
503-
for {
504-
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
505-
newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum())
506-
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
507-
atomic.StoreUint64(&coldCounts.sumBits, 0)
508-
break
509-
}
510-
}
499+
500+
// Use atomicUpdateFloat to update hotCounts.sumBits atomically.
501+
atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
502+
return oldVal + sum.GetSampleSum()
503+
})
504+
atomic.StoreUint64(&coldCounts.sumBits, 0)
505+
511506
return nil
512507
}
513508

0 commit comments

Comments
 (0)