Commit 4be6af1
[FIXED] Desync after quit during catchup (#6459)

If a replica was catching up based on a stream snapshot and that catchup was left incomplete because the RAFT node or server quit, the replica could desync: at least one entry after the snapshot could still be marked with `n.Applied` and be captured in a snapshot taken on shutdown, even though the catchup data was never fully received.

Signed-off-by: Maurice van Veen <[email protected]>
2 parents bc18f1a + d22ae09 commit 4be6af1

File tree: 2 files changed, +91 -1 lines changed

server/jetstream_cluster.go (+1 -1)

@@ -2512,7 +2512,7 @@ func (js *jetStream) monitorStream(mset *stream, sa *streamAssignment, sendSnaps
 				ce.ReturnToPool()
 			} else {
 				// Our stream was closed out from underneath of us, simply return here.
-				if err == errStreamClosed {
+				if err == errStreamClosed || err == errCatchupStreamStopped || err == ErrServerNotRunning {
 					aq.recycle(&ces)
 					return
 				}
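Why the two extra error cases matter: the apply loop must never advance the node's applied index past a catchup it did not finish, because the snapshot written on shutdown resumes from that index. Below is a minimal sketch of that invariant, using illustrative names (committedEntry, applyLoop, errShutdown stand in for the server's actual types and errors); it is not the server's code.

// Minimal sketch, illustrative names only; not the actual NATS server code.
package sketch

import "errors"

// Stand-ins for the server's errStreamClosed, errCatchupStreamStopped,
// and ErrServerNotRunning.
var (
	errStreamClosed = errors.New("stream closed")
	errShutdown     = errors.New("raft node or server not running")
)

type committedEntry struct {
	index uint64
	apply func() error // applies this entry to the stream
}

type node struct{ applied uint64 }

// Applied marks everything up to index as durably applied. A snapshot
// written on shutdown resumes from this index, so it must never run
// ahead of data the replica actually holds.
func (n *node) Applied(index uint64) { n.applied = index }

func (n *node) applyLoop(entries []committedEntry) {
	for _, e := range entries {
		if err := e.apply(); err != nil {
			// A quit mid-catchup leaves the stream incomplete, so the
			// loop must return without advancing applied. Before the
			// fix only errStreamClosed was treated as fatal here; a
			// shutdown error fell through, and a later entry could
			// still move applied past the missing data.
			if errors.Is(err, errStreamClosed) || errors.Is(err, errShutdown) {
				return
			}
			continue // other errors: skip just this entry
		}
		n.Applied(e.index)
	}
}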

server/jetstream_cluster_4_test.go (+90)

@@ -4111,6 +4111,96 @@ func TestJetStreamClusterConsumerReplicasAfterScale(t *testing.T) {
 	require_Equal(t, len(ci.Cluster.Replicas), 2)
 }
 
+func TestJetStreamClusterDesyncAfterQuitDuringCatchup(t *testing.T) {
+	for title, test := range map[string]func(s *Server, rn RaftNode){
+		"RAFT": func(s *Server, rn RaftNode) {
+			rn.Stop()
+			rn.WaitForStop()
+		},
+		"server": func(s *Server, rn RaftNode) {
+			s.running.Store(false)
+		},
+	} {
+		t.Run(title, func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "R3S", 3)
+			defer c.shutdown()
+
+			nc, js := jsClientConnect(t, c.randomServer())
+			defer nc.Close()
+
+			_, err := js.AddStream(&nats.StreamConfig{
+				Name:     "TEST",
+				Subjects: []string{"foo"},
+				Replicas: 3,
+			})
+			require_NoError(t, err)
+
+			// Wait for all servers to have applied everything up to this point.
+			checkFor(t, 5*time.Second, 500*time.Millisecond, func() error {
+				for _, s := range c.servers {
+					acc, err := s.lookupAccount(globalAccountName)
+					if err != nil {
+						return err
+					}
+					mset, err := acc.lookupStream("TEST")
+					if err != nil {
+						return err
+					}
+					_, _, applied := mset.raftNode().Progress()
+					if applied != 1 {
+						return fmt.Errorf("expected applied to be %d, got %d", 1, applied)
+					}
+				}
+				return nil
+			})
+
+			rs := c.randomNonStreamLeader(globalAccountName, "TEST")
+			acc, err := rs.lookupAccount(globalAccountName)
+			require_NoError(t, err)
+			mset, err := acc.lookupStream("TEST")
+			require_NoError(t, err)
+
+			rn := mset.raftNode()
+			snap, err := json.Marshal(streamSnapshot{Msgs: 1, Bytes: 1, FirstSeq: 100, LastSeq: 100, Failed: 0, Deleted: nil})
+			require_NoError(t, err)
+			esm := encodeStreamMsgAllowCompress("foo", _EMPTY_, nil, nil, 0, 0, false)
+
+			// Lock stream so that we can go into processSnapshot but must wait for this to unlock.
+			mset.mu.Lock()
+			var unlocked bool
+			defer func() {
+				if !unlocked {
+					mset.mu.Unlock()
+				}
+			}()
+
+			_, err = rn.ApplyQ().push(newCommittedEntry(100, []*Entry{newEntry(EntrySnapshot, snap)}))
+			require_NoError(t, err)
+			_, err = rn.ApplyQ().push(newCommittedEntry(101, []*Entry{newEntry(EntryNormal, esm)}))
+			require_NoError(t, err)
+
+			// Waiting for the apply queue entry to be captured in monitorStream first.
+			time.Sleep(time.Second)
+
+			// Set commit to a very high number, just so that we allow upping Applied()
+			n := rn.(*raft)
+			n.Lock()
+			n.commit = 1000
+			n.Unlock()
+
+			// Now stop the underlying RAFT node/server so processSnapshot must exit because of it.
+			test(rs, rn)
+			mset.mu.Unlock()
+			unlocked = true
+
+			// Allow some time for the applied number to be updated, in which case it's an error.
+			time.Sleep(time.Second)
+			_, _, applied := mset.raftNode().Progress()
+			require_Equal(t, applied, 1)
+		})
+	}
+}
+
 func TestJetStreamClusterDesyncAfterErrorDuringCatchup(t *testing.T) {
 	tests := []struct {
 		title string