Skip to content

Commit c0709f1

Browse files
authored
Mark files stored in the PersistentCache as reused in RecoveryState (#67559)
Before this commit we were marking all files as recovered during searchable snapshots pre-warm phase. After introducing the persistent cache, it's possible that some of these files are already on disk and are in fact reused. This commit takes that fact into account and mark files that are in the persistent cache as reused. Backport of #67425
1 parent 8e6adba commit c0709f1

File tree

7 files changed

+203
-27
lines changed

7 files changed

+203
-27
lines changed

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -802,7 +802,7 @@ public boolean isComplete() {
802802
}
803803

804804
public static class Index extends Timer implements ToXContentFragment, Writeable {
805-
private final RecoveryFilesDetails fileDetails;
805+
protected final RecoveryFilesDetails fileDetails;
806806

807807
public static final long UNKNOWN = -1L;
808808

x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotRecoveryStateIntegrationTests.java

Lines changed: 152 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,44 @@
77
package org.elasticsearch.xpack.searchablesnapshots;
88

99
import com.carrotsearch.hppc.ObjectContainer;
10-
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
1110
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
1211
import org.elasticsearch.cluster.metadata.IndexMetadata;
1312
import org.elasticsearch.cluster.node.DiscoveryNode;
1413
import org.elasticsearch.cluster.node.DiscoveryNodes;
15-
import org.elasticsearch.common.Strings;
14+
import org.elasticsearch.cluster.service.ClusterService;
1615
import org.elasticsearch.common.SuppressForbidden;
1716
import org.elasticsearch.common.settings.Settings;
1817
import org.elasticsearch.common.unit.ByteSizeUnit;
1918
import org.elasticsearch.common.unit.ByteSizeValue;
19+
import org.elasticsearch.common.util.BigArrays;
20+
import org.elasticsearch.common.util.CollectionUtils;
21+
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
22+
import org.elasticsearch.env.Environment;
2023
import org.elasticsearch.index.Index;
2124
import org.elasticsearch.index.IndexService;
25+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
2226
import org.elasticsearch.indices.IndicesService;
27+
import org.elasticsearch.indices.recovery.RecoverySettings;
2328
import org.elasticsearch.indices.recovery.RecoveryState;
29+
import org.elasticsearch.plugins.Plugin;
30+
import org.elasticsearch.plugins.RepositoryPlugin;
31+
import org.elasticsearch.repositories.IndexId;
32+
import org.elasticsearch.repositories.RepositoriesService;
33+
import org.elasticsearch.repositories.Repository;
34+
import org.elasticsearch.repositories.RepositoryData;
35+
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
36+
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
37+
import org.elasticsearch.repositories.fs.FsRepository;
2438
import org.elasticsearch.snapshots.SnapshotInfo;
2539
import org.elasticsearch.test.ESIntegTestCase;
2640
import org.elasticsearch.threadpool.ThreadPool;
27-
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
28-
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
2941
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
3042

3143
import java.io.File;
3244
import java.nio.file.Files;
3345
import java.nio.file.Path;
46+
import java.util.Collection;
47+
import java.util.Collections;
3448
import java.util.List;
3549
import java.util.Locale;
3650
import java.util.Map;
@@ -40,10 +54,16 @@
4054
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
4155
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
4256
import static org.hamcrest.Matchers.equalTo;
57+
import static org.hamcrest.Matchers.instanceOf;
4358

4459
@ESIntegTestCase.ClusterScope(numDataNodes = 1)
4560
public class SearchableSnapshotRecoveryStateIntegrationTests extends BaseSearchableSnapshotsIntegTestCase {
4661

62+
@Override
63+
protected Collection<Class<? extends Plugin>> nodePlugins() {
64+
return CollectionUtils.appendToCopy(super.nodePlugins(), TestRepositoryPlugin.class);
65+
}
66+
4767
@Override
4868
protected Settings nodeSettings(int nodeOrdinal) {
4969
final Settings.Builder builder = Settings.builder().put(super.nodeSettings(nodeOrdinal));
@@ -70,18 +90,69 @@ public void testRecoveryStateRecoveredBytesMatchPhysicalCacheState() throws Exce
7090

7191
assertAcked(client().admin().indices().prepareDelete(indexName));
7292

73-
final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
74-
restoredIndexName,
75-
fsRepoName,
76-
snapshotInfo.snapshotId().getName(),
77-
indexName,
78-
Settings.EMPTY,
79-
Strings.EMPTY_ARRAY,
80-
true
93+
mountSnapshot(fsRepoName, snapshotName, indexName, restoredIndexName, Settings.EMPTY);
94+
ensureGreen(restoredIndexName);
95+
96+
final Index restoredIndex = client().admin()
97+
.cluster()
98+
.prepareState()
99+
.clear()
100+
.setMetadata(true)
101+
.get()
102+
.getState()
103+
.metadata()
104+
.index(restoredIndexName)
105+
.getIndex();
106+
107+
assertExecutorIsIdle(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME);
108+
assertExecutorIsIdle(SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
109+
110+
RecoveryState recoveryState = getRecoveryState(restoredIndexName);
111+
112+
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
113+
114+
long recoveredBytes = recoveryState.getIndex().recoveredBytes();
115+
long physicalCacheSize = getPhysicalCacheSize(restoredIndex, snapshotInfo.snapshotId().getUUID());
116+
117+
assertThat("Physical cache size doesn't match with recovery state data", physicalCacheSize, equalTo(recoveredBytes));
118+
assertThat("Expected to recover 100% of files", recoveryState.getIndex().recoveredBytesPercent(), equalTo(100.0f));
119+
}
120+
121+
public void testFilesStoredInThePersistentCacheAreMarkedAsReusedInRecoveryState() throws Exception {
122+
final String fsRepoName = randomAlphaOfLength(10);
123+
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
124+
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
125+
final String snapshotName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
126+
127+
createRepository(fsRepoName, "test-fs");
128+
int numberOfShards = 1;
129+
130+
assertAcked(
131+
prepareCreate(
132+
indexName,
133+
Settings.builder()
134+
.put(INDEX_SOFT_DELETES_SETTING.getKey(), true)
135+
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numberOfShards)
136+
)
81137
);
138+
ensureGreen(indexName);
139+
140+
final int documentCount = randomIntBetween(1000, 3000);
141+
populateIndex(indexName, documentCount);
142+
143+
final SnapshotInfo snapshotInfo = createFullSnapshot(fsRepoName, snapshotName);
82144

83-
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
84-
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
145+
assertAcked(client().admin().indices().prepareDelete(indexName));
146+
147+
mountSnapshot(fsRepoName, snapshotName, indexName, restoredIndexName, Settings.EMPTY);
148+
ensureGreen(restoredIndexName);
149+
assertBusy(() -> assertThat(getRecoveryState(restoredIndexName).getStage(), equalTo(RecoveryState.Stage.DONE)));
150+
151+
for (CacheService cacheService : internalCluster().getDataNodeInstances(CacheService.class)) {
152+
cacheService.synchronizeCache();
153+
}
154+
155+
internalCluster().restartRandomDataNode();
85156
ensureGreen(restoredIndexName);
86157

87158
final Index restoredIndex = client().admin()
@@ -98,22 +169,57 @@ public void testRecoveryStateRecoveredBytesMatchPhysicalCacheState() throws Exce
98169
assertExecutorIsIdle(SearchableSnapshotsConstants.CACHE_PREWARMING_THREAD_POOL_NAME);
99170
assertExecutorIsIdle(SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME);
100171

101-
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(restoredIndexName).get();
102-
Map<String, List<RecoveryState>> shardRecoveries = recoveryResponse.shardRecoveryStates();
103-
assertThat(shardRecoveries.containsKey(restoredIndexName), equalTo(true));
104-
List<RecoveryState> recoveryStates = shardRecoveries.get(restoredIndexName);
105-
assertThat(recoveryStates.size(), equalTo(1));
106-
RecoveryState recoveryState = recoveryStates.get(0);
172+
RecoveryState recoveryState = getRecoveryState(restoredIndexName);
107173

108174
assertThat(recoveryState.getStage(), equalTo(RecoveryState.Stage.DONE));
109175

110176
long recoveredBytes = recoveryState.getIndex().recoveredBytes();
111177
long physicalCacheSize = getPhysicalCacheSize(restoredIndex, snapshotInfo.snapshotId().getUUID());
112178

113-
assertThat("Physical cache size doesn't match with recovery state data", physicalCacheSize, equalTo(recoveredBytes));
179+
assertThat("Expected to reuse all data from the persistent cache but it didn't", 0L, equalTo(recoveredBytes));
180+
181+
final Repository repository = internalCluster().getDataNodeInstance(RepositoriesService.class).repository(fsRepoName);
182+
assertThat(repository, instanceOf(BlobStoreRepository.class));
183+
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
184+
185+
final RepositoryData repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository);
186+
final IndexId indexId = repositoryData.resolveIndexId(indexName);
187+
long inMemoryCacheSize = 0;
188+
long expectedPhysicalCacheSize = 0;
189+
for (int shardId = 0; shardId < numberOfShards; shardId++) {
190+
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(
191+
blobStoreRepository.shardContainer(indexId, shardId),
192+
snapshotInfo.snapshotId()
193+
);
194+
inMemoryCacheSize += snapshot.indexFiles()
195+
.stream()
196+
.filter(f -> f.metadata().hashEqualsContents())
197+
.mapToLong(BlobStoreIndexShardSnapshot.FileInfo::length)
198+
.sum();
199+
200+
expectedPhysicalCacheSize += snapshot.indexFiles()
201+
.stream()
202+
.filter(f -> f.metadata().hashEqualsContents() == false)
203+
.mapToLong(BlobStoreIndexShardSnapshot.FileInfo::length)
204+
.sum();
205+
}
206+
207+
assertThat(physicalCacheSize, equalTo(expectedPhysicalCacheSize));
208+
assertThat(physicalCacheSize + inMemoryCacheSize, equalTo(recoveryState.getIndex().reusedBytes()));
114209
assertThat("Expected to recover 100% of files", recoveryState.getIndex().recoveredBytesPercent(), equalTo(100.0f));
115210

116-
assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
211+
for (RecoveryState.FileDetail fileDetail : recoveryState.getIndex().fileDetails()) {
212+
assertThat(fileDetail.name() + " wasn't mark as reused", fileDetail.reused(), equalTo(true));
213+
}
214+
}
215+
216+
private RecoveryState getRecoveryState(String indexName) {
217+
final RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries(indexName).get();
218+
Map<String, List<RecoveryState>> shardRecoveries = recoveryResponse.shardRecoveryStates();
219+
assertThat(shardRecoveries.containsKey(indexName), equalTo(true));
220+
List<RecoveryState> recoveryStates = shardRecoveries.get(indexName);
221+
assertThat(recoveryStates.size(), equalTo(1));
222+
return recoveryStates.get(0);
117223
}
118224

119225
@SuppressForbidden(reason = "Uses FileSystem APIs")
@@ -148,4 +254,28 @@ private void assertExecutorIsIdle(String executorName) throws Exception {
148254
private DiscoveryNodes getDiscoveryNodes() {
149255
return client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes();
150256
}
257+
258+
/**
259+
* A fs repository plugin that allows using its methods from any thread
260+
*/
261+
public static class TestRepositoryPlugin extends Plugin implements RepositoryPlugin {
262+
@Override
263+
public Map<String, Repository.Factory> getRepositories(
264+
Environment env,
265+
NamedXContentRegistry namedXContentRegistry,
266+
ClusterService clusterService,
267+
BigArrays bigArrays,
268+
RecoverySettings recoverySettings
269+
) {
270+
return Collections.singletonMap(
271+
"test-fs",
272+
(metadata) -> new FsRepository(metadata, env, namedXContentRegistry, clusterService, bigArrays, recoverySettings) {
273+
@Override
274+
protected void assertSnapshotOrGenericThread() {
275+
// ignore
276+
}
277+
}
278+
);
279+
}
280+
}
151281
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -480,8 +480,12 @@ private void prewarmCache(ActionListener<Void> listener) {
480480

481481
logger.trace("{} warming cache for [{}] part [{}/{}]", shardId, file.physicalName(), part + 1, numberOfParts);
482482
final long startTimeInNanos = statsCurrentTimeNanosSupplier.getAsLong();
483-
((CachedBlobContainerIndexInput) input).prefetchPart(part);
484-
recoveryState.getIndex().addRecoveredBytesToFile(file.physicalName(), file.partBytes(part));
483+
final long persistentCacheLength = ((CachedBlobContainerIndexInput) input).prefetchPart(part).v1();
484+
if (persistentCacheLength == file.length()) {
485+
recoveryState.markIndexFileAsReused(file.physicalName());
486+
} else {
487+
recoveryState.getIndex().addRecoveredBytesToFile(file.physicalName(), file.partBytes(part));
488+
}
485489

486490
logger.trace(
487491
() -> new ParameterizedMessage(

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,13 @@ SortedSet<Tuple<Long, Long>> getCompletedRanges() {
163163
return tracker.getCompletedRanges();
164164
}
165165

166+
/**
167+
* Number of bytes that were present on the persistent when this cache file was created
168+
*/
169+
public long getInitialLength() {
170+
return tracker.getInitialLength();
171+
}
172+
166173
public void acquire(final EvictionListener listener) throws IOException {
167174
assert listener != null;
168175

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,9 @@ private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException {
435435

436436
/**
437437
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
438+
* @return a tuple with {@code Tuple<Persistent Cache Length, Prefetched Length>} values
438439
*/
439-
public void prefetchPart(final int part) throws IOException {
440+
public Tuple<Long, Long> prefetchPart(final int part) throws IOException {
440441
ensureContext(ctx -> ctx == CACHE_WARMING_CONTEXT);
441442
if (part >= fileInfo.numberOfParts()) {
442443
throw new IllegalArgumentException("Unexpected part number [" + part + "]");
@@ -456,7 +457,7 @@ public void prefetchPart(final int part) throws IOException {
456457
partRange.v2(),
457458
cacheFileReference
458459
);
459-
return;
460+
return Tuple.tuple(cacheFile.getInitialLength(), 0L);
460461
}
461462

462463
final long rangeStart = range.v1();
@@ -511,6 +512,7 @@ public void prefetchPart(final int part) throws IOException {
511512
stats.addCachedBytesWritten(totalBytesWritten.get(), endTimeNanos - startTimeNanos);
512513
}
513514
assert totalBytesRead == rangeLength;
515+
return Tuple.tuple(cacheFile.getInitialLength(), rangeLength);
514516
} catch (final Exception e) {
515517
throw new IOException("Failed to prefetch file part in cache", e);
516518
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/SparseFileTracker.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,12 @@ public class SparseFileTracker {
3737

3838
private final long length;
3939

40+
/**
41+
* Number of bytes that were initially present in the case where the sparse file tracker was initialized with some completed ranges.
42+
* See {@link #SparseFileTracker(String, long, SortedSet)}
43+
*/
44+
private final long initialLength;
45+
4046
/**
4147
* Creates a new empty {@link SparseFileTracker}
4248
*
@@ -60,6 +66,7 @@ public SparseFileTracker(String description, long length, SortedSet<Tuple<Long,
6066
if (length < 0) {
6167
throw new IllegalArgumentException("Length [" + length + "] must be equal to or greater than 0 for [" + description + "]");
6268
}
69+
long initialLength = 0;
6370
if (ranges.isEmpty() == false) {
6471
synchronized (mutex) {
6572
Range previous = null;
@@ -77,10 +84,12 @@ public SparseFileTracker(String description, long length, SortedSet<Tuple<Long,
7784
final boolean added = this.ranges.add(range);
7885
assert added : range + " already exist in " + this.ranges;
7986
previous = range;
87+
initialLength += range.end - range.start;
8088
}
8189
assert invariant();
8290
}
8391
}
92+
this.initialLength = initialLength;
8493
}
8594

8695
public long getLength() {
@@ -104,6 +113,15 @@ public SortedSet<Tuple<Long, Long>> getCompletedRanges() {
104113
return completedRanges == null ? Collections.emptySortedSet() : completedRanges;
105114
}
106115

116+
/**
117+
* Returns the number of bytes that were initially present in the case where the sparse file tracker was initialized with some
118+
* completed ranges.
119+
* See {@link #SparseFileTracker(String, long, SortedSet)}
120+
*/
121+
public long getInitialLength() {
122+
return initialLength;
123+
}
124+
107125
/**
108126
* @return the sum of the length of the ranges
109127
*/

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/indices/recovery/SearchableSnapshotRecoveryState.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ public synchronized void ignoreFile(String name) {
6060
index.addFileToIgnore(name);
6161
}
6262

63+
public synchronized void markIndexFileAsReused(String name) {
64+
SearchableSnapshotRecoveryState.Index index = (Index) getIndex();
65+
index.markFileAsReused(name);
66+
}
67+
6368
private static final class Index extends RecoveryState.Index {
6469
// We ignore the files that won't be part of the pre-warming
6570
// phase since the information for those files won't be
@@ -86,6 +91,10 @@ public synchronized void addFileDetail(String name, long length, boolean reused)
8691
super.addFileDetail(name, length, reused);
8792
}
8893

94+
private synchronized void markFileAsReused(String name) {
95+
((SearchableSnapshotRecoveryFilesDetails) fileDetails).markFileAsReused(name);
96+
}
97+
8998
// We have to bypass all the calls to the timer
9099
@Override
91100
public synchronized void start() {}
@@ -120,6 +129,12 @@ public void addFileDetails(String name, long length, boolean reused) {
120129
+ "]";
121130
}
122131

132+
void markFileAsReused(String name) {
133+
final FileDetail fileDetail = fileDetails.get(name);
134+
assert fileDetail != null;
135+
fileDetails.put(name, new FileDetail(fileDetail.name(), fileDetail.length(), true));
136+
}
137+
123138
@Override
124139
public void clear() {
125140
// Since we don't want to remove the recovery information that might have been

0 commit comments

Comments
 (0)