Skip to content

Commit 35e3d77

Browse files
authored
[Zen2] Implement state recovery (#36013)
This commit implements proper metadata recovery for Zen2. GatewayService is responsible for the recovery. In Zen1, GatewayService creates an instance of Gateway, which is used to reach out to the other cluster nodes, fetch their state and calculate the most up-to-date state based on versions. After that, Gateway upgrades and archives the ClusterSettings and closes bad indices. The recovered state is then passed to GatewayService.GatewayRecoveryListener, which mixes the current state with the recovered state, removes the state-not-recovered block, creates the routing table and performs a reroute. In Zen2 we should perform the same kind of logic on cluster startup, except for mixing the state (because there is nothing to mix) and opening the routing table. This commit refactors all `ClusterUpdate` functions out into a separate class, `ClusterStateUpdaters`, which is used by `Gateway` and `GatewayService` in the case of Zen1, and by `GatewayMetaState` and `GatewayService` in the case of Zen2. This commit also switches all integration tests that already use Zen2 from InMemoryPersistedState to GatewayMetaState.
1 parent 94010d3 commit 35e3d77

File tree

11 files changed

+572
-339
lines changed

11 files changed

+572
-339
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.gateway;
21+
22+
import com.carrotsearch.hppc.cursors.ObjectCursor;
23+
import org.apache.logging.log4j.LogManager;
24+
import org.apache.logging.log4j.Logger;
25+
import org.apache.logging.log4j.message.ParameterizedMessage;
26+
import org.elasticsearch.cluster.ClusterState;
27+
import org.elasticsearch.cluster.block.ClusterBlocks;
28+
import org.elasticsearch.cluster.metadata.IndexMetaData;
29+
import org.elasticsearch.cluster.metadata.MetaData;
30+
import org.elasticsearch.cluster.node.DiscoveryNode;
31+
import org.elasticsearch.cluster.node.DiscoveryNodes;
32+
import org.elasticsearch.cluster.routing.RoutingTable;
33+
import org.elasticsearch.common.settings.ClusterSettings;
34+
import org.elasticsearch.index.Index;
35+
import org.elasticsearch.indices.IndicesService;
36+
37+
import java.util.Map;
38+
39+
/**
 * Package-private collection of static helpers that each take a {@link ClusterState}
 * and return a {@link ClusterState} produced through {@code ClusterState.builder(...)}.
 * The class holds no fields other than its logger.
 */
class ClusterStateUpdaters {
40+
private static final Logger logger = LogManager.getLogger(ClusterStateUpdaters.class);
41+
42+
/**
 * Returns a state built from {@code clusterState} whose node set is replaced by a fresh
 * {@link DiscoveryNodes} containing only {@code localNode}, which is also marked as the local node.
 *
 * @param clusterState the state to start from
 * @param localNode    the node to install as the sole (and local) node
 * @return the state with the replaced node set
 */
static ClusterState setLocalNode(final ClusterState clusterState, DiscoveryNode localNode) {
43+
return ClusterState.builder(clusterState)
44+
.nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build())
45+
.build();
46+
}
47+
48+
/**
 * Runs the persistent and the transient settings of the state's metadata through
 * {@code clusterSettings.upgradeSettings(...)} followed by
 * {@code clusterSettings.archiveUnknownOrInvalidSettings(...)}, logging a warning for every
 * unknown or invalid setting reported through the callbacks.
 *
 * @param clusterState    the state whose metadata settings are processed
 * @param clusterSettings the settings registry used for upgrading and archiving
 * @return the state carrying the metadata with the processed settings
 */
static ClusterState upgradeAndArchiveUnknownOrInvalidSettings(final ClusterState clusterState,
49+
final ClusterSettings clusterSettings) {
50+
final MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());
51+
52+
// persistent settings: upgrade first, then archive whatever is unknown or invalid
metaDataBuilder.persistentSettings(
53+
clusterSettings.archiveUnknownOrInvalidSettings(
54+
clusterSettings.upgradeSettings(metaDataBuilder.persistentSettings()),
55+
e -> logUnknownSetting("persistent", e),
56+
(e, ex) -> logInvalidSetting("persistent", e, ex)));
57+
// transient settings: same treatment as the persistent ones
metaDataBuilder.transientSettings(
58+
clusterSettings.archiveUnknownOrInvalidSettings(
59+
clusterSettings.upgradeSettings(metaDataBuilder.transientSettings()),
60+
e -> logUnknownSetting("transient", e),
61+
(e, ex) -> logInvalidSetting("transient", e, ex)));
62+
return ClusterState.builder(clusterState).metaData(metaDataBuilder).build();
63+
}
64+
65+
/**
 * Logs a warning that an unknown setting is being ignored and archived.
 *
 * @param settingType label used in the log message ("persistent" or "transient" at the call sites here)
 * @param e           the setting's key/value entry
 */
private static void logUnknownSetting(final String settingType, final Map.Entry<String, String> e) {
66+
logger.warn("ignoring unknown {} setting: [{}] with value [{}]; archiving", settingType, e.getKey(), e.getValue());
67+
}
68+
69+
/**
 * Logs a warning, including the causing exception, that an invalid setting is being ignored and archived.
 *
 * @param settingType label used in the log message ("persistent" or "transient" at the call sites here)
 * @param e           the setting's key/value entry
 * @param ex          the exception reported for the setting
 */
private static void logInvalidSetting(final String settingType, final Map.Entry<String, String> e,
70+
final IllegalArgumentException ex) {
71+
logger.warn(() -> new ParameterizedMessage("ignoring invalid {} setting: [{}] with value [{}]; archiving",
72+
settingType, e.getKey(), e.getValue()), ex);
73+
}
74+
75+
/**
 * Rebuilds the cluster blocks from the state's metadata: starting from the blocks already present,
 * adds the global read-only and read-only-allow-delete blocks when the corresponding metadata
 * settings are enabled, and adds the blocks of every index found in the metadata.
 *
 * @param state the state whose blocks are recomputed
 * @return the state with the resulting blocks
 */
static ClusterState recoverClusterBlocks(final ClusterState state) {
76+
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(state.blocks());
77+
78+
if (MetaData.SETTING_READ_ONLY_SETTING.get(state.metaData().settings())) {
79+
blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_BLOCK);
80+
}
81+
82+
if (MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING.get(state.metaData().settings())) {
83+
blocks.addGlobalBlock(MetaData.CLUSTER_READ_ONLY_ALLOW_DELETE_BLOCK);
84+
}
85+
86+
for (final IndexMetaData indexMetaData : state.metaData()) {
87+
blocks.addBlocks(indexMetaData);
88+
}
89+
90+
return ClusterState.builder(state).blocks(blocks).build();
91+
}
92+
93+
/**
 * Verifies every index that is in state OPEN via {@code indicesService.verifyIndexMetadata(...)}.
 * An index whose verification throws is logged and re-added with state CLOSE; all other indices
 * are re-added unchanged.
 *
 * @param clusterState   the state whose indices are checked
 * @param indicesService the service used to verify index metadata
 * @return the state with metadata in which failing indices are closed
 */
static ClusterState closeBadIndices(final ClusterState clusterState, final IndicesService indicesService) {
94+
// start from the existing metadata with all indices removed; they are put back one by one below
final MetaData.Builder builder = MetaData.builder(clusterState.metaData()).removeAllIndices();
95+
96+
for (IndexMetaData metaData : clusterState.metaData()) {
97+
try {
98+
if (metaData.getState() == IndexMetaData.State.OPEN) {
99+
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
100+
indicesService.verifyIndexMetadata(metaData, metaData);
101+
}
102+
} catch (final Exception e) {
103+
final Index electedIndex = metaData.getIndex();
104+
logger.warn(() -> new ParameterizedMessage("recovering index {} failed - recovering as closed", electedIndex), e);
105+
// replace the loop variable with a CLOSE copy so the put below stores the closed index
metaData = IndexMetaData.builder(metaData).state(IndexMetaData.State.CLOSE).build();
106+
}
107+
// NOTE(review): the boolean argument is presumably "incrementVersion" - confirm against MetaData.Builder#put
builder.put(metaData, false);
108+
}
109+
110+
return ClusterState.builder(clusterState).metaData(builder).build();
111+
}
112+
113+
/**
 * Adds every index of the state's metadata to the routing table via {@code addAsRecovery}
 * and sets the routing table version to 0.
 *
 * @param state the state whose routing table is (re)built
 * @return the state with the resulting routing table
 */
static ClusterState updateRoutingTable(final ClusterState state) {
114+
// initialize all index routing tables as empty
115+
final RoutingTable.Builder routingTableBuilder = RoutingTable.builder(state.routingTable());
116+
for (final ObjectCursor<IndexMetaData> cursor : state.metaData().indices().values()) {
117+
routingTableBuilder.addAsRecovery(cursor.value);
118+
}
119+
// start with 0 based versions for routing table
120+
routingTableBuilder.version(0);
121+
return ClusterState.builder(state).routingTable(routingTableBuilder.build()).build();
122+
}
123+
124+
/**
 * Returns a state whose blocks equal those of {@code state} with the global
 * {@code GatewayService.STATE_NOT_RECOVERED_BLOCK} removed.
 *
 * @param state the state to remove the block from
 * @return the state without the state-not-recovered block
 */
static ClusterState removeStateNotRecoveredBlock(final ClusterState state) {
125+
return ClusterState.builder(state)
126+
.blocks(ClusterBlocks.builder()
127+
.blocks(state.blocks()).removeGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK).build())
128+
.build();
129+
}
130+
131+
/**
 * Combines the current state with a recovered state: the result is built on top of
 * {@code currentState}, carries the blocks of both states, and uses the metadata of
 * {@code recoveredState} (with a cluster UUID generated if needed).
 * Asserts that {@code currentState} contains no indices.
 *
 * @param currentState   the state to build the result on; expected to have no indices
 * @param recoveredState the state supplying the metadata and additional blocks
 * @return the combined state
 */
static ClusterState mixCurrentStateAndRecoveredState(final ClusterState currentState, final ClusterState recoveredState) {
132+
assert currentState.metaData().indices().isEmpty();
133+
134+
final ClusterBlocks.Builder blocks = ClusterBlocks.builder()
135+
.blocks(currentState.blocks())
136+
.blocks(recoveredState.blocks());
137+
138+
final MetaData.Builder metaDataBuilder = MetaData.builder(recoveredState.metaData());
139+
// automatically generate a UID for the metadata if we need to
140+
metaDataBuilder.generateClusterUuidIfNeeded();
141+
142+
// put every recovered index into the builder (which was itself created from the recovered metadata)
for (final IndexMetaData indexMetaData : recoveredState.metaData()) {
143+
metaDataBuilder.put(indexMetaData, false);
144+
}
145+
146+
return ClusterState.builder(currentState)
147+
.blocks(blocks)
148+
.metaData(metaDataBuilder)
149+
.build();
150+
}
151+
152+
}

server/src/main/java/org/elasticsearch/gateway/Gateway.java

+20-56
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,18 @@
2323
import com.carrotsearch.hppc.cursors.ObjectCursor;
2424
import org.apache.logging.log4j.LogManager;
2525
import org.apache.logging.log4j.Logger;
26-
import org.apache.logging.log4j.message.ParameterizedMessage;
2726
import org.elasticsearch.action.FailedNodeException;
2827
import org.elasticsearch.cluster.ClusterState;
2928
import org.elasticsearch.cluster.metadata.IndexMetaData;
3029
import org.elasticsearch.cluster.metadata.MetaData;
3130
import org.elasticsearch.cluster.service.ClusterService;
32-
import org.elasticsearch.common.settings.ClusterSettings;
3331
import org.elasticsearch.common.settings.Settings;
3432
import org.elasticsearch.discovery.zen.ElectMasterService;
3533
import org.elasticsearch.index.Index;
3634
import org.elasticsearch.indices.IndicesService;
3735

3836
import java.util.Arrays;
39-
import java.util.Map;
37+
import java.util.function.Function;
4038

4139
public class Gateway {
4240

@@ -49,34 +47,32 @@ public class Gateway {
4947
private final int minimumMasterNodes;
5048
private final IndicesService indicesService;
5149

52-
public Gateway(Settings settings, ClusterService clusterService,
53-
TransportNodesListGatewayMetaState listGatewayMetaState,
54-
IndicesService indicesService) {
50+
public Gateway(final Settings settings, final ClusterService clusterService,
51+
final TransportNodesListGatewayMetaState listGatewayMetaState,
52+
final IndicesService indicesService) {
5553
this.indicesService = indicesService;
5654
this.clusterService = clusterService;
5755
this.listGatewayMetaState = listGatewayMetaState;
5856
this.minimumMasterNodes = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
5957
}
6058

6159
public void performStateRecovery(final GatewayStateRecoveredListener listener) throws GatewayException {
62-
String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
60+
final String[] nodesIds = clusterService.state().nodes().getMasterNodes().keys().toArray(String.class);
6361
logger.trace("performing state recovery from {}", Arrays.toString(nodesIds));
64-
TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
65-
66-
67-
int requiredAllocation = Math.max(1, minimumMasterNodes);
62+
final TransportNodesListGatewayMetaState.NodesGatewayMetaState nodesState = listGatewayMetaState.list(nodesIds, null).actionGet();
6863

64+
final int requiredAllocation = Math.max(1, minimumMasterNodes);
6965

7066
if (nodesState.hasFailures()) {
71-
for (FailedNodeException failedNodeException : nodesState.failures()) {
67+
for (final FailedNodeException failedNodeException : nodesState.failures()) {
7268
logger.warn("failed to fetch state from node", failedNodeException);
7369
}
7470
}
7571

76-
ObjectFloatHashMap<Index> indices = new ObjectFloatHashMap<>();
72+
final ObjectFloatHashMap<Index> indices = new ObjectFloatHashMap<>();
7773
MetaData electedGlobalState = null;
7874
int found = 0;
79-
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
75+
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
8076
if (nodeState.metaData() == null) {
8177
continue;
8278
}
@@ -86,7 +82,7 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
8682
} else if (nodeState.metaData().version() > electedGlobalState.version()) {
8783
electedGlobalState = nodeState.metaData();
8884
}
89-
for (ObjectCursor<IndexMetaData> cursor : nodeState.metaData().indices().values()) {
85+
for (final ObjectCursor<IndexMetaData> cursor : nodeState.metaData().indices().values()) {
9086
indices.addTo(cursor.value.getIndex(), 1);
9187
}
9288
}
@@ -95,20 +91,20 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
9591
return;
9692
}
9793
// update the global state, and clean the indices, we elect them in the next phase
98-
MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState).removeAllIndices();
94+
final MetaData.Builder metaDataBuilder = MetaData.builder(electedGlobalState).removeAllIndices();
9995

10096
assert !indices.containsKey(null);
10197
final Object[] keys = indices.keys;
10298
for (int i = 0; i < keys.length; i++) {
10399
if (keys[i] != null) {
104-
Index index = (Index) keys[i];
100+
final Index index = (Index) keys[i];
105101
IndexMetaData electedIndexMetaData = null;
106102
int indexMetaDataCount = 0;
107-
for (TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
103+
for (final TransportNodesListGatewayMetaState.NodeGatewayMetaState nodeState : nodesState.getNodes()) {
108104
if (nodeState.metaData() == null) {
109105
continue;
110106
}
111-
IndexMetaData indexMetaData = nodeState.metaData().index(index);
107+
final IndexMetaData indexMetaData = nodeState.metaData().index(index);
112108
if (indexMetaData == null) {
113109
continue;
114110
}
@@ -123,49 +119,17 @@ public void performStateRecovery(final GatewayStateRecoveredListener listener) t
123119
if (indexMetaDataCount < requiredAllocation) {
124120
logger.debug("[{}] found [{}], required [{}], not adding", index, indexMetaDataCount, requiredAllocation);
125121
} // TODO if this logging statement is correct then we are missing an else here
126-
try {
127-
if (electedIndexMetaData.getState() == IndexMetaData.State.OPEN) {
128-
// verify that we can actually create this index - if not we recover it as closed with lots of warn logs
129-
indicesService.verifyIndexMetadata(electedIndexMetaData, electedIndexMetaData);
130-
}
131-
} catch (Exception e) {
132-
final Index electedIndex = electedIndexMetaData.getIndex();
133-
logger.warn(() -> new ParameterizedMessage("recovering index {} failed - recovering as closed", electedIndex), e);
134-
electedIndexMetaData = IndexMetaData.builder(electedIndexMetaData).state(IndexMetaData.State.CLOSE).build();
135-
}
136122

137123
metaDataBuilder.put(electedIndexMetaData, false);
138124
}
139125
}
140126
}
141-
final ClusterState.Builder builder = upgradeAndArchiveUnknownOrInvalidSettings(metaDataBuilder);
142-
listener.onSuccess(builder.build());
143-
}
144-
145-
ClusterState.Builder upgradeAndArchiveUnknownOrInvalidSettings(MetaData.Builder metaDataBuilder) {
146-
final ClusterSettings clusterSettings = clusterService.getClusterSettings();
147-
metaDataBuilder.persistentSettings(
148-
clusterSettings.archiveUnknownOrInvalidSettings(
149-
clusterSettings.upgradeSettings(metaDataBuilder.persistentSettings()),
150-
e -> logUnknownSetting("persistent", e),
151-
(e, ex) -> logInvalidSetting("persistent", e, ex)));
152-
metaDataBuilder.transientSettings(
153-
clusterSettings.archiveUnknownOrInvalidSettings(
154-
clusterSettings.upgradeSettings(metaDataBuilder.transientSettings()),
155-
e -> logUnknownSetting("transient", e),
156-
(e, ex) -> logInvalidSetting("transient", e, ex)));
157-
ClusterState.Builder builder = ClusterState.builder(clusterService.getClusterName());
158-
builder.metaData(metaDataBuilder);
159-
return builder;
160-
}
161-
162-
private void logUnknownSetting(String settingType, Map.Entry<String, String> e) {
163-
logger.warn("ignoring unknown {} setting: [{}] with value [{}]; archiving", settingType, e.getKey(), e.getValue());
164-
}
127+
ClusterState recoveredState = Function.<ClusterState>identity()
128+
.andThen(state -> ClusterStateUpdaters.upgradeAndArchiveUnknownOrInvalidSettings(state, clusterService.getClusterSettings()))
129+
.andThen(state -> ClusterStateUpdaters.closeBadIndices(state, indicesService))
130+
.apply(ClusterState.builder(clusterService.getClusterName()).metaData(metaDataBuilder).build());
165131

166-
private void logInvalidSetting(String settingType, Map.Entry<String, String> e, IllegalArgumentException ex) {
167-
logger.warn(() -> new ParameterizedMessage("ignoring invalid {} setting: [{}] with value [{}]; archiving",
168-
settingType, e.getKey(), e.getValue()), ex);
132+
listener.onSuccess(recoveredState);
169133
}
170134

171135
public interface GatewayStateRecoveredListener {

0 commit comments

Comments
 (0)