Skip to content

[Zen2] Hide not recovered state #36224

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Dec 5, 2018
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.elasticsearch.script.MockScriptPlugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;

import java.util.Arrays;
import java.util.Collection;
Expand All @@ -46,13 +45,6 @@
@ESIntegTestCase.ClusterScope(numDataNodes = 0, numClientNodes = 0, scope = ESIntegTestCase.Scope.TEST)
public class IngestRestartIT extends ESIntegTestCase {

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal))
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // no state persistence yet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also remove import?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.build();
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(IngestCommonPlugin.class, CustomScriptPlugin.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@

import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
import static org.elasticsearch.discovery.DiscoverySettings.NO_MASTER_BLOCK_WRITES;
import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

public class Coordinator extends AbstractLifecycleComponent implements Discovery {
Expand Down Expand Up @@ -209,7 +210,7 @@ private void handleApplyCommit(ApplyCommitRequest applyCommitRequest, ActionList
logger.trace("handleApplyCommit: applying commit {}", applyCommitRequest);

coordinationState.get().handleCommit(applyCommitRequest);
final ClusterState committedState = coordinationState.get().getLastAcceptedState();
final ClusterState committedState = hideStateIfNotRecovered(coordinationState.get().getLastAcceptedState());
applierState = mode == Mode.CANDIDATE ? clusterStateWithNoMasterBlock(committedState) : committedState;
if (applyCommitRequest.getSourceNode().equals(getLocalNode())) {
// master node applies the committed state at the end of the publication process, not here.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@

import java.util.Map;

class ClusterStateUpdaters {
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

public class ClusterStateUpdaters {
private static final Logger logger = LogManager.getLogger(ClusterStateUpdaters.class);

static ClusterState setLocalNode(final ClusterState clusterState, DiscoveryNode localNode) {
Expand Down Expand Up @@ -149,4 +151,25 @@ static ClusterState mixCurrentStateAndRecoveredState(final ClusterState currentS
.build();
}

/**
 * Returns the state that should be exposed while the given cluster state still carries
 * {@code STATE_NOT_RECOVERED_BLOCK}.
 * <p>
 * If the block is absent the very same {@code state} instance is returned. Otherwise a copy is built in which
 * <ul>
 *     <li>the metadata is rebuilt from an empty builder, carrying over only the cluster UUID and the
 *     coordination metadata of the original state;</li>
 *     <li>the global read-only and read-only-allow-delete blocks are removed;</li>
 *     <li>the index blocks of every index present in the original metadata are removed.</li>
 * </ul>
 * All remaining blocks (including {@code STATE_NOT_RECOVERED_BLOCK} itself) are kept.
 *
 * @param state the cluster state to inspect
 * @return {@code state} itself when it is not blocked by {@code STATE_NOT_RECOVERED_BLOCK}, otherwise the reduced copy
 */
public static ClusterState hideStateIfNotRecovered(ClusterState state) {
    // Nothing to hide when the not-recovered block is absent: hand back the original instance.
    if (state.blocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
        return state;
    }

    // Start from the existing blocks and drop the ones derived from the metadata that is about to be hidden.
    final ClusterBlocks.Builder visibleBlocks = ClusterBlocks.builder().blocks(state.blocks());
    visibleBlocks.removeGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
    visibleBlocks.removeGlobalBlock(MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
    state.metaData().forEach(index -> visibleBlocks.removeIndexBlocks(index.getIndex().getName()));

    // Only the cluster UUID and the coordination metadata survive in the exposed metadata.
    final MetaData.Builder strippedMetaData = MetaData.builder();
    strippedMetaData.clusterUUID(state.metaData().clusterUUID());
    strippedMetaData.coordinationMetaData(state.metaData().coordinationMetaData());
    final MetaData exposedMetaData = strippedMetaData.build();

    final ClusterState.Builder hiddenState = ClusterState.builder(state);
    hiddenState.metaData(exposedMetaData);
    hiddenState.blocks(visibleBlocks.build());
    return hiddenState.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.hamcrest.Matchers;

import java.io.IOException;
Expand Down Expand Up @@ -212,15 +211,14 @@ public void testAllocatedProcessors() throws Exception {
}

public void testClusterStatusWhenStateNotRecovered() throws Exception {
internalCluster().startMasterOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2)
.put(TestZenDiscovery.USE_ZEN2.getKey(), false).build());
internalCluster().startMasterOnlyNode(Settings.builder().put("gateway.recover_after_nodes", 2).build());
ClusterStatsResponse response = client().admin().cluster().prepareClusterStats().get();
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));

if (randomBoolean()) {
internalCluster().startMasterOnlyNode(Settings.builder().put(TestZenDiscovery.USE_ZEN2.getKey(), false).build());
internalCluster().startMasterOnlyNode();
} else {
internalCluster().startDataOnlyNode(Settings.builder().put(TestZenDiscovery.USE_ZEN2.getKey(), false).build());
internalCluster().startDataOnlyNode();
}
// wait for the cluster status to settle
ensureGreen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.discovery.TestZenDiscovery;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -67,14 +66,6 @@
@ClusterScope(scope = Scope.TEST)
public class CreateIndexIT extends ESIntegTestCase {

/**
 * Builds the per-node settings: everything from the superclass plus {@code USE_ZEN2} set to {@code false}.
 *
 * @param nodeOrdinal ordinal of the node, forwarded to the superclass
 * @return the combined settings
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    final Settings.Builder settings = Settings.builder();
    settings.put(super.nodeSettings(nodeOrdinal));
    // testIndexWithUnknownSetting and testRestartIndexCreationAfterFullClusterRestart fail with Zen2
    settings.put(TestZenDiscovery.USE_ZEN2.getKey(), false);
    return settings.build();
}

public void testCreationDateGivenFails() {
try {
prepareCreate("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_CREATION_DATE, 4L)).get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,17 @@ public boolean clearData(String nodeName) {

public void testNotWaitForQuorumCopies() throws Exception {
logger.info("--> starting 3 nodes");
internalCluster().startNodes(3, Settings.builder().put(TestZenDiscovery.USE_ZEN2.getKey(), false).build()); // needs state recovery
List<String> nodes = internalCluster().startNodes(3);
logger.info("--> creating index with 1 primary and 2 replicas");
assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
.put("index.number_of_shards", randomIntBetween(1, 3)).put("index.number_of_replicas", 2)).get());
ensureGreen("test");
client().prepareIndex("test", "type1").setSource(jsonBuilder()
.startObject().field("field", "value1").endObject()).get();
logger.info("--> removing 2 nodes from cluster");
internalCluster().stopRandomDataNode();
internalCluster().stopRandomDataNode();
internalCluster().fullRestart();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(1), nodes.get(2)));
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodes.get(1), nodes.get(2)));
internalCluster().restartRandomDataNode();
logger.info("--> checking that index still gets allocated with only 1 shard copy being available");
ensureYellow("test");
assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 1L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.coordination.CoordinationMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -36,6 +37,7 @@
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.function.BiConsumer;
Expand All @@ -45,6 +47,7 @@

import static org.elasticsearch.cluster.metadata.MetaData.CLUSTER_READ_ONLY_BLOCK;
import static org.elasticsearch.gateway.ClusterStateUpdaters.closeBadIndices;
import static org.elasticsearch.gateway.ClusterStateUpdaters.hideStateIfNotRecovered;
import static org.elasticsearch.gateway.ClusterStateUpdaters.mixCurrentStateAndRecoveredState;
import static org.elasticsearch.gateway.ClusterStateUpdaters.recoverClusterBlocks;
import static org.elasticsearch.gateway.ClusterStateUpdaters.removeStateNotRecoveredBlock;
Expand Down Expand Up @@ -271,4 +274,55 @@ public void testSetLocalNode() {
assertThat(updatedState.nodes().getSize(), is(1));
}

/**
 * A state without {@code STATE_NOT_RECOVERED_BLOCK} must pass through {@code hideStateIfNotRecovered}
 * with its metadata (persistent settings and the "test" index) intact.
 */
public void testDoNotHideStateIfRecovered() {
    final IndexMetaData testIndex = createIndexMetaData("test", Settings.EMPTY);
    final Settings persistentSettings = Settings.builder().put("test", "test").build();

    final MetaData.Builder metaDataBuilder = MetaData.builder();
    metaDataBuilder.persistentSettings(persistentSettings);
    metaDataBuilder.put(testIndex, false);

    // No blocks are added, so the state counts as recovered.
    final ClusterState recoveredState = ClusterState.builder(ClusterState.EMPTY_STATE)
        .metaData(metaDataBuilder.build())
        .build();

    final ClusterState result = hideStateIfNotRecovered(recoveredState);
    assertMetaDataEquals(recoveredState, result);
}

/**
 * A state that still carries {@code STATE_NOT_RECOVERED_BLOCK} must be reduced by {@code hideStateIfNotRecovered}:
 * only the cluster UUID and coordination metadata remain, no indices are visible, and the read-only blocks
 * (global and per index) are gone while the not-recovered block itself is kept.
 */
public void testHideStateIfNotRecovered() {
    // An index and cluster settings that all translate into read-only blocks once blocks are recovered.
    final Settings readOnlyIndexSettings = Settings.builder()
        .put(IndexMetaData.INDEX_READ_ONLY_SETTING.getKey(), true)
        .build();
    final IndexMetaData readOnlyIndex = createIndexMetaData("test", readOnlyIndexSettings);
    final String uuid = UUIDs.randomBase64UUID();

    // Random coordination metadata; the order of the random draws matches the constructor argument order.
    final long term = randomLong();
    final CoordinationMetaData.VotingConfiguration lastCommittedConfig =
        new CoordinationMetaData.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(5, 5, false)));
    final CoordinationMetaData.VotingConfiguration lastAcceptedConfig =
        new CoordinationMetaData.VotingConfiguration(Sets.newHashSet(generateRandomStringArray(5, 5, false)));
    final Set<CoordinationMetaData.VotingTombstone> tombstones = Arrays.stream(generateRandomStringArray(5, 5, false))
        .map(id -> new CoordinationMetaData.VotingTombstone(id, id))
        .collect(Collectors.toSet());
    final CoordinationMetaData coordination =
        new CoordinationMetaData(term, lastCommittedConfig, lastAcceptedConfig, tombstones);

    final Settings persistentReadOnly = Settings.builder()
        .put(MetaData.SETTING_READ_ONLY_SETTING.getKey(), true)
        .build();
    final Settings transientReadOnlyAllowDelete = Settings.builder()
        .put(MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.getKey(), true)
        .build();
    final MetaData fullMetaData = MetaData.builder()
        .persistentSettings(persistentReadOnly)
        .transientSettings(transientReadOnlyAllowDelete)
        .clusterUUID(uuid)
        .coordinationMetaData(coordination)
        .put(readOnlyIndex, false)
        .build();

    final ClusterState notRecoveredState = ClusterState.builder(ClusterState.EMPTY_STATE)
        .metaData(fullMetaData)
        .blocks(ClusterBlocks.builder().addGlobalBlock(STATE_NOT_RECOVERED_BLOCK))
        .build();
    final DiscoveryNode masterNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
        Sets.newHashSet(DiscoveryNode.Role.MASTER), Version.CURRENT);

    // Set the local node first, then recover the cluster blocks, before hiding the state.
    final ClusterState withLocalNode = setLocalNode(notRecoveredState, masterNode);
    final ClusterState withRecoveredBlocks = recoverClusterBlocks(withLocalNode);

    final ClusterState hidden = hideStateIfNotRecovered(withRecoveredBlocks);

    // Global metadata is reduced to coordination metadata + cluster UUID, and no index is exposed.
    final MetaData expectedGlobalState = MetaData.builder()
        .coordinationMetaData(coordination)
        .clusterUUID(uuid)
        .build();
    assertTrue(MetaData.isGlobalStateEquals(hidden.metaData(), expectedGlobalState));
    assertThat(hidden.metaData().indices().size(), is(0));

    // The not-recovered block stays; every read-only block is removed.
    final ClusterBlocks hiddenBlocks = hidden.blocks();
    assertTrue(hiddenBlocks.hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK));
    assertFalse(hiddenBlocks.hasGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK));
    assertFalse(hiddenBlocks.hasGlobalBlock(MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK));
    assertFalse(hiddenBlocks.hasIndexBlock(readOnlyIndex.getIndex().getName(), IndexMetaData.INDEX_READ_ONLY_BLOCK));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,6 @@ public class GatewayIndexStateIT extends ESIntegTestCase {

private final Logger logger = LogManager.getLogger(GatewayIndexStateIT.class);

/**
 * Builds the per-node settings: everything from the superclass plus {@code USE_ZEN2} set to {@code false}.
 *
 * @param nodeOrdinal ordinal of the node, forwarded to the superclass
 * @return the combined settings
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    final Settings.Builder settings = Settings.builder();
    settings.put(super.nodeSettings(nodeOrdinal));
    // testRecoverBrokenIndexMetadata, testRecoverMissingAnalyzer, testDanglingIndices and testArchiveBrokenClusterSettings fail
    settings.put(TestZenDiscovery.USE_ZEN2.getKey(), false);
    return settings.build();
}

public void testMappingMetaDataParsed() throws Exception {
logger.info("--> starting 1 nodes");
internalCluster().startNode();
Expand Down Expand Up @@ -286,7 +278,9 @@ public void testTwoNodesSingleDoc() throws Exception {
public void testDanglingIndices() throws Exception {
logger.info("--> starting two nodes");

final String node_1 = internalCluster().startNodes(2).get(0);
final String node_1 = internalCluster().startNodes(2,
//TODO fails with Zen2
Settings.builder().put(TestZenDiscovery.USE_ZEN2.getKey(), false).build()).get(0);

logger.info("--> indexing a simple document");
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").setRefreshPolicy(IMMEDIATE).get();
Expand Down Expand Up @@ -339,7 +333,9 @@ public void testIndexDeletionWhenNodeRejoins() throws Exception {
final List<String> nodes;
logger.info("--> starting a cluster with " + numNodes + " nodes");
nodes = internalCluster().startNodes(numNodes,
Settings.builder().put(IndexGraveyard.SETTING_MAX_TOMBSTONES.getKey(), randomIntBetween(10, 100)).build());
Settings.builder().put(IndexGraveyard.SETTING_MAX_TOMBSTONES.getKey(), randomIntBetween(10, 100))
//TODO fails with Zen2
.put(TestZenDiscovery.USE_ZEN2.getKey(), false).build());
logger.info("--> create an index");
createIndex(indexName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster.RestartCallback;
import org.elasticsearch.test.discovery.TestZenDiscovery;

import java.util.concurrent.TimeUnit;

Expand All @@ -40,13 +39,6 @@
@ClusterScope(numDataNodes = 0, scope = Scope.TEST)
public class QuorumGatewayIT extends ESIntegTestCase {

/**
 * Builds the per-node settings: everything from the superclass plus {@code USE_ZEN2} set to {@code false}.
 *
 * @param nodeOrdinal ordinal of the node, forwarded to the superclass
 * @return the combined settings
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    final Settings.Builder settings = Settings.builder();
    settings.put(super.nodeSettings(nodeOrdinal));
    // no state persistence yet
    settings.put(TestZenDiscovery.USE_ZEN2.getKey(), false);
    return settings.build();
}

@Override
protected int numberOfReplicas() {
return 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.node.Node;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.discovery.TestZenDiscovery;

import java.util.Set;

Expand All @@ -40,13 +40,6 @@
public class RecoverAfterNodesIT extends ESIntegTestCase {
private static final TimeValue BLOCK_WAIT_TIMEOUT = TimeValue.timeValueSeconds(10);

/**
 * Builds the per-node settings: everything from the superclass plus {@code USE_ZEN2} set to {@code false}.
 *
 * @param nodeOrdinal ordinal of the node, forwarded to the superclass
 * @return the combined settings
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    final Settings.Builder settings = Settings.builder();
    settings.put(super.nodeSettings(nodeOrdinal));
    // recover_after not implemented in Zen2 yet
    settings.put(TestZenDiscovery.USE_ZEN2.getKey(), false);
    return settings.build();
}

public Set<ClusterBlock> waitForNoBlocksOnNode(TimeValue timeout, Client nodeClient) throws InterruptedException {
long start = System.currentTimeMillis();
Set<ClusterBlock> blocks;
Expand All @@ -67,7 +60,9 @@ public Client startNode(Settings.Builder settings, int minMasterNodes) {

public void testRecoverAfterNodes() throws Exception {
logger.info("--> start node (1)");
Client clientNode1 = startNode(Settings.builder().put("gateway.recover_after_nodes", 3), 1);
Client clientNode1 = startNode(Settings.builder()
.put("gateway.recover_after_nodes", 3)
.put(ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1), 1);
assertThat(clientNode1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
Expand All @@ -94,7 +89,8 @@ public void testRecoverAfterMasterNodes() throws Exception {
logger.info("--> start master_node (1)");
Client master1 = startNode(Settings.builder()
.put("gateway.recover_after_master_nodes", 2).put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_MASTER_SETTING.getKey(), true), 1);
.put(Node.NODE_MASTER_SETTING.getKey(), true)
.put(ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1), 1);
assertThat(master1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
Expand Down Expand Up @@ -140,7 +136,8 @@ public void testRecoverAfterDataNodes() throws Exception {
Client master1 = startNode(Settings.builder()
.put("gateway.recover_after_data_nodes", 2)
.put(Node.NODE_DATA_SETTING.getKey(), false)
.put(Node.NODE_MASTER_SETTING.getKey(), true), 1);
.put(Node.NODE_MASTER_SETTING.getKey(), true)
.put(ClusterBootstrapService.INITIAL_MASTER_NODE_COUNT_SETTING.getKey(), 1), 1);
assertThat(master1.admin().cluster().prepareState().setLocal(true).execute().actionGet()
.getState().blocks().global(ClusterBlockLevel.METADATA_WRITE),
hasItem(GatewayService.STATE_NOT_RECOVERED_BLOCK));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.test.MockIndexEventListener;
import org.elasticsearch.test.discovery.TestZenDiscovery;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -117,7 +116,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
// speed up recoveries
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 5)
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 5)
.put(TestZenDiscovery.USE_ZEN2.getKey(), false) // no state persistence yet
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.discovery.TestZenDiscovery;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.emptyIterable;
Expand All @@ -66,14 +65,6 @@

public class FlushIT extends ESIntegTestCase {

/**
 * Builds the per-node settings: everything from the superclass plus {@code USE_ZEN2} set to {@code false}.
 *
 * @param nodeOrdinal ordinal of the node, forwarded to the superclass
 * @return the combined settings
 */
@Override
protected Settings nodeSettings(int nodeOrdinal) {
    final Settings.Builder settings = Settings.builder();
    settings.put(super.nodeSettings(nodeOrdinal));
    // uses fullClusterRestart
    settings.put(TestZenDiscovery.USE_ZEN2.getKey(), false);
    return settings.build();
}

public void testWaitIfOngoing() throws InterruptedException {
createIndex("test");
ensureGreen("test");
Expand Down
Loading