Skip to content

Commit 554933e

Browse files
imorphamberpixels
authored andcommitted
Add: exponential backoff for CAS operations on floats (prometheus#1661)
* add: exponential backoff for CAS operations of floats Signed-off-by: Ivan Goncharov <[email protected]> * add: some more benchmark use cases (higher contention) Signed-off-by: Ivan Goncharov <[email protected]> * fmt: fumpted some files Signed-off-by: Ivan Goncharov <[email protected]> * add: license header Signed-off-by: Ivan Goncharov <[email protected]> * add: comment explaining origin of backoff constants Signed-off-by: Ivan Goncharov <[email protected]> --------- Signed-off-by: Ivan Goncharov <[email protected]> Signed-off-by: Eugene <[email protected]>
1 parent 645b842 commit 554933e

File tree

6 files changed

+236
-36
lines changed

6 files changed

+236
-36
lines changed

prometheus/atomic_update.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
// Copyright 2014 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package prometheus
15+
16+
import (
17+
"math"
18+
"sync/atomic"
19+
"time"
20+
)
21+
22+
// atomicUpdateFloat atomically updates the float64 value pointed to by bits
23+
// using the provided updateFunc, with an exponential backoff on contention.
24+
func atomicUpdateFloat(bits *uint64, updateFunc func(float64) float64) {
25+
const (
26+
// both numbers are derived from empirical observations
27+
// documented in this PR: https://github.com/prometheus/client_golang/pull/1661
28+
maxBackoff = 320 * time.Millisecond
29+
initialBackoff = 10 * time.Millisecond
30+
)
31+
backoff := initialBackoff
32+
33+
for {
34+
loadedBits := atomic.LoadUint64(bits)
35+
oldFloat := math.Float64frombits(loadedBits)
36+
newFloat := updateFunc(oldFloat)
37+
newBits := math.Float64bits(newFloat)
38+
39+
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
40+
break
41+
} else {
42+
// Exponential backoff with sleep and cap to avoid infinite wait
43+
time.Sleep(backoff)
44+
backoff *= 2
45+
if backoff > maxBackoff {
46+
backoff = maxBackoff
47+
}
48+
}
49+
}
50+
}

prometheus/atomic_update_test.go

+167
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2014 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
package prometheus
15+
16+
import (
17+
"math"
18+
"sync"
19+
"sync/atomic"
20+
"testing"
21+
"unsafe"
22+
)
23+
24+
var output float64
25+
26+
func TestAtomicUpdateFloat(t *testing.T) {
27+
var val float64 = 0.0
28+
bits := (*uint64)(unsafe.Pointer(&val))
29+
var wg sync.WaitGroup
30+
numGoroutines := 100000
31+
increment := 1.0
32+
33+
for i := 0; i < numGoroutines; i++ {
34+
wg.Add(1)
35+
go func() {
36+
defer wg.Done()
37+
atomicUpdateFloat(bits, func(f float64) float64 {
38+
return f + increment
39+
})
40+
}()
41+
}
42+
43+
wg.Wait()
44+
expected := float64(numGoroutines) * increment
45+
if val != expected {
46+
t.Errorf("Expected %f, got %f", expected, val)
47+
}
48+
}
49+
50+
// Benchmark for atomicUpdateFloat with single goroutine (no contention).
51+
func BenchmarkAtomicUpdateFloat_SingleGoroutine(b *testing.B) {
52+
var val float64 = 0.0
53+
bits := (*uint64)(unsafe.Pointer(&val))
54+
55+
for i := 0; i < b.N; i++ {
56+
atomicUpdateFloat(bits, func(f float64) float64 {
57+
return f + 1.0
58+
})
59+
}
60+
61+
output = val
62+
}
63+
64+
// Benchmark for old implementation with single goroutine (no contention) -> to check overhead of backoff
65+
func BenchmarkAtomicNoBackoff_SingleGoroutine(b *testing.B) {
66+
var val float64 = 0.0
67+
bits := (*uint64)(unsafe.Pointer(&val))
68+
69+
for i := 0; i < b.N; i++ {
70+
for {
71+
loadedBits := atomic.LoadUint64(bits)
72+
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
73+
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
74+
break
75+
}
76+
}
77+
}
78+
79+
output = val
80+
}
81+
82+
// Benchmark varying the number of goroutines.
83+
func benchmarkAtomicUpdateFloatConcurrency(b *testing.B, numGoroutines int) {
84+
var val float64 = 0.0
85+
bits := (*uint64)(unsafe.Pointer(&val))
86+
b.SetParallelism(numGoroutines)
87+
88+
b.ResetTimer()
89+
b.RunParallel(func(pb *testing.PB) {
90+
for pb.Next() {
91+
atomicUpdateFloat(bits, func(f float64) float64 {
92+
return f + 1.0
93+
})
94+
}
95+
})
96+
97+
output = val
98+
}
99+
100+
func benchmarkAtomicNoBackoffFloatConcurrency(b *testing.B, numGoroutines int) {
101+
var val float64 = 0.0
102+
bits := (*uint64)(unsafe.Pointer(&val))
103+
b.SetParallelism(numGoroutines)
104+
105+
b.ResetTimer()
106+
b.RunParallel(func(pb *testing.PB) {
107+
for pb.Next() {
108+
for {
109+
loadedBits := atomic.LoadUint64(bits)
110+
newBits := math.Float64bits(math.Float64frombits(loadedBits) + 1.0)
111+
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
112+
break
113+
}
114+
}
115+
}
116+
})
117+
118+
output = val
119+
}
120+
121+
func BenchmarkAtomicUpdateFloat_1Goroutine(b *testing.B) {
122+
benchmarkAtomicUpdateFloatConcurrency(b, 1)
123+
}
124+
125+
func BenchmarkAtomicNoBackoff_1Goroutine(b *testing.B) {
126+
benchmarkAtomicNoBackoffFloatConcurrency(b, 1)
127+
}
128+
129+
func BenchmarkAtomicUpdateFloat_2Goroutines(b *testing.B) {
130+
benchmarkAtomicUpdateFloatConcurrency(b, 2)
131+
}
132+
133+
func BenchmarkAtomicNoBackoff_2Goroutines(b *testing.B) {
134+
benchmarkAtomicNoBackoffFloatConcurrency(b, 2)
135+
}
136+
137+
func BenchmarkAtomicUpdateFloat_4Goroutines(b *testing.B) {
138+
benchmarkAtomicUpdateFloatConcurrency(b, 4)
139+
}
140+
141+
func BenchmarkAtomicNoBackoff_4Goroutines(b *testing.B) {
142+
benchmarkAtomicNoBackoffFloatConcurrency(b, 4)
143+
}
144+
145+
func BenchmarkAtomicUpdateFloat_8Goroutines(b *testing.B) {
146+
benchmarkAtomicUpdateFloatConcurrency(b, 8)
147+
}
148+
149+
func BenchmarkAtomicNoBackoff_8Goroutines(b *testing.B) {
150+
benchmarkAtomicNoBackoffFloatConcurrency(b, 8)
151+
}
152+
153+
func BenchmarkAtomicUpdateFloat_16Goroutines(b *testing.B) {
154+
benchmarkAtomicUpdateFloatConcurrency(b, 16)
155+
}
156+
157+
func BenchmarkAtomicNoBackoff_16Goroutines(b *testing.B) {
158+
benchmarkAtomicNoBackoffFloatConcurrency(b, 16)
159+
}
160+
161+
func BenchmarkAtomicUpdateFloat_32Goroutines(b *testing.B) {
162+
benchmarkAtomicUpdateFloatConcurrency(b, 32)
163+
}
164+
165+
func BenchmarkAtomicNoBackoff_32Goroutines(b *testing.B) {
166+
benchmarkAtomicNoBackoffFloatConcurrency(b, 32)
167+
}

prometheus/counter.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,9 @@ func (c *counter) Add(v float64) {
134134
return
135135
}
136136

137-
for {
138-
oldBits := atomic.LoadUint64(&c.valBits)
139-
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
140-
if atomic.CompareAndSwapUint64(&c.valBits, oldBits, newBits) {
141-
return
142-
}
143-
}
137+
atomicUpdateFloat(&c.valBits, func(oldVal float64) float64 {
138+
return oldVal + v
139+
})
144140
}
145141

146142
func (c *counter) AddWithExemplar(v float64, e Labels) {

prometheus/gauge.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -120,13 +120,9 @@ func (g *gauge) Dec() {
120120
}
121121

122122
func (g *gauge) Add(val float64) {
123-
for {
124-
oldBits := atomic.LoadUint64(&g.valBits)
125-
newBits := math.Float64bits(math.Float64frombits(oldBits) + val)
126-
if atomic.CompareAndSwapUint64(&g.valBits, oldBits, newBits) {
127-
return
128-
}
129-
}
123+
atomicUpdateFloat(&g.valBits, func(oldVal float64) float64 {
124+
return oldVal + val
125+
})
130126
}
131127

132128
func (g *gauge) Sub(val float64) {

prometheus/histogram.go

+3-7
Original file line numberDiff line numberDiff line change
@@ -1641,13 +1641,9 @@ func waitForCooldown(count uint64, counts *histogramCounts) {
16411641
// atomicAddFloat adds the provided float atomically to another float
16421642
// represented by the bit pattern the bits pointer is pointing to.
16431643
func atomicAddFloat(bits *uint64, v float64) {
1644-
for {
1645-
loadedBits := atomic.LoadUint64(bits)
1646-
newBits := math.Float64bits(math.Float64frombits(loadedBits) + v)
1647-
if atomic.CompareAndSwapUint64(bits, loadedBits, newBits) {
1648-
break
1649-
}
1650-
}
1644+
atomicUpdateFloat(bits, func(oldVal float64) float64 {
1645+
return oldVal + v
1646+
})
16511647
}
16521648

16531649
// atomicDecUint32 atomically decrements the uint32 p points to. See

prometheus/summary.go

+10-15
Original file line numberDiff line numberDiff line change
@@ -471,13 +471,9 @@ func (s *noObjectivesSummary) Observe(v float64) {
471471
n := atomic.AddUint64(&s.countAndHotIdx, 1)
472472
hotCounts := s.counts[n>>63]
473473

474-
for {
475-
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
476-
newBits := math.Float64bits(math.Float64frombits(oldBits) + v)
477-
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
478-
break
479-
}
480-
}
474+
atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
475+
return oldVal + v
476+
})
481477
// Increment count last as we take it as a signal that the observation
482478
// is complete.
483479
atomic.AddUint64(&hotCounts.count, 1)
@@ -519,14 +515,13 @@ func (s *noObjectivesSummary) Write(out *dto.Metric) error {
519515
// Finally add all the cold counts to the new hot counts and reset the cold counts.
520516
atomic.AddUint64(&hotCounts.count, count)
521517
atomic.StoreUint64(&coldCounts.count, 0)
522-
for {
523-
oldBits := atomic.LoadUint64(&hotCounts.sumBits)
524-
newBits := math.Float64bits(math.Float64frombits(oldBits) + sum.GetSampleSum())
525-
if atomic.CompareAndSwapUint64(&hotCounts.sumBits, oldBits, newBits) {
526-
atomic.StoreUint64(&coldCounts.sumBits, 0)
527-
break
528-
}
529-
}
518+
519+
// Use atomicUpdateFloat to update hotCounts.sumBits atomically.
520+
atomicUpdateFloat(&hotCounts.sumBits, func(oldVal float64) float64 {
521+
return oldVal + sum.GetSampleSum()
522+
})
523+
atomic.StoreUint64(&coldCounts.sumBits, 0)
524+
530525
return nil
531526
}
532527

0 commit comments

Comments
 (0)