Skip to content

Commit 4392ff6

Browse files
authored
[runner] fix MonotonicTimestamp (#1728)
* make it monotonic * don't allow counter overflow — in a worst-case scenario it will repeat the same timestamp with 999 milliseconds, which is still better than out-of-order timestamps Fixes: #1727
1 parent 3947297 commit 4392ff6

File tree

2 files changed

+72
-14
lines changed

2 files changed

+72
-14
lines changed

runner/internal/executor/timestamp.go

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -9,37 +9,52 @@ import (
99
)
1010

1111
type MonotonicTimestamp struct {
12-
unix int64
13-
counter int
14-
mu sync.RWMutex
12+
initial time.Time
13+
initialUnix int64 // seconds
14+
elapsed int64 // seconds since initial
15+
counter int // surrogate milliseconds
16+
overflow bool
17+
mu sync.RWMutex
18+
getNow func() time.Time
1519
}
1620

1721
func NewMonotonicTimestamp() *MonotonicTimestamp {
22+
return newMonotonicTimestamp(time.Now)
23+
}
24+
25+
func newMonotonicTimestamp(getNow func() time.Time) *MonotonicTimestamp {
26+
// getNow must return time.Time with monotonic reading
27+
now := getNow()
1828
return &MonotonicTimestamp{
19-
unix: time.Now().Unix(),
20-
counter: 0,
21-
mu: sync.RWMutex{},
29+
initial: now,
30+
initialUnix: now.Unix(),
31+
mu: sync.RWMutex{},
32+
getNow: getNow,
2233
}
2334
}
2435

2536
func (t *MonotonicTimestamp) GetLatest() int64 {
2637
t.mu.RLock()
2738
defer t.mu.RUnlock()
28-
return t.unix*1000 + int64(t.counter)
39+
return (t.initialUnix+t.elapsed)*1000 + int64(t.counter)
2940
}
3041

3142
func (t *MonotonicTimestamp) Next() int64 {
32-
// warning: time.Now() is not monotonic in general
3343
t.mu.Lock()
34-
now := time.Now().Unix()
35-
if now == t.unix {
36-
t.counter++
37-
if t.counter == 1000 {
38-
log.Warning(context.TODO(), "Monotonic timestamp counter overflowed", "timestamp", now)
44+
now := t.getNow()
45+
elapsed := int64(now.Sub(t.initial) / time.Second)
46+
if elapsed == t.elapsed {
47+
if t.counter < 999 {
48+
t.counter++
49+
} else if !t.overflow {
50+
// warn only once per second to avoid log spamming
51+
log.Warning(context.TODO(), "Monotonic timestamp counter overflowed", "unix", t.initialUnix+elapsed)
52+
t.overflow = true
3953
}
4054
} else {
41-
t.unix = now
55+
t.elapsed = elapsed
4256
t.counter = 0
57+
t.overflow = false
4358
}
4459
t.mu.Unlock()
4560
return t.GetLatest()
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package executor
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestTimestamp_Counter(t *testing.T) {
11+
now := time.Now()
12+
ts := newMonotonicTimestamp(func() time.Time { return now })
13+
initial := ts.GetLatest()
14+
assert.Equal(t, int64(1), ts.Next()-initial)
15+
assert.Equal(t, int64(2), ts.Next()-initial)
16+
now = now.Add(999 * time.Millisecond)
17+
assert.Equal(t, int64(3), ts.Next()-initial)
18+
now = now.Add(100 * time.Millisecond)
19+
assert.Equal(t, int64(1000), ts.Next()-initial)
20+
assert.Equal(t, int64(1001), ts.Next()-initial)
21+
}
22+
23+
func TestTimestamp_CounterOverflow(t *testing.T) {
24+
now := time.Now()
25+
ts := newMonotonicTimestamp(func() time.Time { return now })
26+
initial := ts.GetLatest()
27+
for i := 0; i < 997; i++ {
28+
ts.Next()
29+
}
30+
assert.Equal(t, int64(998), ts.Next()-initial)
31+
assert.False(t, ts.overflow)
32+
assert.Equal(t, int64(999), ts.Next()-initial)
33+
assert.False(t, ts.overflow)
34+
assert.Equal(t, int64(999), ts.Next()-initial)
35+
assert.True(t, ts.overflow)
36+
assert.Equal(t, int64(999), ts.Next()-initial)
37+
assert.True(t, ts.overflow)
38+
now = now.Add(time.Second)
39+
assert.Equal(t, int64(1000), ts.Next()-initial)
40+
assert.False(t, ts.overflow)
41+
assert.Equal(t, int64(1001), ts.Next()-initial)
42+
assert.False(t, ts.overflow)
43+
}

0 commit comments

Comments
 (0)