Skip to content

Commit d2bd9db

Browse files
authored
Make searchable snapshots cache persistent (#66275)
The searchable snapshots cache implemented in 7.10 is not persisted across node restarts, forcing data nodes to download files from the snapshot repository again once the node is restarted. This commit introduces a new Lucene index that is used to store information about cache files. The information about cache files is periodically updated and committed in this index as part of the cache synchronization task added in #64696. When the data node starts, the Lucene index is used to load the cache file information into memory; this information is then used to repopulate the searchable snapshots cache with the cache files that exist on disk. Since data nodes can have one or more data paths, this change introduces one Lucene index per data path. Information about cache files is updated in the Lucene index located on the same data path as the cache files. Backport of #65725 for 7.11.
1 parent 279165d commit d2bd9db

File tree

14 files changed

+1013
-175
lines changed

14 files changed

+1013
-175
lines changed
Lines changed: 47 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,88 +4,41 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66

7-
package org.elasticsearch.xpack.searchablesnapshots;
7+
package org.elasticsearch.xpack.searchablesnapshots.cache;
88

9-
import org.apache.lucene.mockfile.FilterFileSystemProvider;
10-
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
9+
import org.apache.lucene.document.Document;
1110
import org.elasticsearch.cluster.metadata.IndexMetadata;
1211
import org.elasticsearch.cluster.node.DiscoveryNode;
1312
import org.elasticsearch.cluster.node.DiscoveryNodes;
14-
import org.elasticsearch.common.Strings;
15-
import org.elasticsearch.common.io.PathUtils;
16-
import org.elasticsearch.common.io.PathUtilsForTesting;
1713
import org.elasticsearch.common.settings.Settings;
1814
import org.elasticsearch.common.unit.ByteSizeUnit;
1915
import org.elasticsearch.common.unit.ByteSizeValue;
2016
import org.elasticsearch.index.Index;
2117
import org.elasticsearch.index.IndexService;
18+
import org.elasticsearch.index.shard.ShardPath;
2219
import org.elasticsearch.indices.IndicesService;
2320
import org.elasticsearch.snapshots.SnapshotInfo;
2421
import org.elasticsearch.test.InternalTestCluster;
25-
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
26-
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotRequest;
27-
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
28-
import org.junit.AfterClass;
29-
import org.junit.BeforeClass;
22+
import org.elasticsearch.xpack.searchablesnapshots.BaseSearchableSnapshotsIntegTestCase;
3023

3124
import java.io.IOException;
3225
import java.nio.file.DirectoryStream;
33-
import java.nio.file.FileSystem;
3426
import java.nio.file.Files;
3527
import java.nio.file.Path;
3628
import java.util.HashSet;
3729
import java.util.Locale;
30+
import java.util.Map;
3831
import java.util.Set;
39-
import java.util.concurrent.atomic.AtomicBoolean;
4032

4133
import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_ROUTING_REQUIRE_GROUP_PREFIX;
4234
import static org.elasticsearch.index.IndexSettings.INDEX_SOFT_DELETES_SETTING;
4335
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
36+
import static org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache.resolveCacheIndexFolder;
4437
import static org.hamcrest.Matchers.equalTo;
4538
import static org.hamcrest.Matchers.greaterThan;
39+
import static org.hamcrest.Matchers.notNullValue;
4640

47-
public class SearchableSnapshotsCacheClearingIntegTests extends BaseSearchableSnapshotsIntegTestCase {
48-
49-
private static DeleteBlockingFileSystemProvider deleteBlockingFileSystemProvider;
50-
51-
@BeforeClass
52-
public static void installDeleteBlockingFileSystemProvider() {
53-
FileSystem current = PathUtils.getDefaultFileSystem();
54-
deleteBlockingFileSystemProvider = new DeleteBlockingFileSystemProvider(current);
55-
PathUtilsForTesting.installMock(deleteBlockingFileSystemProvider.getFileSystem(null));
56-
}
57-
58-
@AfterClass
59-
public static void removeDeleteBlockingFileSystemProvider() {
60-
PathUtilsForTesting.teardown();
61-
}
62-
63-
void startBlockingDeletes() {
64-
deleteBlockingFileSystemProvider.injectFailures.set(true);
65-
}
66-
67-
void stopBlockingDeletes() {
68-
deleteBlockingFileSystemProvider.injectFailures.set(false);
69-
}
70-
71-
private static class DeleteBlockingFileSystemProvider extends FilterFileSystemProvider {
72-
73-
AtomicBoolean injectFailures = new AtomicBoolean();
74-
75-
DeleteBlockingFileSystemProvider(FileSystem inner) {
76-
super("deleteblocking://", inner);
77-
}
78-
79-
@Override
80-
public boolean deleteIfExists(Path path) throws IOException {
81-
if (injectFailures.get()) {
82-
throw new IOException("blocked deletion of " + path);
83-
} else {
84-
return super.deleteIfExists(path);
85-
}
86-
}
87-
88-
}
41+
public class SearchableSnapshotsPersistentCacheIntegTests extends BaseSearchableSnapshotsIntegTestCase {
8942

9043
@Override
9144
protected Settings nodeSettings(int nodeOrdinal) {
@@ -96,7 +49,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
9649
.build();
9750
}
9851

99-
public void testCacheDirectoriesRemovedOnStartup() throws Exception {
52+
public void testCacheSurviveRestart() throws Exception {
10053
final String fsRepoName = randomAlphaOfLength(10);
10154
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
10255
final String restoredIndexName = randomBoolean() ? indexName : randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
@@ -117,18 +70,13 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
11770
final DiscoveryNodes discoveryNodes = client().admin().cluster().prepareState().clear().setNodes(true).get().getState().nodes();
11871
final String dataNode = randomFrom(discoveryNodes.getDataNodes().values().toArray(DiscoveryNode.class)).getName();
11972

120-
final MountSearchableSnapshotRequest req = new MountSearchableSnapshotRequest(
121-
restoredIndexName,
73+
mountSnapshot(
12274
fsRepoName,
12375
snapshotName,
12476
indexName,
125-
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build(),
126-
Strings.EMPTY_ARRAY,
127-
true
77+
restoredIndexName,
78+
Settings.builder().put(INDEX_ROUTING_REQUIRE_GROUP_PREFIX + "._name", dataNode).build()
12879
);
129-
130-
final RestoreSnapshotResponse restoreSnapshotResponse = client().execute(MountSearchableSnapshotAction.INSTANCE, req).get();
131-
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
13280
ensureGreen(restoredIndexName);
13381

13482
final Index restoredIndex = client().admin()
@@ -143,7 +91,9 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
14391
.getIndex();
14492

14593
final IndexService indexService = internalCluster().getInstance(IndicesService.class, dataNode).indexService(restoredIndex);
146-
final Path shardCachePath = CacheService.getShardCachePath(indexService.getShard(0).shardPath());
94+
final ShardPath shardPath = indexService.getShard(0).shardPath();
95+
final Path shardCachePath = CacheService.getShardCachePath(shardPath);
96+
14797
assertTrue(Files.isDirectory(shardCachePath));
14898
final Set<Path> cacheFiles = new HashSet<>();
14999
try (DirectoryStream<Path> snapshotCacheStream = Files.newDirectoryStream(shardCachePath)) {
@@ -159,25 +109,49 @@ public void testCacheDirectoriesRemovedOnStartup() throws Exception {
159109
}
160110
assertFalse("no cache files found", cacheFiles.isEmpty());
161111

162-
startBlockingDeletes();
112+
CacheService cacheService = internalCluster().getInstance(CacheService.class, dataNode);
113+
cacheService.synchronizeCache();
114+
115+
PersistentCache persistentCache = cacheService.getPersistentCache();
116+
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
117+
163118
internalCluster().restartNode(dataNode, new InternalTestCluster.RestartCallback() {
164119
@Override
165120
public Settings onNodeStopped(String nodeName) {
166-
assertTrue(Files.isDirectory(shardCachePath));
167-
for (Path cacheFile : cacheFiles) {
168-
assertTrue(cacheFile + " should not have been cleaned up yet", Files.isRegularFile(cacheFile));
121+
try {
122+
assertTrue(Files.isDirectory(shardCachePath));
123+
124+
final Path persistentCacheIndexDir = resolveCacheIndexFolder(shardPath.getRootDataPath());
125+
assertTrue(Files.isDirectory(persistentCacheIndexDir));
126+
127+
final Map<String, Document> documents = PersistentCache.loadDocuments(persistentCacheIndexDir);
128+
assertThat(documents.size(), equalTo(cacheFiles.size()));
129+
130+
for (Path cacheFile : cacheFiles) {
131+
final String cacheFileName = cacheFile.getFileName().toString();
132+
assertTrue(cacheFileName + " should exist on disk", Files.isRegularFile(cacheFile));
133+
assertThat(cacheFileName + " should exist in persistent cache index", documents.get(cacheFileName), notNullValue());
134+
}
135+
} catch (IOException e) {
136+
throw new AssertionError(e);
169137
}
170-
stopBlockingDeletes();
171138
return Settings.EMPTY;
172139
}
173140
});
174141

142+
persistentCache = internalCluster().getInstance(CacheService.class, dataNode).getPersistentCache();
143+
assertThat(persistentCache.getNumDocs(), equalTo((long) cacheFiles.size()));
175144
ensureGreen(restoredIndexName);
176145

177-
for (Path cacheFile : cacheFiles) {
178-
assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile));
179-
}
146+
cacheFiles.forEach(cacheFile -> assertTrue(cacheFile + " should have survived node restart", Files.exists(cacheFile)));
180147

181148
assertAcked(client().admin().indices().prepareDelete(restoredIndexName));
149+
150+
assertBusy(() -> cacheFiles.forEach(cacheFile -> assertFalse(cacheFile + " should have been cleaned up", Files.exists(cacheFile))));
151+
cacheService = internalCluster().getInstance(CacheService.class, dataNode);
152+
cacheService.synchronizeCache();
153+
154+
persistentCache = cacheService.getPersistentCache();
155+
assertThat(persistentCache.getNumDocs(), equalTo(0L));
182156
}
183157
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CacheFile.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ FileChannel getChannel() {
158158
return reference == null ? null : reference.fileChannel;
159159
}
160160

161+
// Only used in tests
162+
SortedSet<Tuple<Long, Long>> getCompletedRanges() {
163+
return tracker.getCompletedRanges();
164+
}
165+
161166
public void acquire(final EvictionListener listener) throws IOException {
162167
assert listener != null;
163168

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotIndexEventListener.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232

3333
import java.nio.file.Path;
3434

35+
import static org.elasticsearch.index.store.SearchableSnapshotDirectory.unwrapDirectory;
3536
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_ID_SETTING;
3637
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_INDEX_NAME_SETTING;
3738
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_SNAPSHOT_ID_SETTING;
@@ -49,6 +50,13 @@ public SearchableSnapshotIndexEventListener(Settings settings, @Nullable CacheSe
4950
this.cacheService = cacheService;
5051
}
5152

53+
/**
54+
* Called before a searchable snapshot {@link IndexShard} starts to recover. This event is used to trigger the loading of the shard
55+
* snapshot information that contains the list of shard's Lucene files.
56+
*
57+
* @param indexShard the shard that is about to recover
58+
* @param indexSettings the shard's index settings
59+
*/
5260
@Override
5361
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
5462
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
@@ -57,7 +65,7 @@ public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexS
5765
}
5866

5967
private static void ensureSnapshotIsLoaded(IndexShard indexShard) {
60-
final SearchableSnapshotDirectory directory = SearchableSnapshotDirectory.unwrapDirectory(indexShard.store().directory());
68+
final SearchableSnapshotDirectory directory = unwrapDirectory(indexShard.store().directory());
6169
assert directory != null;
6270
final StepListener<Void> preWarmListener = new StepListener<>();
6371
final boolean success = directory.loadSnapshot(indexShard.recoveryState(), preWarmListener);

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import org.elasticsearch.xpack.searchablesnapshots.action.TransportRepositoryStatsAction;
6666
import org.elasticsearch.xpack.searchablesnapshots.action.TransportSearchableSnapshotsStatsAction;
6767
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
68-
import org.elasticsearch.xpack.searchablesnapshots.cache.NodeEnvironmentCacheCleaner;
68+
import org.elasticsearch.xpack.searchablesnapshots.cache.PersistentCache;
6969
import org.elasticsearch.xpack.searchablesnapshots.rest.RestClearSearchableSnapshotsCacheAction;
7070
import org.elasticsearch.xpack.searchablesnapshots.rest.RestMountSearchableSnapshotAction;
7171
import org.elasticsearch.xpack.searchablesnapshots.rest.RestRepositoryStatsAction;
@@ -218,12 +218,7 @@ public Collection<Object> createComponents(
218218
this.threadPool.set(threadPool);
219219
this.failShardsListener.set(new FailShardsOnInvalidLicenseClusterListener(getLicenseState(), clusterService.getRerouteService()));
220220
if (DiscoveryNode.isDataNode(settings)) {
221-
final CacheService cacheService = new CacheService(
222-
settings,
223-
clusterService,
224-
threadPool,
225-
new NodeEnvironmentCacheCleaner(nodeEnvironment)
226-
);
221+
final CacheService cacheService = new CacheService(settings, clusterService, threadPool, new PersistentCache(nodeEnvironment));
227222
this.cacheService.set(cacheService);
228223
components.add(cacheService);
229224
final BlobStoreCacheService blobStoreCacheService = new BlobStoreCacheService(
@@ -234,6 +229,8 @@ public Collection<Object> createComponents(
234229
);
235230
this.blobStoreCacheService.set(blobStoreCacheService);
236231
components.add(blobStoreCacheService);
232+
} else {
233+
PersistentCache.cleanUp(settings, nodeEnvironment);
237234
}
238235
return Collections.unmodifiableList(components);
239236
}

0 commit comments

Comments
 (0)