Skip to content

Commit 2ee038d

Browse files
authored
Merge pull request #17612 from chaochn47/release-3.5-backport-fix-watch-event-loss-after-compact
[release-3.5] backport fix watch event loss after compaction
2 parents 8383107 + 46d2caa commit 2ee038d

File tree

2 files changed

+65
-1
lines changed

2 files changed

+65
-1
lines changed

server/mvcc/watchable_store.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,11 @@ func (s *watchableStore) syncWatchers() int {
366366
var victims watcherBatch
367367
wb := newWatcherBatch(wg, evs)
368368
for w := range wg.watchers {
369+
if w.minRev < compactionRev {
370+
// Skip the watcher that failed to send compacted watch response due to w.ch is full.
371+
// Next retry of syncWatchers would try to resend the compacted watch response to w.ch
372+
continue
373+
}
369374
w.minRev = curRev + 1
370375

371376
eb, ok := wb[w]

server/mvcc/watchable_store_test.go

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,14 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/stretchr/testify/require"
27+
"go.uber.org/zap"
28+
"go.uber.org/zap/zaptest"
29+
2630
"go.etcd.io/etcd/api/v3/mvccpb"
2731
"go.etcd.io/etcd/pkg/v3/traceutil"
2832
"go.etcd.io/etcd/server/v3/lease"
2933
betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing"
30-
"go.uber.org/zap"
3134
)
3235

3336
func TestWatch(t *testing.T) {
@@ -259,6 +262,62 @@ func TestWatchCompacted(t *testing.T) {
259262
}
260263
}
261264

265+
func TestWatchNoEventLossOnCompact(t *testing.T) {
266+
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
267+
b, tmpPath := betesting.NewDefaultTmpBackend(t)
268+
lg := zaptest.NewLogger(t)
269+
s := newWatchableStore(lg, b, &lease.FakeLessor{}, StoreConfig{})
270+
271+
defer func() {
272+
cleanup(s, b, tmpPath)
273+
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
274+
}()
275+
276+
chanBufLen, maxWatchersPerSync = 1, 4
277+
testKey, testValue := []byte("foo"), []byte("bar")
278+
279+
maxRev := 10
280+
compactRev := int64(5)
281+
for i := 0; i < maxRev; i++ {
282+
s.Put(testKey, testValue, lease.NoLease)
283+
}
284+
_, err := s.Compact(traceutil.TODO(), compactRev)
285+
require.NoErrorf(t, err, "failed to compact kv (%v)", err)
286+
287+
w := s.NewWatchStream()
288+
defer w.Close()
289+
290+
watchers := map[WatchID]int64{
291+
0: 1,
292+
1: 1, // create unsyncd watchers with startRev < compactRev
293+
2: 6, // create unsyncd watchers with compactRev < startRev < currentRev
294+
}
295+
for id, startRev := range watchers {
296+
_, err := w.Watch(id, testKey, nil, startRev)
297+
require.NoError(t, err)
298+
}
299+
// fill up w.Chan() with 1 buf via 2 compacted watch response
300+
s.syncWatchers()
301+
302+
for len(watchers) > 0 {
303+
resp := <-w.Chan()
304+
if resp.CompactRevision != 0 {
305+
require.Equal(t, resp.CompactRevision, compactRev)
306+
require.Contains(t, watchers, resp.WatchID)
307+
delete(watchers, resp.WatchID)
308+
continue
309+
}
310+
nextRev := watchers[resp.WatchID]
311+
for _, ev := range resp.Events {
312+
require.Equalf(t, nextRev, ev.Kv.ModRevision, "got event revision %d but want %d for watcher with watch ID %d", ev.Kv.ModRevision, nextRev, resp.WatchID)
313+
nextRev++
314+
}
315+
if nextRev == s.rev()+1 {
316+
delete(watchers, resp.WatchID)
317+
}
318+
}
319+
}
320+
262321
func TestWatchFutureRev(t *testing.T) {
263322
b, tmpPath := betesting.NewDefaultTmpBackend(t)
264323
s := newWatchableStore(zap.NewExample(), b, &lease.FakeLessor{}, StoreConfig{})

0 commit comments

Comments
 (0)