Skip to content

Commit 4fd7a22

Browse files
Allow cluster access during node restart (#42946)
This commit modifies InternalTestCluster to allow using client() and other operations inside a RestartCallback (typically in onNodeStopped). Restarting nodes are now removed from the cluster's node map, so all methods return the state as if the restarting node did not exist. This avoids various exceptions stemming from accessing the stopped node(s).
1 parent f828c77 commit 4fd7a22

File tree

3 files changed

+52
-40
lines changed

3 files changed

+52
-40
lines changed

server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.cluster.routing.RoutingTable;
4040
import org.elasticsearch.cluster.routing.ShardRoutingState;
4141
import org.elasticsearch.cluster.routing.UnassignedInfo;
42+
import org.elasticsearch.common.CheckedConsumer;
4243
import org.elasticsearch.common.Priority;
4344
import org.elasticsearch.common.settings.Settings;
4445
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -55,7 +56,11 @@
5556

5657
import java.io.IOException;
5758
import java.util.List;
59+
import java.util.Map;
5860
import java.util.concurrent.TimeUnit;
61+
import java.util.function.Function;
62+
import java.util.stream.Collectors;
63+
import java.util.stream.Stream;
5964

6065
import static org.elasticsearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
6166
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
@@ -369,14 +374,7 @@ public void testRecoverBrokenIndexMetadata() throws Exception {
369374
// this one is not validated ahead of time and breaks allocation
370375
.put("index.analysis.filter.myCollator.type", "icu_collation")
371376
).build();
372-
internalCluster().fullRestart(new RestartCallback(){
373-
@Override
374-
public Settings onNodeStopped(String nodeName) throws Exception {
375-
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
376-
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
377-
return super.onNodeStopped(nodeName);
378-
}
379-
});
377+
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));
380378

381379
// check that the cluster does not keep reallocating shards
382380
assertBusy(() -> {
@@ -443,14 +441,7 @@ public void testRecoverMissingAnalyzer() throws Exception {
443441
final IndexMetaData metaData = state.getMetaData().index("test");
444442
final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings()
445443
.filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build();
446-
internalCluster().fullRestart(new RestartCallback(){
447-
@Override
448-
public Settings onNodeStopped(String nodeName) throws Exception {
449-
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
450-
metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta);
451-
return super.onNodeStopped(nodeName);
452-
}
453-
});
444+
writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta));
454445

455446
// check that the cluster does not keep reallocating shards
456447
assertBusy(() -> {
@@ -495,14 +486,7 @@ public void testArchiveBrokenClusterSettings() throws Exception {
495486
final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder()
496487
.put(metaData.persistentSettings()).put("this.is.unknown", true)
497488
.put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build();
498-
internalCluster().fullRestart(new RestartCallback(){
499-
@Override
500-
public Settings onNodeStopped(String nodeName) throws Exception {
501-
final MetaStateService metaStateService = internalCluster().getInstance(MetaStateService.class, nodeName);
502-
metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta);
503-
return super.onNodeStopped(nodeName);
504-
}
505-
});
489+
writeBrokenMeta(metaStateService -> metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta));
506490

507491
ensureYellow("test"); // wait for state recovery
508492
state = client().admin().cluster().prepareState().get().getState();
@@ -519,4 +503,17 @@ public Settings onNodeStopped(String nodeName) throws Exception {
519503
+ MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey()));
520504
assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L);
521505
}
506+
507+
private void writeBrokenMeta(CheckedConsumer<MetaStateService, IOException> writer) throws Exception {
508+
Map<String, MetaStateService> metaStateServices = Stream.of(internalCluster().getNodeNames())
509+
.collect(Collectors.toMap(Function.identity(), nodeName -> internalCluster().getInstance(MetaStateService.class, nodeName)));
510+
internalCluster().fullRestart(new RestartCallback(){
511+
@Override
512+
public Settings onNodeStopped(String nodeName) throws Exception {
513+
final MetaStateService metaStateService = metaStateServices.get(nodeName);
514+
writer.accept(metaStateService);
515+
return super.onNodeStopped(nodeName);
516+
}
517+
});
518+
}
522519
}

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,6 @@
137137
import java.util.stream.IntStream;
138138
import java.util.stream.Stream;
139139

140-
import static java.util.Collections.emptyList;
141140
import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
142141
import static org.apache.lucene.util.LuceneTestCase.rarely;
143142
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
@@ -889,6 +888,7 @@ void startNode() {
889888
Settings closeForRestart(RestartCallback callback) throws Exception {
890889
assert callback != null;
891890
close();
891+
removeNode(this);
892892
Settings callbackSettings = callback.onNodeStopped(name);
893893
assert callbackSettings != null;
894894
Settings.Builder newSettings = Settings.builder();
@@ -1648,20 +1648,9 @@ private void restartNode(NodeAndClient nodeAndClient, RestartCallback callback)
16481648
final Settings newSettings = nodeAndClient.closeForRestart(callback);
16491649
removeExclusions(excludedNodeIds);
16501650

1651-
boolean success = false;
1652-
try {
1653-
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(emptyList()));
1654-
nodeAndClient.startNode();
1655-
success = true;
1656-
} finally {
1657-
if (success == false) {
1658-
removeNode(nodeAndClient);
1659-
}
1660-
}
1661-
1662-
if (activeDisruptionScheme != null) {
1663-
activeDisruptionScheme.applyToNode(nodeAndClient.name, this);
1664-
}
1651+
nodeAndClient.recreateNode(newSettings, () -> rebuildUnicastHostFiles(Collections.singletonList(nodeAndClient)));
1652+
nodeAndClient.startNode();
1653+
publishNode(nodeAndClient);
16651654

16661655
if (callback.validateClusterForming() || excludedNodeIds.isEmpty() == false) {
16671656
// we have to validate cluster size to ensure that the restarted node has rejoined the cluster if it was master-eligible;
@@ -1728,6 +1717,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
17281717
final Settings[] newNodeSettings = new Settings[nextNodeId.get()];
17291718
Map<Set<DiscoveryNodeRole>, List<NodeAndClient>> nodesByRoles = new HashMap<>();
17301719
Set[] rolesOrderedByOriginalStartupOrder = new Set[nextNodeId.get()];
1720+
final int nodeCount = nodes.size();
17311721
for (NodeAndClient nodeAndClient : nodes.values()) {
17321722
callback.doAfterNodes(numNodesRestarted++, nodeAndClient.nodeClient());
17331723
logger.info("Stopping and resetting node [{}] ", nodeAndClient.name);
@@ -1741,7 +1731,7 @@ public synchronized void fullRestart(RestartCallback callback) throws Exception
17411731
nodesByRoles.computeIfAbsent(discoveryNode.getRoles(), k -> new ArrayList<>()).add(nodeAndClient);
17421732
}
17431733

1744-
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodes.size();
1734+
assert nodesByRoles.values().stream().mapToInt(List::size).sum() == nodeCount;
17451735

17461736
// randomize start up order, but making sure that:
17471737
// 1) A data folder that was assigned to a data node will stay so

test/framework/src/test/java/org/elasticsearch/test/test/InternalTestClusterIT.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
*/
1919
package org.elasticsearch.test.test;
2020

21+
import org.elasticsearch.client.node.NodeClient;
22+
import org.elasticsearch.common.settings.Settings;
2123
import org.elasticsearch.test.ESIntegTestCase;
2224
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
25+
import org.elasticsearch.test.InternalTestCluster;
2326

2427
import java.io.IOException;
2528

@@ -61,4 +64,26 @@ public void testStoppingNodesOneByOne() throws IOException {
6164

6265
ensureGreen();
6366
}
67+
68+
public void testOperationsDuringRestart() throws Exception {
69+
internalCluster().startMasterOnlyNode();
70+
internalCluster().startDataOnlyNodes(2);
71+
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
72+
@Override
73+
public Settings onNodeStopped(String nodeName) throws Exception {
74+
ensureGreen();
75+
internalCluster().validateClusterFormed();
76+
assertNotNull(internalCluster().getInstance(NodeClient.class));
77+
internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() {
78+
@Override
79+
public Settings onNodeStopped(String nodeName) throws Exception {
80+
ensureGreen();
81+
internalCluster().validateClusterFormed();
82+
return super.onNodeStopped(nodeName);
83+
}
84+
});
85+
return super.onNodeStopped(nodeName);
86+
}
87+
});
88+
}
6489
}

0 commit comments

Comments
 (0)