Skip to content

Commit 279165d

Browse files
Improve searchable snapshot mount time (elastic#66198)
Reduce the range sizes we fetch during mounting to speed up mount time until shard started. On resource constrained setups (rate limiter, disk or network), the time to mount multiple shards is proportional to the amount of data to fetch and for most files in a snapshot, we need to fetch only a small piece of the files to start the shard.
1 parent f2f04b2 commit 279165d

File tree

6 files changed

+81
-24
lines changed

6 files changed

+81
-24
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -374,7 +374,14 @@ public IndexInput openInput(final String name, final IOContext context) throws I
374374

375375
final IndexInputStats inputStats = stats.computeIfAbsent(name, n -> createIndexInputStats(fileInfo.length()));
376376
if (useCache && isExcludedFromCache(name) == false) {
377-
return new CachedBlobContainerIndexInput(this, fileInfo, context, inputStats, cacheService.getRangeSize());
377+
return new CachedBlobContainerIndexInput(
378+
this,
379+
fileInfo,
380+
context,
381+
inputStats,
382+
cacheService.getRangeSize(),
383+
cacheService.getRecoveryRangeSize()
384+
);
378385
} else {
379386
return new DirectBlobContainerIndexInput(
380387
blobContainer(),
@@ -400,6 +407,13 @@ private boolean isExcludedFromCache(String name) {
400407
return ext != null && excludedFileTypes.contains(ext);
401408
}
402409

410+
public boolean isRecoveryFinalized() {
411+
SearchableSnapshotRecoveryState recoveryState = this.recoveryState;
412+
if (recoveryState == null) return false;
413+
RecoveryState.Stage stage = recoveryState.getStage();
414+
return stage == RecoveryState.Stage.DONE || stage == RecoveryState.Stage.FINALIZE;
415+
}
416+
403417
@Override
404418
public String toString() {
405419
return this.getClass().getSimpleName() + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory + " shard=" + shardId;

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
6262
private final SearchableSnapshotDirectory directory;
6363
private final CacheFileReference cacheFileReference;
6464
private final int defaultRangeSize;
65+
private final int recoveryRangeSize;
6566

6667
// last read position is kept around in order to detect (non)contiguous reads for stats
6768
private long lastReadPosition;
@@ -73,7 +74,8 @@ public CachedBlobContainerIndexInput(
7374
FileInfo fileInfo,
7475
IOContext context,
7576
IndexInputStats stats,
76-
int rangeSize
77+
int rangeSize,
78+
int recoveryRangeSize
7779
) {
7880
this(
7981
"CachedBlobContainerIndexInput(" + fileInfo.physicalName() + ")",
@@ -84,7 +86,8 @@ public CachedBlobContainerIndexInput(
8486
0L,
8587
fileInfo.length(),
8688
new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()),
87-
rangeSize
89+
rangeSize,
90+
recoveryRangeSize
8891
);
8992
assert getBufferSize() <= BlobStoreCacheService.DEFAULT_CACHED_BLOB_SIZE; // must be able to cache at least one buffer's worth
9093
stats.incrementOpenCount();
@@ -99,14 +102,16 @@ private CachedBlobContainerIndexInput(
99102
long offset,
100103
long length,
101104
CacheFileReference cacheFileReference,
102-
int rangeSize
105+
int rangeSize,
106+
int recoveryRangeSize
103107
) {
104108
super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
105109
this.directory = directory;
106110
this.cacheFileReference = cacheFileReference;
107111
this.lastReadPosition = this.offset;
108112
this.lastSeekPosition = this.offset;
109113
this.defaultRangeSize = rangeSize;
114+
this.recoveryRangeSize = recoveryRangeSize;
110115
}
111116

112117
@Override
@@ -124,7 +129,9 @@ private void ensureContext(Predicate<IOContext> predicate) throws IOException {
124129
}
125130

126131
private long getDefaultRangeSize() {
127-
return (context != CACHE_WARMING_CONTEXT) ? defaultRangeSize : fileInfo.partSize().getBytes();
132+
return (context != CACHE_WARMING_CONTEXT)
133+
? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize)
134+
: fileInfo.partSize().getBytes();
128135
}
129136

130137
private Tuple<Long, Long> computeRange(long position) {
@@ -729,7 +736,8 @@ public IndexInput slice(String sliceDescription, long offset, long length) {
729736
this.offset + offset,
730737
length,
731738
cacheFileReference,
732-
defaultRangeSize
739+
defaultRangeSize,
740+
recoveryRangeSize
733741
);
734742
slice.isClone = true;
735743
return slice;

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheService.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ public class CacheService extends AbstractLifecycleComponent {
7373

7474
public static final ByteSizeValue MIN_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(4, ByteSizeUnit.KB);
7575
public static final ByteSizeValue MAX_SNAPSHOT_CACHE_RANGE_SIZE = new ByteSizeValue(Integer.MAX_VALUE, ByteSizeUnit.BYTES);
76+
77+
/**
78+
* If a search needs data from the repository then we expand it to a larger contiguous range whose size is determined by this setting,
79+
* in anticipation of needing nearby data in subsequent reads. Repository reads typically have quite high latency (think ~100ms) and
80+
* the default of 32MB for this setting represents the approximate point at which size starts to matter. In other words, reads of
81+
* ranges smaller than 32MB don't usually happen much quicker, so we may as well expand all the way to 32MB ranges.
82+
*/
7683
public static final Setting<ByteSizeValue> SNAPSHOT_CACHE_RANGE_SIZE_SETTING = Setting.byteSizeSetting(
7784
SETTINGS_PREFIX + "range_size",
7885
new ByteSizeValue(32, ByteSizeUnit.MB), // default
@@ -81,6 +88,20 @@ public class CacheService extends AbstractLifecycleComponent {
8188
Setting.Property.NodeScope
8289
);
8390

91+
/**
92+
* Starting up a shard involves reading small parts of some files from the repository, independently of the pre-warming process. If we
93+
* expand those ranges using {@link CacheService#SNAPSHOT_CACHE_RANGE_SIZE_SETTING} then we end up reading quite a few 32MB ranges. If
94+
* we read enough of these ranges for the restore throttling rate limiter to kick in then all the read threads will end up waiting on
95+
* the throttle, blocking subsequent reads. By using a smaller read size during restore we avoid clogging up the rate limiter so much.
96+
*/
97+
public static final Setting<ByteSizeValue> SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING = Setting.byteSizeSetting(
98+
SETTINGS_PREFIX + "recovery_range_size",
99+
new ByteSizeValue(128, ByteSizeUnit.KB), // default
100+
MIN_SNAPSHOT_CACHE_RANGE_SIZE, // min
101+
MAX_SNAPSHOT_CACHE_RANGE_SIZE, // max
102+
Setting.Property.NodeScope
103+
);
104+
84105
public static final TimeValue MIN_SNAPSHOT_CACHE_SYNC_INTERVAL = TimeValue.timeValueSeconds(1L);
85106
public static final Setting<TimeValue> SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING = Setting.timeSetting(
86107
SETTINGS_PREFIX + "sync.interval",
@@ -118,6 +139,7 @@ public class CacheService extends AbstractLifecycleComponent {
118139
private final ByteSizeValue cacheSize;
119140
private final Runnable cacheCleaner;
120141
private final ByteSizeValue rangeSize;
142+
private final ByteSizeValue recoveryRangeSize;
121143
private final KeyedLock<ShardEviction> shardsEvictionLock;
122144
private final Set<ShardEviction> evictedShards;
123145

@@ -133,6 +155,7 @@ public CacheService(
133155
this.cacheSize = SNAPSHOT_CACHE_SIZE_SETTING.get(settings);
134156
this.cacheCleaner = Objects.requireNonNull(cacheCleaner);
135157
this.rangeSize = SNAPSHOT_CACHE_RANGE_SIZE_SETTING.get(settings);
158+
this.recoveryRangeSize = SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.get(settings);
136159
this.cache = CacheBuilder.<CacheKey, CacheFile>builder()
137160
.setMaximumWeight(cacheSize.getBytes())
138161
.weigher((key, entry) -> entry.getLength())
@@ -222,6 +245,13 @@ public int getRangeSize() {
222245
return toIntBytes(rangeSize.getBytes());
223246
}
224247

248+
/**
249+
* @return the cache range size (in bytes) to use during recovery (until post_recovery)
250+
*/
251+
public int getRecoveryRangeSize() {
252+
return toIntBytes(recoveryRangeSize.getBytes());
253+
}
254+
225255
/**
226256
* Retrieves the {@link CacheFile} instance associated with the specified {@link CacheKey} in the cache. If the key is not already
227257
* associated with a {@link CacheFile}, this method creates a new instance using the given file length and cache directory.

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/SearchableSnapshotDirectoryTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -461,7 +461,7 @@ private void testDirectories(
461461
final boolean prewarmCache,
462462
final CheckedBiConsumer<Directory, SearchableSnapshotDirectory, Exception> consumer
463463
) throws Exception {
464-
testDirectories(enableCache, prewarmCache, createRecoveryState(), Settings.EMPTY, consumer);
464+
testDirectories(enableCache, prewarmCache, createRecoveryState(randomBoolean()), Settings.EMPTY, consumer);
465465
}
466466

467467
private void testDirectories(
@@ -715,7 +715,7 @@ public void testClearCache() throws Exception {
715715
threadPool
716716
)
717717
) {
718-
final RecoveryState recoveryState = createRecoveryState();
718+
final RecoveryState recoveryState = createRecoveryState(randomBoolean());
719719
final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
720720
final boolean loaded = directory.loadSnapshot(recoveryState, f);
721721
f.get();
@@ -784,7 +784,7 @@ public void testRecoveryStateIsKeptOpenAfterPreWarmFailures() throws Exception {
784784
PathUtilsForTesting.installMock(fileSystem);
785785

786786
try {
787-
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
787+
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
788788
testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
789789
boolean areAllFilesReused = snapshotDirectory.snapshot()
790790
.indexFiles()
@@ -805,7 +805,7 @@ public void testRecoveryStateIsKeptOpenAfterPreWarmFailures() throws Exception {
805805
}
806806

807807
public void testRecoveryStateIsEmptyWhenTheCacheIsNotPreWarmed() throws Exception {
808-
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
808+
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
809809
testDirectories(true, false, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
810810
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
811811
assertThat(recoveryState.getIndex().recoveredBytes(), equalTo(0L));
@@ -814,7 +814,7 @@ public void testRecoveryStateIsEmptyWhenTheCacheIsNotPreWarmed() throws Exceptio
814814
}
815815

816816
public void testNonCachedFilesAreExcludedFromRecoveryState() throws Exception {
817-
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
817+
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
818818

819819
List<String> allFileExtensions = Arrays.asList(
820820
"fdt",
@@ -850,7 +850,7 @@ public void testNonCachedFilesAreExcludedFromRecoveryState() throws Exception {
850850
}
851851

852852
public void testFilesWithHashEqualsContentsAreMarkedAsReusedOnRecoveryState() throws Exception {
853-
SearchableSnapshotRecoveryState recoveryState = createRecoveryState();
853+
SearchableSnapshotRecoveryState recoveryState = createRecoveryState(true);
854854

855855
testDirectories(true, true, recoveryState, Settings.EMPTY, (directory, snapshotDirectory) -> {
856856
assertBusy(() -> assertTrue(recoveryState.isPreWarmComplete()));

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInputTests.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,12 @@ public void testRandomReads() throws Exception {
8383
final BlobContainer singleBlobContainer = singleSplitBlobContainer(blobName, input, partSize);
8484
final BlobContainer blobContainer;
8585
if (input.length == partSize && input.length <= cacheService.getCacheSize() && prewarmEnabled == false) {
86-
blobContainer = new CountingBlobContainer(singleBlobContainer, cacheService.getRangeSize());
86+
blobContainer = new CountingBlobContainer(singleBlobContainer);
8787
} else {
8888
blobContainer = singleBlobContainer;
8989
}
9090

91+
final boolean recoveryFinalizedDone = randomBoolean();
9192
final Path shardDir;
9293
try {
9394
shardDir = new NodeEnvironment.NodePath(createTempDir()).resolve(shardId);
@@ -116,7 +117,7 @@ public void testRandomReads() throws Exception {
116117
threadPool
117118
)
118119
) {
119-
RecoveryState recoveryState = createRecoveryState();
120+
RecoveryState recoveryState = createRecoveryState(recoveryFinalizedDone);
120121
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
121122
final boolean loaded = directory.loadSnapshot(recoveryState, future);
122123
if (randomBoolean()) {
@@ -136,7 +137,10 @@ public void testRandomReads() throws Exception {
136137
}
137138

138139
if (blobContainer instanceof CountingBlobContainer) {
139-
long numberOfRanges = TestUtils.numberOfRanges(input.length, cacheService.getRangeSize());
140+
long numberOfRanges = TestUtils.numberOfRanges(
141+
input.length,
142+
recoveryFinalizedDone ? cacheService.getRangeSize() : cacheService.getRecoveryRangeSize()
143+
);
140144
assertThat(
141145
"Expected at most " + numberOfRanges + " ranges fetched from the source",
142146
((CountingBlobContainer) blobContainer).totalOpens.sum(),
@@ -211,7 +215,7 @@ public void testThrowsEOFException() throws Exception {
211215
threadPool
212216
)
213217
) {
214-
RecoveryState recoveryState = createRecoveryState();
218+
RecoveryState recoveryState = createRecoveryState(randomBoolean());
215219
final PlainActionFuture<Void> f = PlainActionFuture.newFuture();
216220
final boolean loaded = searchableSnapshotDirectory.loadSnapshot(recoveryState, f);
217221
try {
@@ -262,11 +266,8 @@ private static class CountingBlobContainer extends FilterBlobContainer {
262266

263267
private final AtomicInteger openStreams = new AtomicInteger(0);
264268

265-
private final int rangeSize;
266-
267-
CountingBlobContainer(BlobContainer in, int rangeSize) {
269+
CountingBlobContainer(BlobContainer in) {
268270
super(in);
269-
this.rangeSize = rangeSize;
270271
}
271272

272273
@Override
@@ -276,7 +277,7 @@ public InputStream readBlob(String blobName, long position, long length) throws
276277

277278
@Override
278279
protected BlobContainer wrapChild(BlobContainer child) {
279-
return new CountingBlobContainer(child, this.rangeSize);
280+
return new CountingBlobContainer(child);
280281
}
281282

282283
@Override

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/AbstractSearchableSnapshotsTestCase.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ protected CacheService randomCacheService() {
9999
if (randomBoolean()) {
100100
cacheSettings.put(CacheService.SNAPSHOT_CACHE_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize());
101101
}
102+
if (randomBoolean()) {
103+
cacheSettings.put(CacheService.SNAPSHOT_CACHE_RECOVERY_RANGE_SIZE_SETTING.getKey(), randomCacheRangeSize());
104+
}
102105
if (randomBoolean()) {
103106
cacheSettings.put(
104107
CacheService.SNAPSHOT_CACHE_SYNC_INTERVAL_SETTING.getKey(),
@@ -142,7 +145,7 @@ protected static ByteSizeValue randomCacheRangeSize() {
142145
);
143146
}
144147

145-
protected static SearchableSnapshotRecoveryState createRecoveryState() {
148+
protected static SearchableSnapshotRecoveryState createRecoveryState(boolean finalizedDone) {
146149
ShardRouting shardRouting = TestShardRouting.newShardRouting(
147150
new ShardId(randomAlphaOfLength(10), randomAlphaOfLength(10), 0),
148151
randomAlphaOfLength(10),
@@ -163,8 +166,9 @@ protected static SearchableSnapshotRecoveryState createRecoveryState() {
163166
.setStage(RecoveryState.Stage.VERIFY_INDEX)
164167
.setStage(RecoveryState.Stage.TRANSLOG);
165168
recoveryState.getIndex().setFileDetailsComplete();
166-
recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE);
167-
169+
if (finalizedDone) {
170+
recoveryState.setStage(RecoveryState.Stage.FINALIZE).setStage(RecoveryState.Stage.DONE);
171+
}
168172
return recoveryState;
169173
}
170174

0 commit comments

Comments
 (0)