Skip to content

Commit b484711

Browse files
[IMPROVED] Optimized calculating source sequences for sourced stream with lots of deletes. (#6461)
Added LoadPrevMsg to optimize when we walk backwards looking for source sequences and we have lots of interior deletes. Signed-off-by: Derek Collison <[email protected]>
2 parents 3d61421 + 7cddfb4 commit b484711

File tree

5 files changed

+164
-8
lines changed

5 files changed

+164
-8
lines changed

server/filestore.go

+51
Original file line numberDiff line numberDiff line change
@@ -7306,6 +7306,57 @@ func (fs *fileStore) LoadNextMsg(filter string, wc bool, start uint64, sm *Store
73067306
return nil, fs.state.LastSeq, ErrStoreEOF
73077307
}
73087308

7309+
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
7310+
func (fs *fileStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
7311+
fs.mu.RLock()
7312+
defer fs.mu.RUnlock()
7313+
7314+
if fs.closed {
7315+
return nil, ErrStoreClosed
7316+
}
7317+
if fs.state.Msgs == 0 || start < fs.state.FirstSeq {
7318+
return nil, ErrStoreEOF
7319+
}
7320+
7321+
if start > fs.state.LastSeq {
7322+
start = fs.state.LastSeq
7323+
}
7324+
if smp == nil {
7325+
smp = new(StoreMsg)
7326+
}
7327+
7328+
if bi, _ := fs.selectMsgBlockWithIndex(start); bi >= 0 {
7329+
for i := bi; i >= 0; i-- {
7330+
mb := fs.blks[i]
7331+
mb.mu.Lock()
7332+
// Need messages loaded from here on out.
7333+
if mb.cacheNotLoaded() {
7334+
if err := mb.loadMsgsWithLock(); err != nil {
7335+
mb.mu.Unlock()
7336+
return nil, err
7337+
}
7338+
}
7339+
7340+
lseq, fseq := atomic.LoadUint64(&mb.last.seq), atomic.LoadUint64(&mb.first.seq)
7341+
if start > lseq {
7342+
start = lseq
7343+
}
7344+
for seq := start; seq >= fseq; seq-- {
7345+
if mb.dmap.Exists(seq) {
7346+
continue
7347+
}
7348+
if sm, err := mb.cacheLookup(seq, smp); err == nil {
7349+
mb.mu.Unlock()
7350+
return sm, nil
7351+
}
7352+
}
7353+
mb.mu.Unlock()
7354+
}
7355+
}
7356+
7357+
return nil, ErrStoreEOF
7358+
}
7359+
73097360
// Type returns the type of the underlying store.
73107361
func (fs *fileStore) Type() StorageType {
73117362
return FileStorage

server/memstore.go

+27
Original file line numberDiff line numberDiff line change
@@ -1509,6 +1509,33 @@ func (ms *memStore) LoadNextMsg(filter string, wc bool, start uint64, smp *Store
15091509
return nil, ms.state.LastSeq, ErrStoreEOF
15101510
}
15111511

1512+
// Will load the next non-deleted msg starting at the start sequence and walking backwards.
1513+
func (ms *memStore) LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error) {
1514+
ms.mu.RLock()
1515+
defer ms.mu.RUnlock()
1516+
1517+
if ms.msgs == nil {
1518+
return nil, ErrStoreClosed
1519+
}
1520+
if ms.state.Msgs == 0 || start < ms.state.FirstSeq {
1521+
return nil, ErrStoreEOF
1522+
}
1523+
if start > ms.state.LastSeq {
1524+
start = ms.state.LastSeq
1525+
}
1526+
1527+
for seq := start; seq >= ms.state.FirstSeq; seq-- {
1528+
if sm, ok := ms.msgs[seq]; ok {
1529+
if smp == nil {
1530+
smp = new(StoreMsg)
1531+
}
1532+
sm.copy(smp)
1533+
return smp, nil
1534+
}
1535+
}
1536+
return nil, ErrStoreEOF
1537+
}
1538+
15121539
// RemoveMsg will remove the message from this store.
15131540
// Will return the number of bytes removed.
15141541
func (ms *memStore) RemoveMsg(seq uint64) (bool, error) {

server/norace_test.go

+68
Original file line numberDiff line numberDiff line change
@@ -9014,6 +9014,7 @@ func TestNoRaceStoreStreamEncoderDecoder(t *testing.T) {
90149014
}
90159015
ms, err := newMemStore(cfg)
90169016
require_NoError(t, err)
9017+
defer ms.Stop()
90179018

90189019
fs, err := newFileStore(
90199020
FileStoreConfig{StoreDir: t.TempDir()},
@@ -11298,3 +11299,70 @@ func TestNoRaceJetStreamClusterLargeMetaSnapshotTiming(t *testing.T) {
1129811299
require_NoError(t, n.InstallSnapshot(snap))
1129911300
t.Logf("Took %v to snap meta with size of %v\n", time.Since(start), friendlyBytes(len(snap)))
1130011301
}
11302+
11303+
func TestNoRaceStoreReverseWalkWithDeletesPerf(t *testing.T) {
11304+
cfg := StreamConfig{Name: "zzz", Subjects: []string{"foo.*"}, Storage: FileStorage}
11305+
11306+
fs, err := newFileStore(FileStoreConfig{StoreDir: t.TempDir()}, cfg)
11307+
require_NoError(t, err)
11308+
defer fs.Stop()
11309+
11310+
cfg.Storage = MemoryStorage
11311+
ms, err := newMemStore(&cfg)
11312+
require_NoError(t, err)
11313+
defer ms.Stop()
11314+
11315+
msg := []byte("Hello")
11316+
11317+
for _, store := range []StreamStore{fs, ms} {
11318+
store.StoreMsg("foo.A", nil, msg, 0)
11319+
for i := 0; i < 1_000_000; i++ {
11320+
store.StoreMsg("foo.B", nil, msg, 0)
11321+
}
11322+
store.StoreMsg("foo.C", nil, msg, 0)
11323+
11324+
var ss StreamState
11325+
store.FastState(&ss)
11326+
require_Equal(t, ss.Msgs, 1_000_002)
11327+
11328+
// Create a bunch of interior deletes.
11329+
p, err := store.PurgeEx("foo.B", 1, 0)
11330+
require_NoError(t, err)
11331+
require_Equal(t, p, 1_000_000)
11332+
11333+
// Now simulate a walk backwards as we currently do when searching for starting sequence numbers in sourced streams.
11334+
start := time.Now()
11335+
var smv StoreMsg
11336+
for seq := ss.LastSeq; seq > 0; seq-- {
11337+
_, err := store.LoadMsg(seq, &smv)
11338+
if err == errDeletedMsg || err == ErrStoreMsgNotFound {
11339+
continue
11340+
}
11341+
require_NoError(t, err)
11342+
}
11343+
elapsed := time.Since(start)
11344+
11345+
// Now use the optimized load prev.
11346+
seq, seen := ss.LastSeq, 0
11347+
start = time.Now()
11348+
for {
11349+
sm, err := store.LoadPrevMsg(seq, &smv)
11350+
if err == ErrStoreEOF {
11351+
break
11352+
}
11353+
require_NoError(t, err)
11354+
seq = sm.seq - 1
11355+
seen++
11356+
}
11357+
elapsedNew := time.Since(start)
11358+
require_Equal(t, seen, 2)
11359+
11360+
switch store.(type) {
11361+
case *memStore:
11362+
require_True(t, elapsedNew < elapsed)
11363+
case *fileStore:
11364+
// Bigger gains for filestore, 10x
11365+
require_True(t, elapsedNew*10 < elapsed)
11366+
}
11367+
}
11368+
}

server/store.go

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ type StreamStore interface {
9696
LoadNextMsg(filter string, wc bool, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
9797
LoadNextMsgMulti(sl *Sublist, start uint64, smp *StoreMsg) (sm *StoreMsg, skip uint64, err error)
9898
LoadLastMsg(subject string, sm *StoreMsg) (*StoreMsg, error)
99+
LoadPrevMsg(start uint64, smp *StoreMsg) (sm *StoreMsg, err error)
99100
RemoveMsg(seq uint64) (bool, error)
100101
EraseMsg(seq uint64) (bool, error)
101102
Purge() (uint64, error)

server/stream.go

+17-8
Original file line numberDiff line numberDiff line change
@@ -3663,16 +3663,21 @@ func (mset *stream) setStartingSequenceForSources(iNames map[string]struct{}) {
36633663
}
36643664

36653665
var smv StoreMsg
3666-
for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
3667-
sm, err := mset.store.LoadMsg(seq, &smv)
3668-
if err != nil || len(sm.hdr) == 0 {
3666+
for seq := state.LastSeq; seq >= state.FirstSeq; {
3667+
sm, err := mset.store.LoadPrevMsg(seq, &smv)
3668+
if err == ErrStoreEOF || err != nil {
3669+
break
3670+
}
3671+
seq = sm.seq - 1
3672+
if len(sm.hdr) == 0 {
36693673
continue
36703674
}
3675+
36713676
ss := getHeader(JSStreamSource, sm.hdr)
36723677
if len(ss) == 0 {
36733678
continue
36743679
}
3675-
streamName, indexName, sseq := streamAndSeq(string(ss))
3680+
streamName, indexName, sseq := streamAndSeq(bytesToString(ss))
36763681

36773682
if _, ok := iNames[indexName]; ok {
36783683
si := mset.sources[indexName]
@@ -3778,17 +3783,21 @@ func (mset *stream) startingSequenceForSources() {
37783783
}
37793784

37803785
var smv StoreMsg
3781-
for seq := state.LastSeq; seq >= state.FirstSeq; seq-- {
3782-
sm, err := mset.store.LoadMsg(seq, &smv)
3783-
if err != nil || sm == nil || len(sm.hdr) == 0 {
3786+
for seq := state.LastSeq; ; {
3787+
sm, err := mset.store.LoadPrevMsg(seq, &smv)
3788+
if err == ErrStoreEOF || err != nil {
3789+
break
3790+
}
3791+
seq = sm.seq - 1
3792+
if len(sm.hdr) == 0 {
37843793
continue
37853794
}
37863795
ss := getHeader(JSStreamSource, sm.hdr)
37873796
if len(ss) == 0 {
37883797
continue
37893798
}
37903799

3791-
streamName, iName, sseq := streamAndSeq(string(ss))
3800+
streamName, iName, sseq := streamAndSeq(bytesToString(ss))
37923801
if iName == _EMPTY_ { // Pre-2.10 message header means it's a match for any source using that stream name
37933802
for _, ssi := range mset.cfg.Sources {
37943803
if streamName == ssi.Name || (ssi.External != nil && streamName == ssi.Name+":"+getHash(ssi.External.ApiPrefix)) {

0 commit comments

Comments
 (0)