Skip to content

Commit a22f6fb

Browse files
Cleanup Redundant Futures in Recovery Code (elastic#48805) (elastic#48832)
Follow up to elastic#48110 cleaning up the redundant future uses that were left over from that change.
1 parent 4c70770 commit a22f6fb

File tree

9 files changed

+74
-81
lines changed

9 files changed

+74
-81
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+30-41
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
5555
import org.elasticsearch.cluster.routing.ShardRouting;
5656
import org.elasticsearch.common.Booleans;
57+
import org.elasticsearch.common.CheckedConsumer;
5758
import org.elasticsearch.common.CheckedFunction;
5859
import org.elasticsearch.common.CheckedRunnable;
5960
import org.elasticsearch.common.Nullable;
@@ -1821,34 +1822,38 @@ public ShardPath shardPath() {
18211822
return path;
18221823
}
18231824

1824-
public boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
1825-
List<IndexShard> localShards) throws IOException {
1825+
public void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, List<IndexShard> localShards,
1826+
ActionListener<Boolean> listener) throws IOException {
18261827
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
18271828
assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS : "invalid recovery type: " +
18281829
recoveryState.getRecoverySource();
18291830
final List<LocalShardSnapshot> snapshots = new ArrayList<>();
1831+
final ActionListener<Boolean> recoveryListener = ActionListener.runBefore(listener, () -> IOUtils.close(snapshots));
1832+
boolean success = false;
18301833
try {
18311834
for (IndexShard shard : localShards) {
18321835
snapshots.add(new LocalShardSnapshot(shard));
18331836
}
1834-
18351837
// we are the first primary, recover from the gateway
18361838
// if its post api allocation, the index should exists
18371839
assert shardRouting.primary() : "recover from local shards only makes sense if the shard is a primary shard";
18381840
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
1839-
return storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots);
1841+
storeRecovery.recoverFromLocalShards(mappingUpdateConsumer, this, snapshots, recoveryListener);
1842+
success = true;
18401843
} finally {
1841-
IOUtils.close(snapshots);
1844+
if (success == false) {
1845+
IOUtils.close(snapshots);
1846+
}
18421847
}
18431848
}
18441849

1845-
public boolean recoverFromStore() {
1850+
public void recoverFromStore(ActionListener<Boolean> listener) {
18461851
// we are the first primary, recover from the gateway
18471852
// if its post api allocation, the index should exists
18481853
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
18491854
assert shardRouting.initializing() : "can only start recovery on initializing shard";
18501855
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
1851-
return storeRecovery.recoverFromStore(this);
1856+
storeRecovery.recoverFromStore(this, listener);
18521857
}
18531858

18541859
public void restoreFromRepository(Repository repository, ActionListener<Boolean> listener) {
@@ -2520,17 +2525,7 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
25202525
switch (recoveryState.getRecoverySource().getType()) {
25212526
case EMPTY_STORE:
25222527
case EXISTING_STORE:
2523-
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
2524-
threadPool.generic().execute(() -> {
2525-
try {
2526-
if (recoverFromStore()) {
2527-
recoveryListener.onRecoveryDone(recoveryState);
2528-
}
2529-
} catch (Exception e) {
2530-
recoveryListener.onRecoveryFailure(recoveryState,
2531-
new RecoveryFailedException(recoveryState, null, e), true);
2532-
}
2533-
});
2528+
executeRecovery("from store", recoveryState, recoveryListener, this::recoverFromStore);
25342529
break;
25352530
case PEER:
25362531
try {
@@ -2543,17 +2538,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
25432538
}
25442539
break;
25452540
case SNAPSHOT:
2546-
markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
2547-
SnapshotRecoverySource recoverySource = (SnapshotRecoverySource) recoveryState.getRecoverySource();
2548-
threadPool.generic().execute(
2549-
ActionRunnable.<Boolean>wrap(ActionListener.wrap(r -> {
2550-
if (r) {
2551-
recoveryListener.onRecoveryDone(recoveryState);
2552-
}
2553-
},
2554-
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)),
2555-
restoreListener -> restoreFromRepository(
2556-
repositoriesService.repository(recoverySource.snapshot().getRepository()), restoreListener)));
2541+
final String repo = ((SnapshotRecoverySource) recoveryState.getRecoverySource()).snapshot().getRepository();
2542+
executeRecovery("from snapshot",
2543+
recoveryState, recoveryListener, l -> restoreFromRepository(repositoriesService.repository(repo), l));
25572544
break;
25582545
case LOCAL_SHARDS:
25592546
final IndexMetaData indexMetaData = indexSettings().getIndexMetaData();
@@ -2578,18 +2565,9 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
25782565

25792566
if (numShards == startedShards.size()) {
25802567
assert requiredShards.isEmpty() == false;
2581-
markAsRecovering("from local shards", recoveryState); // mark the shard as recovering on the cluster state thread
2582-
threadPool.generic().execute(() -> {
2583-
try {
2584-
if (recoverFromLocalShards(mappingUpdateConsumer, startedShards.stream()
2585-
.filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()))) {
2586-
recoveryListener.onRecoveryDone(recoveryState);
2587-
}
2588-
} catch (Exception e) {
2589-
recoveryListener.onRecoveryFailure(recoveryState,
2590-
new RecoveryFailedException(recoveryState, null, e), true);
2591-
}
2592-
});
2568+
executeRecovery("from local shards", recoveryState, recoveryListener,
2569+
l -> recoverFromLocalShards(mappingUpdateConsumer,
2570+
startedShards.stream().filter((s) -> requiredShards.contains(s.shardId())).collect(Collectors.toList()), l));
25932571
} else {
25942572
final RuntimeException e;
25952573
if (numShards == -1) {
@@ -2607,6 +2585,17 @@ public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService
26072585
}
26082586
}
26092587

2588+
private void executeRecovery(String reason, RecoveryState recoveryState, PeerRecoveryTargetService.RecoveryListener recoveryListener,
2589+
CheckedConsumer<ActionListener<Boolean>, Exception> action) {
2590+
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
2591+
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
2592+
if (r) {
2593+
recoveryListener.onRecoveryDone(recoveryState);
2594+
}
2595+
},
2596+
e -> recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
2597+
}
2598+
26102599
/**
26112600
* Returns whether the shard is a relocated primary, i.e. not in charge anymore of replicating changes (see {@link ReplicationTracker}).
26122601
*/

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

+13-20
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.elasticsearch.ExceptionsHelper;
3434
import org.elasticsearch.Version;
3535
import org.elasticsearch.action.ActionListener;
36-
import org.elasticsearch.action.support.PlainActionFuture;
3736
import org.elasticsearch.cluster.metadata.IndexMetaData;
3837
import org.elasticsearch.cluster.metadata.MappingMetaData;
3938
import org.elasticsearch.cluster.routing.RecoverySource;
@@ -83,31 +82,27 @@ final class StoreRecovery {
8382
* exist on disk ie. has been previously allocated or if the shard is a brand new allocation without pre-existing index
8483
* files / transaction logs. This
8584
* @param indexShard the index shard instance to recovery the shard into
86-
* @return <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
87-
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
85+
* @param listener resolves to <code>true</code> if the shard has been recovered successfully, <code>false</code> if the recovery
86+
* has been ignored due to a concurrent modification of if the clusters state has changed due to async updates.
8887
* @see Store
8988
*/
90-
boolean recoverFromStore(final IndexShard indexShard) {
89+
void recoverFromStore(final IndexShard indexShard, ActionListener<Boolean> listener) {
9190
if (canRecover(indexShard)) {
9291
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
9392
assert recoveryType == RecoverySource.Type.EMPTY_STORE || recoveryType == RecoverySource.Type.EXISTING_STORE :
9493
"expected store recovery type but was: " + recoveryType;
95-
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
96-
final ActionListener<Boolean> recoveryListener = recoveryListener(indexShard, future);
97-
try {
94+
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
9895
logger.debug("starting recovery from store ...");
9996
internalRecoverFromStore(indexShard);
100-
recoveryListener.onResponse(true);
101-
} catch (Exception e) {
102-
recoveryListener.onFailure(e);
103-
}
104-
return future.actionGet();
97+
return true;
98+
});
99+
} else {
100+
listener.onResponse(false);
105101
}
106-
return false;
107102
}
108103

109-
boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
110-
final IndexShard indexShard, final List<LocalShardSnapshot> shards) {
104+
void recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdateConsumer, IndexShard indexShard,
105+
List<LocalShardSnapshot> shards, ActionListener<Boolean> listener) {
111106
if (canRecover(indexShard)) {
112107
RecoverySource.Type recoveryType = indexShard.recoveryState().getRecoverySource().getType();
113108
assert recoveryType == RecoverySource.Type.LOCAL_SHARDS: "expected local shards recovery type: " + recoveryType;
@@ -129,8 +124,7 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
129124
final boolean isSplit = sourceMetaData.getNumberOfShards() < indexShard.indexSettings().getNumberOfShards();
130125
assert isSplit == false || sourceMetaData.getCreationVersion().onOrAfter(Version.V_6_0_0_alpha1) : "for split we require a " +
131126
"single type but the index is created before 6.0.0";
132-
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
133-
ActionListener.completeWith(recoveryListener(indexShard, future), () -> {
127+
ActionListener.completeWith(recoveryListener(indexShard, listener), () -> {
134128
logger.debug("starting recovery from local shards {}", shards);
135129
try {
136130
final Directory directory = indexShard.store().directory(); // don't close this directory!!
@@ -150,10 +144,9 @@ boolean recoverFromLocalShards(BiConsumer<String, MappingMetaData> mappingUpdate
150144
throw new IndexShardRecoveryException(indexShard.shardId(), "failed to recover from local shards", ex);
151145
}
152146
});
153-
assert future.isDone();
154-
return future.actionGet();
147+
} else {
148+
listener.onResponse(false);
155149
}
156-
return false;
157150
}
158151

159152
void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory target, final Sort indexSort, final Directory[] sources,

server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@
119119
import static org.elasticsearch.index.seqno.SequenceNumbers.NO_OPS_PERFORMED;
120120
import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
121121
import static org.elasticsearch.index.shard.IndexShardTestCase.getTranslog;
122+
import static org.elasticsearch.index.shard.IndexShardTestCase.recoverFromStore;
122123
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
123124
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
124125
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
@@ -642,7 +643,7 @@ public void testCircuitBreakerIncrementedByIndexShard() throws Exception {
642643
public static final IndexShard recoverShard(IndexShard newShard) throws IOException {
643644
DiscoveryNode localNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
644645
newShard.markAsRecovering("store", new RecoveryState(newShard.routingEntry(), localNode, null));
645-
assertTrue(newShard.recoverFromStore());
646+
recoverFromStore(newShard);
646647
IndexShardTestCase.updateRoutingEntry(newShard, newShard.routingEntry().moveToStarted());
647648
return newShard;
648649
}

0 commit comments

Comments
 (0)