Skip to content

Commit 0e47311

Browse files
authored
ILM make the set-single-node-allocation retryable (#52077)
1 parent 07c9770 commit 0e47311

File tree

3 files changed

+81
-13
lines changed

3 files changed

+81
-13
lines changed

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStep.java

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
*/
66
package org.elasticsearch.xpack.core.ilm;
77

8-
import org.apache.log4j.LogManager;
9-
import org.apache.log4j.Logger;
8+
import org.apache.logging.log4j.LogManager;
9+
import org.apache.logging.log4j.Logger;
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1212
import org.elasticsearch.client.Client;
13+
import org.elasticsearch.client.transport.NoNodeAvailableException;
1314
import org.elasticsearch.cluster.ClusterState;
1415
import org.elasticsearch.cluster.ClusterStateObserver;
1516
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -54,18 +55,23 @@ public SetSingleNodeAllocateStep(StepKey key, StepKey nextStepKey, Client client
5455
super(key, nextStepKey, client);
5556
}
5657

58+
/**
 * Marks this step as retryable.
 *
 * @return always {@code true}
 */
// NOTE(review): presumably ILM consults this flag after performAction reports a failure through
// listener.onFailure (e.g. when no node can host the index) and re-runs the step - confirm against the ILM runner.
@Override
59+
public boolean isRetryable() {
60+
return true;
61+
}
62+
5763
@Override
5864
public void performAction(IndexMetaData indexMetaData, ClusterState clusterState, ClusterStateObserver observer, Listener listener) {
5965
final RoutingNodes routingNodes = clusterState.getRoutingNodes();
6066
RoutingAllocation allocation = new RoutingAllocation(ALLOCATION_DECIDERS, routingNodes, clusterState, null,
6167
System.nanoTime());
6268
List<String> validNodeIds = new ArrayList<>();
69+
String indexName = indexMetaData.getIndex().getName();
6370
final Map<ShardId, List<ShardRouting>> routingsByShardId = clusterState.getRoutingTable()
64-
.allShards(indexMetaData.getIndex().getName())
71+
.allShards(indexName)
6572
.stream()
6673
.collect(Collectors.groupingBy(ShardRouting::shardId));
6774

68-
6975
if (routingsByShardId.isEmpty() == false) {
7076
for (RoutingNode node : routingNodes) {
7177
boolean canAllocateOneCopyOfEachShard = routingsByShardId.values().stream() // For each shard
@@ -79,21 +85,24 @@ public void performAction(IndexMetaData indexMetaData, ClusterState clusterState
7985
// Shuffle the list of nodes so the one we pick is random
8086
Randomness.shuffle(validNodeIds);
8187
Optional<String> nodeId = validNodeIds.stream().findAny();
88+
8289
if (nodeId.isPresent()) {
8390
Settings settings = Settings.builder()
8491
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", nodeId.get()).build();
85-
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
92+
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName)
8693
.masterNodeTimeout(getMasterTimeout(clusterState))
8794
.settings(settings);
8895
getClient().admin().indices().updateSettings(updateSettingsRequest,
8996
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
9097
} else {
91-
// No nodes currently match the allocation rules so just wait until there is one that does
92-
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink");
93-
listener.onResponse(false);
98+
// No nodes currently match the allocation rules, so report this as an error and we'll retry
99+
logger.debug("could not find any nodes to allocate index [{}] onto prior to shrink", indexName);
100+
listener.onFailure(new NoNodeAvailableException("could not find any nodes to allocate index [" + indexName + "] onto" +
101+
" prior to shrink"));
94102
}
95103
} else {
96-
// There are no shards for the index, the index might be gone
104+
// There are no shards for the index, the index might be gone. Even though this is a retryable step ILM will not retry in
105+
// this case as we're using the periodic loop to trigger the retries and that is run over *existing* indices.
97106
listener.onFailure(new IndexNotFoundException(indexMetaData.getIndex()));
98107
}
99108
}

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ilm/SetSingleNodeAllocateStepTests.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
1212
import org.elasticsearch.action.support.master.AcknowledgedResponse;
13+
import org.elasticsearch.client.transport.NoNodeAvailableException;
1314
import org.elasticsearch.cluster.ClusterState;
1415
import org.elasticsearch.cluster.metadata.IndexMetaData;
1516
import org.elasticsearch.cluster.metadata.MetaData;
@@ -45,6 +46,7 @@
4546
import static org.hamcrest.Matchers.anyOf;
4647
import static org.hamcrest.Matchers.equalTo;
4748
import static org.hamcrest.Matchers.greaterThan;
49+
import static org.hamcrest.Matchers.instanceOf;
4850
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4951

5052
public class SetSingleNodeAllocateStepTests extends AbstractStepTestCase<SetSingleNodeAllocateStep> {
@@ -563,22 +565,23 @@ private void assertNoValidNode(IndexMetaData indexMetaData, Index index, Discove
563565

564566
SetSingleNodeAllocateStep step = createRandomInstance();
565567

566-
SetOnce<Boolean> actionCompleted = new SetOnce<>();
568+
SetOnce<Exception> actionCompleted = new SetOnce<>();
567569

568570
step.performAction(indexMetaData, clusterState, null, new Listener() {
569571

570572
@Override
571573
public void onResponse(boolean complete) {
572-
actionCompleted.set(complete);
574+
throw new AssertionError("Unexpected method call");
573575
}
574576

575577
@Override
576578
public void onFailure(Exception e) {
577-
throw new AssertionError("Unexpected method call", e);
579+
actionCompleted.set(e);
578580
}
579581
});
580582

581-
assertEquals(false, actionCompleted.get());
583+
Exception failure = actionCompleted.get();
584+
assertThat(failure, instanceOf(NoNodeAvailableException.class));
582585

583586
Mockito.verifyZeroInteractions(client);
584587
}

x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/ilm/TimeSeriesLifecycleActionsIT.java

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.xpack.core.ilm.ReadOnlyAction;
4040
import org.elasticsearch.xpack.core.ilm.RolloverAction;
4141
import org.elasticsearch.xpack.core.ilm.SetPriorityAction;
42+
import org.elasticsearch.xpack.core.ilm.SetSingleNodeAllocateStep;
4243
import org.elasticsearch.xpack.core.ilm.ShrinkAction;
4344
import org.elasticsearch.xpack.core.ilm.ShrinkStep;
4445
import org.elasticsearch.xpack.core.ilm.Step;
@@ -585,6 +586,61 @@ public void testShrinkDuringSnapshot() throws Exception {
585586
assertOK(client().performRequest(new Request("DELETE", "/_snapshot/repo/snapshot")));
586587
}
587588

589+
/**
 * Checks that ILM keeps retrying the set-single-node-allocation step while the index cannot be allocated,
 * and that the shrink action runs to completion once the blocking allocation filter is removed.
 *
 * The test creates a two-shard index, applies an allocation filter on a "bogus_rack" rack value, attaches a
 * shrink policy, waits for the explain output to report the step as failed with at least one retry, removes
 * the filter, and finally asserts that the shrunken index, its alias and the final "warm" step are reached.
 */
public void testSetSingleNodeAllocationRetriesUntilItSucceeds() throws Exception {
590+
int numShards = 2;
591+
int expectedFinalShards = 1;
592+
String shrunkenIndex = ShrinkAction.SHRUNKEN_INDEX_PREFIX + index;
593+
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numShards)
594+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
595+
596+
ensureGreen(index);
597+
598+
// unallocate all index shards by requiring a rack attribute value ("bogus_rack") that is
// presumably not set on any node of the test cluster
599+
Request setAllocationToMissingAttribute = new Request("PUT", "/" + index + "/_settings");
600+
setAllocationToMissingAttribute.setJsonEntity("{\n" +
601+
" \"settings\": {\n" +
602+
" \"index.routing.allocation.include.rack\": \"bogus_rack\"" +
603+
" }\n" +
604+
"}");
605+
client().performRequest(setAllocationToMissingAttribute);
606+
607+
// wait (up to 70s) for the index health to turn red, i.e. for the filter above to take effect
ensureHealth(index, (request) -> {
608+
request.addParameter("wait_for_status", "red");
609+
request.addParameter("timeout", "70s");
610+
request.addParameter("level", "shards");
611+
});
612+
613+
// assign the policy that'll attempt to shrink the index
614+
createNewSingletonPolicy("warm", new ShrinkAction(expectedFinalShards));
615+
updatePolicy(index, policy);
616+
617+
// poll the explain output until the set-single-node-allocation step is reported as the failed step
// with a retry count of at least 1; a missing response or an IOException just means "not yet"
assertTrue("ILM did not start retrying the set-single-node-allocation step", waitUntil(() -> {
618+
try {
619+
Map<String, Object> explainIndexResponse = explainIndex(index);
620+
if (explainIndexResponse == null) {
621+
return false;
622+
}
623+
String failedStep = (String) explainIndexResponse.get("failed_step");
624+
Integer retryCount = (Integer) explainIndexResponse.get(FAILED_STEP_RETRY_COUNT_FIELD);
625+
return failedStep != null && failedStep.equals(SetSingleNodeAllocateStep.NAME) && retryCount != null && retryCount >= 1;
626+
} catch (IOException e) {
627+
return false;
628+
}
629+
}, 30, TimeUnit.SECONDS));
630+
631+
// set the rack filter back to null so the next retry of the step can find a valid node
Request resetAllocationForIndex = new Request("PUT", "/" + index + "/_settings");
632+
resetAllocationForIndex.setJsonEntity("{\n" +
633+
" \"settings\": {\n" +
634+
" \"index.routing.allocation.include.rack\": null" +
635+
" }\n" +
636+
"}");
637+
client().performRequest(resetAllocationForIndex);
638+
639+
// the shrink should now go through: shrunken index created, aliased to the original name, policy at its final step
assertBusy(() -> assertTrue(indexExists(shrunkenIndex)), 30, TimeUnit.SECONDS);
640+
assertBusy(() -> assertTrue(aliasExists(shrunkenIndex, index)));
641+
assertBusy(() -> assertThat(getStepKeyForIndex(shrunkenIndex), equalTo(PhaseCompleteStep.finalStep("warm").getKey())));
642+
}
643+
588644
public void testFreezeAction() throws Exception {
589645
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
590646
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));

0 commit comments

Comments
 (0)