Skip to content

[Zen2] Implement state recovery #36013

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Dec 4, 2018
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;

import java.util.Map;

class ClusterStateUpdaters {
private static final Logger logger = LogManager.getLogger(ClusterStateUpdaters.class);

static ClusterState setLocalNode(final ClusterState clusterState, DiscoveryNode localNode) {
return ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build())
.build();
}

static ClusterState upgradeAndArchiveUnknownOrInvalidSettings(final ClusterState clusterState,
final ClusterSettings clusterSettings) {
final MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());

metaDataBuilder.persistentSettings(
clusterSettings.archiveUnknownOrInvalidSettings(
clusterSettings.upgradeSettings(metaDataBuilder.persistentSettings()),
e -> logUnknownSetting("persistent", e),
(e, ex) -> logInvalidSetting("persistent", e, ex)));
metaDataBuilder.transientSettings(
clusterSettings.archiveUnknownOrInvalidSettings(
clusterSettings.upgradeSettings(metaDataBuilder.transientSettings()),
e -> logUnknownSetting("transient", e),
(e, ex) -> logInvalidSetting("transient", e, ex)));
return ClusterState.builder(clusterState).metaData(metaDataBuilder).build();
}

private static void logUnknownSetting(final String settingType, final Map.Entry<String, String> e) {
logger.warn("ignoring unknown {} setting: [{}] with value [{}]; archiving", settingType, e.getKey(), e.getValue());
}

private static void logInvalidSetting(final String settingType, final Map.Entry<String, String> e,
final IllegalArgumentException ex) {
logger.warn(() -> new ParameterizedMessage("ignoring invalid {} setting: [{}] with value [{}]; archiving",
settingType, e.getKey(), e.getValue()), ex);
}

static ClusterState recoverClusterBlocks(final ClusterState state) {
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(state.blocks());

if (MetaData.SETTING_READ_ONLY_SETTING.get(state.metaData().settings())) {
blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
}

if (MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.get(state.metaData().settings())) {
blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
}

for (final IndexMetaData indexMetaData : state.metaData()) {
blocks.addBlocks(indexMetaData);
}

return ClusterState.builder(state).blocks(blocks).build();
}

static ClusterState closeBadIndices(final ClusterState clusterState, final IndicesService indicesService) {
final MetaData.Builder builder = MetaData.builder(clusterState.metaData()).removeAllIndices();

for (IndexMetaData metaData : clusterState.metaData()) {
try {
if (metaData.getState() == IndexMetaData.State.OPEN) {
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
indicesService.verifyIndexMetadata(metaData, metaData);
}
} catch (final Exception e) {
final Index electedIndex = metaData.getIndex();
logger.warn(() -> new ParameterizedMessage("recovering index {} failed - recovering as closed", electedIndex), e);
metaData = IndexMetaData.builder(metaData).state(IndexMetaData.State.CLOSE).build();
}
builder.put(metaData, false);
}

return ClusterState.builder(clusterState).metaData(builder).build();
}

static ClusterState updateRoutingTable(final ClusterState state) {
// initialize all index routing tables as empty
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(state.routingTable());
for (final ObjectCursor<IndexMetaData> cursor : state.metaData().indices().values()) {
routingTableBuilder.addAsRecovery(cursor.value);
}
// start with 0 based versions for routing table
routingTableBuilder.version(0);
return ClusterState.builder(state).routingTable(routingTableBuilder.build()).build();
}

static ClusterState removeStateNotRecoveredBlock(final ClusterState state) {
return ClusterState.builder(state)
.blocks(ClusterBlocks.builder()
.blocks(state.blocks()).removeGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK).build())
.build();
}

static ClusterState mixCurrentStateAndRecoveredState(final ClusterState currentState, final ClusterState recoveredState) {
assert currentState.metaData().indices().isEmpty();

final ClusterBlocks.Builder blocks = ClusterBlocks.builder()
.blocks(currentState.blocks())
.blocks(recoveredState.blocks());

final MetaData.Builder metaDataBuilder = MetaData.builder(recoveredState.metaData());
// automatically generate a UID for the metadata if we need to
metaDataBuilder.generateClusterUuidIfNeeded();

for (final IndexMetaData indexMetaData : recoveredState.metaData()) {
metaDataBuilder.put(indexMetaData, false);
}

return ClusterState.builder(currentState)
.blocks(blocks)
.metaData(metaDataBuilder)
.build();
}

}
76 changes: 20 additions & 56 deletions server/src/main/java/org/elasticsearch/gateway/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,18 @@
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.IndicesService;

import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;

public class Gateway {

Expand All @@ -49,34 +47,32 @@ public class Gateway {
private final int minimumMasterNodes;
private final IndicesService indicesService;

public Gateway(Settings settings, ClusterService clusterService,
TransportNodesListGatewayMetaState listGatewayMetaState,
IndicesService indicesService) {
public Gateway(final Settings settings, final ClusterService clusterService,
final TransportNodesListGatewayMetaState listGatewayMetaState,
final IndicesService indicesService) {
this.indicesService = indicesService;
this.clusterService = clusterService;
this.listGatewayMetaState = listGatewayMetaState;
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
}

public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
logger.trace("performing state recovery from {}", Arrays.toString(nodesIds));
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();


int requiredAllocation = Math.max(1, minimumMasterNodes);
final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();

final int requiredAllocation = Math.max(1, minimumMasterNodes);

if (nodesState.hasFailures()) {
for (FailedNodeException failedNodeException : nodesState.failures()) {
for (final FailedNodeException failedNodeException : nodesState.failures()) {
logger.warn("failed to fetch state from node", failedNodeException);
}
}

ObjectFloatHashMap<Index> indices = new ObjectFloatHashMap<>();
final ObjectFloatHashMap<Index> indices = new ObjectFloatHashMap<>();
MetaData electedGlobalState = null;
int found = 0;
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
if (nodeState.metaData() == null) {
continue;
}
Expand All @@ -86,7 +82,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
electedGlobalState = nodeState.metaData();
}
for (ObjectCursor<IndexMetaData> cursor : nodeState.metaData().indices().values()) {
for (final ObjectCursor<IndexMetaData> cursor : nodeState.metaData().indices().values()) {
indices.addTo(cursor.value.getIndex(), 1);
}
}
Expand All @@ -95,20 +91,20 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
return;
}
// update the global state, and clean the indices, we elect them in the next phase
MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState).removeAllIndices();
final MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState).removeAllIndices();

assert !indices.containsKey(null);
final Object[] keys = indices.keys;
for (int i = 0; i < keys.length; i++) {
if (keys[i] != null) {
Index index = (Index) keys[i];
final Index index = (Index) keys[i];
IndexMetaData electedIndexMetaData = null;
int indexMetaDataCount = 0;
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
if (nodeState.metaData() == null) {
continue;
}
IndexMetaData indexMetaData = nodeState.metaData().index(index);
final IndexMetaData indexMetaData = nodeState.metaData().index(index);
if (indexMetaData == null) {
continue;
}
Expand All @@ -123,49 +119,17 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
if (indexMetaDataCount < requiredAllocation) {
logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
} // TODO if this logging statement is correct then we are missing an else here
try {
if (electedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
indicesService.verifyIndexMetadata(electedIndexMetaData, electedIndexMetaData);
}
} catch (Exception e) {
final Index electedIndex = electedIndexMetaData.getIndex();
logger.warn(() -> new ParameterizedMessage("recovering index {} failed - recovering as closed", electedIndex), e);
electedIndexMetaData = IndexMetaData.builder(electedIndexMetaData).state(IndexMetaData.State.CLOSE).build();
}

metaDataBuilder.put(electedIndexMetaData, false);
}
}
}
final ClusterState.Builder builder = upgradeAndArchiveUnknownOrInvalidSettings(metaDataBuilder);
listener.onSuccess(builder.build());
}

ClusterState.Builder upgradeAndArchiveUnknownOrInvalidSettings(MetaData.Builder metaDataBuilder) {
final ClusterSettings clusterSettings = clusterService.getClusterSettings();
metaDataBuilder.persistentSettings(
clusterSettings.archiveUnknownOrInvalidSettings(
clusterSettings.upgradeSettings(metaDataBuilder.persistentSettings()),
e -> logUnknownSetting("persistent", e),
(e, ex) -> logInvalidSetting("persistent", e, ex)));
metaDataBuilder.transientSettings(
clusterSettings.archiveUnknownOrInvalidSettings(
clusterSettings.upgradeSettings(metaDataBuilder.transientSettings()),
e -> logUnknownSetting("transient", e),
(e, ex) -> logInvalidSetting("transient", e, ex)));
ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName());
builder.metaData(metaDataBuilder);
return builder;
}

private void logUnknownSetting(String settingType, Map.Entry<String, String> e) {
logger.warn("ignoring unknown {} setting: [{}] with value [{}]; archiving", settingType, e.getKey(), e.getValue());
}
ClusterState recoveredState = Function.<ClusterState>identity()
.andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
.andThen(state -> ClusterStateUpdaters.closeBadIndices(state, indicesService))
.apply(ClusterState.builder(clusterService.getClusterName()).metaData(metaDataBuilder).build());

private void logInvalidSetting(String settingType, Map.Entry<String, String> e, IllegalArgumentException ex) {
logger.warn(() -> new ParameterizedMessage("ignoring invalid {} setting: [{}] with value [{}]; archiving",
settingType, e.getKey(), e.getValue()), ex);
listener.onSuccess(recoveredState);
}

public interface GatewayStateRecoveredListener {
Expand Down
Loading