Skip to content

Commit 4ab94ce

Browse files
committed
Add pre recovery hook
1 parent 32df1c4 commit 4ab94ce

File tree

7 files changed

+95
-4
lines changed

7 files changed

+95
-4
lines changed

server/src/main/java/org/elasticsearch/index/CompositeIndexEventListener.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,4 +257,17 @@ public void onStoreClosed(ShardId shardId) {
257257
}
258258
}
259259
}
260+
261+
@Override
262+
public void beforeIndexShardRecovery(final IndexShard indexShard, final IndexSettings indexSettings) {
263+
for (IndexEventListener listener : listeners) {
264+
try {
265+
listener.beforeIndexShardRecovery(indexShard, indexSettings);
266+
} catch (Exception e) {
267+
logger.warn(() -> new ParameterizedMessage("failed to invoke the listener before the shard recovery starts for {}",
268+
indexShard.shardId()), e);
269+
throw e;
270+
}
271+
}
272+
}
260273
}

server/src/main/java/org/elasticsearch/index/shard/IndexEventListener.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,4 +167,15 @@ default void onStoreCreated(ShardId shardId) {}
167167
* @param shardId the shard ID the store belongs to
168168
*/
169169
default void onStoreClosed(ShardId shardId) {}
170+
171+
/**
172+
* Called before the index shard starts to recover.
173+
* Note: unlike all other methods in this class, this method is not called using the cluster state update thread. When this method is
174+
* called the shard already transitioned to the RECOVERING state.
175+
*
176+
* @param indexShard the shard that is about to recover
177+
* @param indexSettings the shard's index settings
178+
*/
179+
default void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
180+
}
170181
}

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1313,6 +1313,13 @@ public void close(String reason, boolean flushEngine) throws IOException {
13131313
}
13141314
}
13151315

1316+
public void preRecovery() {
1317+
if (state != IndexShardState.RECOVERING) {
1318+
throw new IndexShardNotRecoveringException(shardId, state);
1319+
}
1320+
indexEventListener.beforeIndexShardRecovery(this, indexSettings);
1321+
}
1322+
13161323
public void postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException {
13171324
synchronized (postRecoveryMutex) {
13181325
// we need to refresh again to expose all operations that were index until now. Otherwise

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ private ActionListener<Boolean> recoveryListener(IndexShard indexShard, ActionLi
359359
* Recovers the state of the shard from the store.
360360
*/
361361
private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRecoveryException {
362+
indexShard.preRecovery();
362363
final RecoveryState recoveryState = indexShard.recoveryState();
363364
final boolean indexShouldExists = recoveryState.getRecoverySource().getType() != RecoverySource.Type.EMPTY_STORE;
364365
indexShard.prepareForIndexRecovery();
@@ -449,6 +450,7 @@ private void addRecoveredFileDetails(SegmentInfos si, Store store, RecoveryState
449450
private void restore(IndexShard indexShard, Repository repository, SnapshotRecoverySource restoreSource,
450451
ActionListener<Boolean> listener) {
451452
logger.debug("restoring from {} ...", indexShard.recoveryState().getRecoverySource());
453+
indexShard.preRecovery();
452454
final RecoveryState.Translog translogState = indexShard.recoveryState().getTranslog();
453455
if (restoreSource == null) {
454456
listener.onFailure(new IndexShardRestoreFailedException(shardId, "empty restore source"));

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,13 @@ private void doRecovery(final long recoveryId) {
171171
final RecoveryTarget recoveryTarget = recoveryRef.target();
172172
timer = recoveryTarget.state().getTimer();
173173
cancellableThreads = recoveryTarget.cancellableThreads();
174+
final IndexShard indexShard = recoveryTarget.indexShard();
174175
try {
176+
indexShard.preRecovery();
175177
assert recoveryTarget.sourceNode() != null : "can not do a recovery without a source node";
176178
logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId());
177-
recoveryTarget.indexShard().prepareForIndexRecovery();
178-
final long startingSeqNo = recoveryTarget.indexShard().recoverLocallyUpToGlobalCheckpoint();
179+
indexShard.prepareForIndexRecovery();
180+
final long startingSeqNo = indexShard.recoverLocallyUpToGlobalCheckpoint();
179181
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG :
180182
"unexpected recovery stage [" + recoveryTarget.state().getStage() + "] starting seqno [ " + startingSeqNo + "]";
181183
request = getStartRecoveryRequest(logger, clusterService.localNode(), recoveryTarget, startingSeqNo);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.searchablesnapshots;
7+
8+
import org.apache.lucene.index.SegmentInfos;
9+
import org.elasticsearch.index.IndexSettings;
10+
import org.elasticsearch.index.seqno.SequenceNumbers;
11+
import org.elasticsearch.index.shard.IndexEventListener;
12+
import org.elasticsearch.index.shard.IndexShard;
13+
import org.elasticsearch.index.shard.ShardId;
14+
import org.elasticsearch.index.translog.Translog;
15+
import org.elasticsearch.index.translog.TranslogException;
16+
import org.elasticsearch.threadpool.ThreadPool;
17+
18+
import java.nio.file.Path;
19+
20+
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.isSearchableSnapshotStore;
21+
22+
public class SearchableSnapshotIndexEventListener implements IndexEventListener {
23+
24+
@Override
25+
public void beforeIndexShardRecovery(IndexShard indexShard, IndexSettings indexSettings) {
26+
assert Thread.currentThread().getName().contains(ThreadPool.Names.GENERIC);
27+
associateNewEmptyTranslogWithIndex(indexShard);
28+
}
29+
30+
private static void associateNewEmptyTranslogWithIndex(IndexShard indexShard) {
31+
final ShardId shardId = indexShard.shardId();
32+
assert isSearchableSnapshotStore(indexShard.indexSettings().getSettings()) : "Expected a searchable snapshot shard " + shardId;
33+
try {
34+
final SegmentInfos segmentInfos = indexShard.store().readLastCommittedSegmentsInfo();
35+
final long localCheckpoint = Long.parseLong(segmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY));
36+
final long primaryTerm = indexShard.getPendingPrimaryTerm();
37+
final String translogUUID = segmentInfos.userData.get(Translog.TRANSLOG_UUID_KEY);
38+
final Path translogLocation = indexShard.shardPath().resolveTranslog();
39+
Translog.createEmptyTranslog(translogLocation, shardId, localCheckpoint, primaryTerm, translogUUID, null);
40+
} catch (Exception e) {
41+
throw new TranslogException(shardId, "failed to associate a new translog", e);
42+
}
43+
}
44+
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshots.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
2323
import org.elasticsearch.env.Environment;
2424
import org.elasticsearch.env.NodeEnvironment;
25+
import org.elasticsearch.index.IndexModule;
2526
import org.elasticsearch.index.IndexSettings;
2627
import org.elasticsearch.index.engine.EngineFactory;
2728
import org.elasticsearch.index.engine.ReadOnlyEngine;
@@ -40,8 +41,8 @@
4041
import org.elasticsearch.script.ScriptService;
4142
import org.elasticsearch.threadpool.ThreadPool;
4243
import org.elasticsearch.watcher.ResourceWatcherService;
43-
import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction;
4444
import org.elasticsearch.xpack.core.searchablesnapshots.MountSearchableSnapshotAction;
45+
import org.elasticsearch.xpack.searchablesnapshots.action.ClearSearchableSnapshotsCacheAction;
4546
import org.elasticsearch.xpack.searchablesnapshots.action.SearchableSnapshotsStatsAction;
4647
import org.elasticsearch.xpack.searchablesnapshots.action.TransportClearSearchableSnapshotsCacheAction;
4748
import org.elasticsearch.xpack.searchablesnapshots.action.TransportMountSearchableSnapshotAction;
@@ -125,6 +126,13 @@ public void onRepositoriesModule(RepositoriesModule repositoriesModule) {
125126
repositoriesService.set(repositoriesModule.getRepositoryService());
126127
}
127128

129+
@Override
130+
public void onIndexModule(IndexModule indexModule) {
131+
if (isSearchableSnapshotStore(indexModule.getSettings())) {
132+
indexModule.addIndexEventListener(new SearchableSnapshotIndexEventListener());
133+
}
134+
}
135+
128136
@Override
129137
public Map<String, DirectoryFactory> getDirectoryFactories() {
130138
return Map.of(SNAPSHOT_DIRECTORY_FACTORY_KEY, (indexSettings, shardPath) -> {
@@ -138,7 +146,7 @@ public Map<String, DirectoryFactory> getDirectoryFactories() {
138146

139147
@Override
140148
public Optional<EngineFactory> getEngineFactory(IndexSettings indexSettings) {
141-
if (SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings.getSettings()))
149+
if (isSearchableSnapshotStore(indexSettings.getSettings())
142150
&& indexSettings.getSettings().getAsBoolean("index.frozen", false) == false) {
143151
return Optional.of(engineConfig -> new ReadOnlyEngine(engineConfig, null, new TranslogStats(), false, Function.identity()));
144152
}
@@ -169,5 +177,9 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
169177
public Map<String, ExistingShardsAllocator> getExistingShardsAllocators() {
170178
return Collections.singletonMap(SearchableSnapshotAllocator.ALLOCATOR_NAME, new SearchableSnapshotAllocator());
171179
}
180+
181+
static boolean isSearchableSnapshotStore(Settings indexSettings) {
182+
return SNAPSHOT_DIRECTORY_FACTORY_KEY.equals(INDEX_STORE_TYPE_SETTING.get(indexSettings));
183+
}
172184
}
173185

0 commit comments

Comments
 (0)