Skip to content

Cleanup Redundant Futures in Recovery Code #48805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 30 additions & 41 deletions server/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.CheckedRunnable;
import org.elasticsearch.common.Nullable;
Expand Down Expand Up @@ -1787,34 +1788,38 @@ public ShardPath shardPath() {
return path;
}

public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
List<IndexShard> localShards) throws IOException {
public void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards,
ActionListener<Boolean> listener) throws IOException {
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " +
recoveryState.getRecoverySource();
final List<LocalShardSnapshot> snapshots = new ArrayList<>();
final ActionListener<Boolean> recoveryListener = ActionListener.runBefore(listener, () -> IOUtils.close(snapshots));
boolean success = false;
try {
for (IndexShard shard : localShards) {
snapshots.add(new LocalShardSnapshot(shard));
}

// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener);
success = true;
} finally {
IOUtils.close(snapshots);
if (success == false) {
IOUtils.close(snapshots);
}
}
}

public boolean recoverFromStore() {
public void recoverFromStore(ActionListener<Boolean> listener) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.initializing() : "can only start recovery on initializing shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
return storeRecovery.recoverFromStore(this);
storeRecovery.recoverFromStore(this, listener);
}

public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -2484,17 +2489,7 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
case EXISTING_STORE:
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
break;
case PEER:
try {
Expand All @@ -2507,17 +2502,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
}
break;
case SNAPSHOT:
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
threadPool.generic().execute(
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
restoreListener -> restoreFromRepository(
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
executeRecovery("from snapshot",
recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l));
break;
case LOCAL_SHARDS:
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
Expand All @@ -2542,18 +2529,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService

if (numShards == startedShards.size()) {
assert requiredShards.isEmpty() == false;
markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
.filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState,
new RecoveryFailedException(recoveryState, null, e), true);
}
});
executeRecovery("from local shards", recoveryState, recoveryListener,
l -> recoverFromLocalShards(mappingUpdateConsumer,
startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l));
} else {
final RuntimeException e;
if (numShards == -1) {
Expand All @@ -2571,6 +2549,17 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
}
}

private void executeRecovery(String reason, RecoveryState recoveryState, PeerRecoveryTargetService.RecoveryListener recoveryListener,
CheckedConsumer<ActionListener<Boolean>, Exception> action) {
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onRecoveryDone(recoveryState);
}
},
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}

/**
* Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}).
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.lucene.store.IndexInput;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -81,31 +80,27 @@ final class StoreRecovery {
* exist on disk ie. has been previously allocated or if the shard is a brand new allocation without pre-existing index
* files / transaction logs. This
* @param indexShard the index shard instance to recovery the shard into
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
* @param listener resolves to <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
* @see Store
*/
boolean recoverFromStore(final IndexShard indexShard) {
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE :
"expected store recovery type but was: " + recoveryType;
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
final ActionListener<Boolean> recoveryListener = recoveryListener(indexShard, future);
try {
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from store ...");
internalRecoverFromStore(indexShard);
recoveryListener.onResponse(true);
} catch (Exception e) {
recoveryListener.onFailure(e);
}
return future.actionGet();
return true;
});
} else {
listener.onResponse(false);
}
return false;
}

boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
final IndexShard indexShard, final List<LocalShardSnapshot> shards) {
void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndexShard indexShard,
List<LocalShardSnapshot> shards, ActionListener<Boolean> listener) {
if (canRecover(indexShard)) {
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
assert recoveryType == RecoverySource.Type.LOCAL_SHARDS: "expected local shards recovery type: " + recoveryType;
Expand All @@ -125,8 +120,7 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
Sort indexSort = indexShard.getIndexSort();
final boolean hasNested = indexShard.mapperService().hasNested();
final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards();
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
ActionListener.completeWith(recoveryListener(indexShard, future), () -> {
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
logger.debug("starting recovery from local shards {}", shards);
try {
final Directory directory = indexShard.store().directory(); // don't close this directory!!
Expand All @@ -146,10 +140,9 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
}
});
assert future.isDone();
return future.actionGet();
} else {
listener.onResponse(false);
}
return false;
}

void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog;
import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
Expand Down Expand Up @@ -641,7 +642,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
assertTrue(newShard.recoverFromStore());
recoverFromStore(newShard);
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
return newShard;
}
Expand Down
Loading