Commit 727004c

Reset state recovery after successful recovery (#42576)
The problem this commit addresses is that state recovery is not reset on a node that then becomes master with a cluster state that still carries the state-not-recovered flag. The situation observed in a failed run of MinimumMasterNodesIT.testThreeNodesNoMasterBlock (see below) is as follows: there are 3 master-eligible nodes (node_t0, node_t1, node_t2), two of them are shut down (node_t2 remains), and when the first one comes back (renamed to node_t4) it becomes leader in term 2 and sends a state (with state_not_recovered_block) to node_t2, which accepts it. node_t2 then becomes leader in term 3, and because it was previously leader in term 1 and had successfully completed state recovery, it never retries state recovery in term 3. Closes #39172
1 parent 1e05a8b commit 727004c
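
To make the failure mode in the commit message concrete, here is a minimal, self-contained sketch of the flag handling involved (hypothetical class and method names, not the actual Elasticsearch code): before this change the recovery flag was only ever set, so a node that had completed state recovery once would ignore a later cluster state that still carried the not-recovered block; the fix clears the flag once the recovered state has been processed, so the next not-recovered state triggers recovery again.

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the GatewayService flag handling; StateRecoveryTrigger and
// onNotRecoveredStateApplied are illustrative names, not Elasticsearch APIs.
public class StateRecoveryTrigger {

    private final AtomicBoolean recoveryInProgress = new AtomicBoolean();

    // Called whenever the elected master applies a cluster state that still carries
    // the state-not-recovered block.
    public void onNotRecoveredStateApplied() {
        if (recoveryInProgress.compareAndSet(false, true)) {
            performStateRecovery();
        }
        // Pre-fix behaviour: the flag was never cleared, so a node that recovered state
        // in an earlier term would skip this branch in every later term.
    }

    private void performStateRecovery() {
        System.out.println("performing state recovery...");
        // Post-fix behaviour: once the recovered state has been processed, the flag is
        // cleared so that a later not-recovered state (e.g. after losing and regaining
        // the leadership) triggers recovery again.
        recoveryInProgress.set(false);
    }

    public static void main(String[] args) {
        StateRecoveryTrigger trigger = new StateRecoveryTrigger();
        trigger.onNotRecoveredStateApplied(); // term 1: performs state recovery
        trigger.onNotRecoveredStateApplied(); // term 3: recovers again because the flag was cleared
    }
}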

File tree: 4 files changed (+67, -9 lines)

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 2 additions & 1 deletion
@@ -325,7 +325,8 @@ public RoutingNodes getRoutingNodes() {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         final String TAB = " ";
-        sb.append("cluster uuid: ").append(metaData.clusterUUID()).append("\n");
+        sb.append("cluster uuid: ").append(metaData.clusterUUID())
+            .append(" [committed: ").append(metaData.clusterUUIDCommitted()).append("]").append("\n");
         sb.append("version: ").append(version).append("\n");
         sb.append("state uuid: ").append(stateUUID).append("\n");
         sb.append("from_diff: ").append(wasReadFromDiff).append("\n");

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 0 additions & 1 deletion
@@ -729,7 +729,6 @@ public void invariant() {
             assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
             assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
             assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
-            assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
             assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
                 : preVoteCollector + " vs " + getPreVoteResponse();

server/src/main/java/org/elasticsearch/gateway/GatewayService.java

Lines changed: 7 additions & 4 deletions
@@ -87,7 +87,7 @@ public class GatewayService extends AbstractLifecycleComponent implements Cluste
 
     private final Runnable recoveryRunnable;
 
-    private final AtomicBoolean recovered = new AtomicBoolean();
+    private final AtomicBoolean recoveryInProgress = new AtomicBoolean();
     private final AtomicBoolean scheduledRecovery = new AtomicBoolean();
 
     @Inject
@@ -215,15 +215,15 @@ public void onFailure(Exception e) {
 
                     @Override
                     protected void doRun() {
-                        if (recovered.compareAndSet(false, true)) {
+                        if (recoveryInProgress.compareAndSet(false, true)) {
                             logger.info("recover_after_time [{}] elapsed. performing state recovery...", recoverAfterTime);
                             recoveryRunnable.run();
                         }
                     }
                 }, recoverAfterTime, ThreadPool.Names.GENERIC);
             }
         } else {
-            if (recovered.compareAndSet(false, true)) {
+            if (recoveryInProgress.compareAndSet(false, true)) {
                 threadPool.generic().execute(new AbstractRunnable() {
                     @Override
                     public void onFailure(final Exception e) {
@@ -241,7 +241,7 @@ protected void doRun() {
     }
 
     private void resetRecoveredFlags() {
-        recovered.set(false);
+        recoveryInProgress.set(false);
         scheduledRecovery.set(false);
     }
 
@@ -260,6 +260,9 @@ public ClusterState execute(final ClusterState currentState) {
             @Override
             public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
                 logger.info("recovered [{}] indices into cluster_state", newState.metaData().indices().size());
+                // reset flag even though state recovery completed, to ensure that if we subsequently become leader again based on a
+                // not-recovered state, that we again do another state recovery.
+                resetRecoveredFlags();
             }
 
             @Override
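
For context, GatewayService only considers running state recovery when the applied cluster state still carries STATE_NOT_RECOVERED_BLOCK and the local node is the elected master. A small illustrative helper (not part of Elasticsearch) that mirrors this precondition, assuming the imports shown:

import org.elasticsearch.cluster.ClusterState;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

// Illustrative helper, not Elasticsearch code: the condition under which the gateway
// would attempt state recovery when a new cluster state is applied.
final class RecoveryPrecondition {
    static boolean shouldAttemptRecovery(ClusterState state) {
        return state.nodes().isLocalNodeElectedMaster()
            && state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK);
    }
}

When a node regains the leadership on a not-recovered state this condition holds again; with the guard flag still set from an earlier term the recovery runnable was nevertheless skipped, which is the gap the resetRecoveredFlags() call above closes.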

server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java

Lines changed: 58 additions & 3 deletions
@@ -45,6 +45,7 @@
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNode.Role;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplierService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Nullable;
@@ -70,6 +71,8 @@
 import org.elasticsearch.discovery.SeedHostsProvider.HostsResolver;
 import org.elasticsearch.discovery.zen.PublishClusterStateStats;
 import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.gateway.ClusterStateUpdaters;
+import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.MetaStateService;
 import org.elasticsearch.gateway.MockGatewayMetaState;
 import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
@@ -131,6 +134,7 @@
 import static org.elasticsearch.cluster.coordination.NoMasterBlockService.NO_MASTER_BLOCK_WRITES;
 import static org.elasticsearch.cluster.coordination.Reconfigurator.CLUSTER_AUTO_SHRINK_VOTING_CONFIGURATION;
 import static org.elasticsearch.discovery.PeerFinder.DISCOVERY_FIND_PEERS_INTERVAL_SETTING;
+import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
 import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
 import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
 import static org.hamcrest.Matchers.containsString;
@@ -191,6 +195,45 @@ public void testRepeatableTests() throws Exception {
         assertEquals(result1, result2);
     }
 
+    /**
+     * This test was added to verify that state recovery is properly reset on a node after it has become master and successfully
+     * recovered a state (see {@link GatewayService}). The situation which triggers this with a decent likelihood is as follows:
+     * 3 master-eligible nodes (leader, follower1, follower2), the followers are shut down (leader remains), when followers come back
+     * one of them becomes leader and publishes first state (with STATE_NOT_RECOVERED_BLOCK) to old leader, which accepts it.
+     * Old leader is initiating an election at the same time, and wins election. It becomes leader again, but as it previously
+     * successfully completed state recovery, is never reset to a state where state recovery can be retried.
+     */
+    public void testStateRecoveryResetAfterPreviousLeadership() {
+        final Cluster cluster = new Cluster(3);
+        cluster.runRandomly();
+        cluster.stabilise();
+
+        final ClusterNode leader = cluster.getAnyLeader();
+        final ClusterNode follower1 = cluster.getAnyNodeExcept(leader);
+        final ClusterNode follower2 = cluster.getAnyNodeExcept(leader, follower1);
+
+        // restart follower1 and follower2
+        for (ClusterNode clusterNode : Arrays.asList(follower1, follower2)) {
+            clusterNode.close();
+            cluster.clusterNodes.forEach(
+                cn -> cluster.deterministicTaskQueue.scheduleNow(cn.onNode(
+                    new Runnable() {
+                        @Override
+                        public void run() {
+                            cn.transportService.disconnectFromNode(clusterNode.getLocalNode());
+                        }
+
+                        @Override
+                        public String toString() {
+                            return "disconnect from " + clusterNode.getLocalNode() + " after shutdown";
+                        }
+                    })));
+            cluster.clusterNodes.replaceAll(cn -> cn == clusterNode ? cn.restartedNode() : cn);
+        }
+
+        cluster.stabilise();
+    }
+
     public void testCanUpdateClusterStateAfterStabilisation() {
         final Cluster cluster = new Cluster(randomIntBetween(1, 5));
         cluster.runRandomly();
@@ -1525,6 +1568,10 @@ void stabilise(long stabilisationDurationMillis) {
 
             assertTrue(leaderId + " has been bootstrapped", leader.coordinator.isInitialConfigurationSet());
             assertTrue(leaderId + " exists in its last-applied state", leader.getLastAppliedClusterState().getNodes().nodeExists(leaderId));
+            assertThat(leaderId + " has no NO_MASTER_BLOCK",
+                leader.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
+            assertThat(leaderId + " has no STATE_NOT_RECOVERED_BLOCK",
+                leader.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
             assertThat(leaderId + " has applied its state ", leader.getLastAppliedClusterState().getVersion(), isEqualToLeaderVersion);
 
             for (final ClusterNode clusterNode : clusterNodes) {
@@ -1556,6 +1603,8 @@ void stabilise(long stabilisationDurationMillis) {
                         equalTo(leader.getLocalNode()));
                     assertThat(nodeId + " has no NO_MASTER_BLOCK",
                         clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID), equalTo(false));
+                    assertThat(nodeId + " has no STATE_NOT_RECOVERED_BLOCK",
+                        clusterNode.getLastAppliedClusterState().blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK), equalTo(false));
                 } else {
                     assertThat(nodeId + " is not following " + leaderId, clusterNode.coordinator.getMode(), is(CANDIDATE));
                     assertThat(nodeId + " has no master", clusterNode.getLastAppliedClusterState().nodes().getMasterNode(), nullValue());
@@ -1725,7 +1774,8 @@ class MockPersistedState implements PersistedState {
                 } else {
                     nodeEnvironment = null;
                     delegate = new InMemoryPersistedState(0L,
-                        clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L));
+                        ClusterStateUpdaters.addStateNotRecoveredBlock(
+                            clusterState(0L, 0L, localNode, VotingConfiguration.EMPTY_CONFIG, VotingConfiguration.EMPTY_CONFIG, 0L)));
                 }
             } catch (IOException e) {
                 throw new UncheckedIOException("Unable to create MockPersistedState", e);
@@ -1765,8 +1815,9 @@ class MockPersistedState implements PersistedState {
                     clusterState.writeTo(outStream);
                     StreamInput inStream = new NamedWriteableAwareStreamInput(outStream.bytes().streamInput(),
                         new NamedWriteableRegistry(ClusterModule.getNamedWriteables()));
+                    // adapt cluster state to new localNode instance and add blocks
                     delegate = new InMemoryPersistedState(adaptCurrentTerm.apply(oldState.getCurrentTerm()),
-                        ClusterState.readFrom(inStream, newLocalNode)); // adapts it to new localNode instance
+                        ClusterStateUpdaters.addStateNotRecoveredBlock(ClusterState.readFrom(inStream, newLocalNode)));
                 }
             } catch (IOException e) {
                 throw new UncheckedIOException("Unable to create MockPersistedState", e);
@@ -1870,15 +1921,19 @@ protected Optional<DisruptableMockTransport> getDisruptableMockTransport(Transpo
                     transportService));
             final Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators =
                 Collections.singletonList((dn, cs) -> extraJoinValidators.forEach(validator -> validator.accept(dn, cs)));
+            final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
             coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
-                ESAllocationTestCase.createAllocationService(Settings.EMPTY), masterService, this::getPersistedState,
+                allocationService, masterService, this::getPersistedState,
                 Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
             masterService.setClusterStatePublisher(coordinator);
+            final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
+                deterministicTaskQueue.getThreadPool(this::onNode), null, null, coordinator);
 
             logger.trace("starting up [{}]", localNode);
             transportService.start();
             transportService.acceptIncomingRequests();
            coordinator.start();
+            gatewayService.start();
             clusterService.start();
             coordinator.startInitialJoin();
         }
