Skip to content

Commit 6522538

Browse files
authored
Add validation for supported index version on node join, restore, upgrade & open index (#21830)
Today we can easily join a cluster that holds an index we don't support since we currently allow rolling upgrades from 5.x to 6.x. Along the same lines we don't check if we can support an index based on the nodes in the cluster when we open, restore or metadata-upgrade and index. This commit adds additional safety that fails cluster state validation, open, restore and /or upgrade if there is an open index with an incompatible index version created in the cluster. Realtes to #21670
1 parent 155de53 commit 6522538

22 files changed

+300
-56
lines changed

core/src/main/java/org/elasticsearch/Version.java

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,17 @@ public static void writeVersion(Version version, StreamOutput out) throws IOExce
218218
}
219219

220220
/**
221-
* Returns the smallest version between the 2.
221+
* Returns the minimum version between the 2.
222222
*/
223-
public static Version smallest(Version version1, Version version2) {
223+
public static Version min(Version version1, Version version2) {
224224
return version1.id < version2.id ? version1 : version2;
225225
}
226226

227+
/**
228+
* Returns the maximum version between the 2
229+
*/
230+
public static Version max(Version version1, Version version2) { return version1.id > version2.id ? version1 : version2; }
231+
227232
/**
228233
* Returns the version given its string representation, current version if the argument is null or empty
229234
*/
@@ -326,7 +331,22 @@ public Version minimumCompatibilityVersion() {
326331
bwcMajor = major;
327332
bwcMinor = 0;
328333
}
329-
return Version.smallest(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
334+
return Version.min(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
335+
}
336+
337+
/**
338+
* Returns the minimum created index version that this version supports. Indices created with lower versions
339+
* can't be used with this version.
340+
*/
341+
public Version minimumIndexCompatibilityVersion() {
342+
final int bwcMajor;
343+
if (major == 5) {
344+
bwcMajor = 2; // we jumped from 2 to 5
345+
} else {
346+
bwcMajor = major - 1;
347+
}
348+
final int bwcMinor = 0;
349+
return Version.min(this, fromId(bwcMajor * 1000000 + bwcMinor * 10000 + 99));
330350
}
331351

332352
/**
@@ -414,5 +434,4 @@ public boolean isRC() {
414434
public boolean isRelease() {
415435
return build == 99;
416436
}
417-
418437
}

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataCreateIndexService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ public ClusterState execute(ClusterState currentState) throws Exception {
316316

317317
if (indexSettingsBuilder.get(SETTING_VERSION_CREATED) == null) {
318318
DiscoveryNodes nodes = currentState.nodes();
319-
final Version createdVersion = Version.smallest(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
319+
final Version createdVersion = Version.min(Version.CURRENT, nodes.getSmallestNonClientNodeVersion());
320320
indexSettingsBuilder.put(SETTING_VERSION_CREATED, createdVersion);
321321
}
322322

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.elasticsearch.cluster.metadata;
2121

2222
import org.elasticsearch.ElasticsearchException;
23+
import org.elasticsearch.Version;
2324
import org.elasticsearch.action.ActionListener;
2425
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
2526
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
@@ -160,12 +161,14 @@ public ClusterState execute(ClusterState currentState) {
160161
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
161162
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
162163
.blocks(currentState.blocks());
164+
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
165+
.minimumIndexCompatibilityVersion();
163166
for (IndexMetaData closedMetaData : indicesToOpen) {
164167
final String indexName = closedMetaData.getIndex().getName();
165168
IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build();
166169
// The index might be closed because we couldn't import it due to old incompatible version
167170
// We need to check that this index can be upgraded to the current version
168-
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData);
171+
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion);
169172
try {
170173
indicesService.verifyIndexMetadata(indexMetaData, indexMetaData);
171174
} catch (Exception e) {

core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexUpgradeService.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ public MetaDataIndexUpgradeService(Settings settings, MapperRegistry mapperRegis
6767
* If the index does not need upgrade it returns the index metadata unchanged, otherwise it returns a modified index metadata. If index
6868
* cannot be updated the method throws an exception.
6969
*/
70-
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData) {
70+
public IndexMetaData upgradeIndexMetaData(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
7171
// Throws an exception if there are too-old segments:
7272
if (isUpgraded(indexMetaData)) {
7373
assert indexMetaData == archiveBrokenIndexSettings(indexMetaData) : "all settings must have been upgraded before";
7474
return indexMetaData;
7575
}
76-
checkSupportedVersion(indexMetaData);
76+
checkSupportedVersion(indexMetaData, minimumIndexCompatibilityVersion);
7777
IndexMetaData newMetaData = indexMetaData;
7878
// we have to run this first otherwise in we try to create IndexSettings
7979
// with broken settings and fail in checkMappingsCompatibility
@@ -92,21 +92,26 @@ boolean isUpgraded(IndexMetaData indexMetaData) {
9292
}
9393

9494
/**
95-
* Elasticsearch 5.0 no longer supports indices with pre Lucene v5.0 (Elasticsearch v2.0.0.beta1) segments. All indices
96-
* that were created before Elasticsearch v2.0.0.beta1 should be reindexed in Elasticsearch 2.x
97-
* before they can be opened by this version of elasticsearch. */
98-
private void checkSupportedVersion(IndexMetaData indexMetaData) {
99-
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData) == false) {
100-
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created before v2.0.0.beta1."
101-
+ " It should be reindexed in Elasticsearch 2.x before upgrading to " + Version.CURRENT + ".");
95+
* Elasticsearch v6.0 no longer supports indices created pre v5.0. All indices
96+
* that were created before Elasticsearch v5.0 should be re-indexed in Elasticsearch 5.x
97+
* before they can be opened by this version of elasticsearch.
98+
*/
99+
private void checkSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
100+
if (indexMetaData.getState() == IndexMetaData.State.OPEN && isSupportedVersion(indexMetaData,
101+
minimumIndexCompatibilityVersion) == false) {
102+
throw new IllegalStateException("The index [" + indexMetaData.getIndex() + "] was created with version ["
103+
+ indexMetaData.getCreationVersion() + "] but the minimum compatible version is ["
104+
105+
+ minimumIndexCompatibilityVersion + "]. It should be re-indexed in Elasticsearch " + minimumIndexCompatibilityVersion.major
106+
+ ".x before upgrading to " + Version.CURRENT + ".");
102107
}
103108
}
104109

105110
/*
106111
* Returns true if this index can be supported by the current version of elasticsearch
107112
*/
108-
private static boolean isSupportedVersion(IndexMetaData indexMetaData) {
109-
return indexMetaData.getCreationVersion().onOrAfter(Version.V_5_0_0_beta1);
113+
private static boolean isSupportedVersion(IndexMetaData indexMetaData, Version minimumIndexCompatibilityVersion) {
114+
return indexMetaData.getCreationVersion().onOrAfter(minimumIndexCompatibilityVersion);
110115
}
111116

112117
/**
@@ -173,4 +178,4 @@ IndexMetaData archiveBrokenIndexSettings(IndexMetaData indexMetaData) {
173178
return indexMetaData;
174179
}
175180
}
176-
}
181+
}

core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,22 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
5656
private final String masterNodeId;
5757
private final String localNodeId;
5858
private final Version minNonClientNodeVersion;
59+
private final Version maxNodeVersion;
60+
private final Version minNodeVersion;
5961

6062
private DiscoveryNodes(ImmutableOpenMap<String, DiscoveryNode> nodes, ImmutableOpenMap<String, DiscoveryNode> dataNodes,
6163
ImmutableOpenMap<String, DiscoveryNode> masterNodes, ImmutableOpenMap<String, DiscoveryNode> ingestNodes,
62-
String masterNodeId, String localNodeId, Version minNonClientNodeVersion) {
64+
String masterNodeId, String localNodeId, Version minNonClientNodeVersion, Version maxNodeVersion,
65+
Version minNodeVersion) {
6366
this.nodes = nodes;
6467
this.dataNodes = dataNodes;
6568
this.masterNodes = masterNodes;
6669
this.ingestNodes = ingestNodes;
6770
this.masterNodeId = masterNodeId;
6871
this.localNodeId = localNodeId;
6972
this.minNonClientNodeVersion = minNonClientNodeVersion;
73+
this.minNodeVersion = minNodeVersion;
74+
this.maxNodeVersion = maxNodeVersion;
7075
}
7176

7277
@Override
@@ -235,6 +240,24 @@ public Version getSmallestNonClientNodeVersion() {
235240
return minNonClientNodeVersion;
236241
}
237242

243+
/**
244+
* Returns the version of the node with the oldest version in the cluster.
245+
*
246+
* @return the oldest version in the cluster
247+
*/
248+
public Version getMinNodeVersion() {
249+
return minNodeVersion;
250+
}
251+
252+
/**
253+
* Returns the version of the node with the yougest version in the cluster
254+
*
255+
* @return the oldest version in the cluster
256+
*/
257+
public Version getMaxNodeVersion() {
258+
return maxNodeVersion;
259+
}
260+
238261
/**
239262
* Resolve a node with a given id
240263
*
@@ -631,25 +654,27 @@ public DiscoveryNodes build() {
631654
ImmutableOpenMap.Builder<String, DiscoveryNode> masterNodesBuilder = ImmutableOpenMap.builder();
632655
ImmutableOpenMap.Builder<String, DiscoveryNode> ingestNodesBuilder = ImmutableOpenMap.builder();
633656
Version minNodeVersion = Version.CURRENT;
657+
Version maxNodeVersion = Version.CURRENT;
634658
Version minNonClientNodeVersion = Version.CURRENT;
635659
for (ObjectObjectCursor<String, DiscoveryNode> nodeEntry : nodes) {
636660
if (nodeEntry.value.isDataNode()) {
637661
dataNodesBuilder.put(nodeEntry.key, nodeEntry.value);
638-
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion());
662+
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
639663
}
640664
if (nodeEntry.value.isMasterNode()) {
641665
masterNodesBuilder.put(nodeEntry.key, nodeEntry.value);
642-
minNonClientNodeVersion = Version.smallest(minNonClientNodeVersion, nodeEntry.value.getVersion());
666+
minNonClientNodeVersion = Version.min(minNonClientNodeVersion, nodeEntry.value.getVersion());
643667
}
644668
if (nodeEntry.value.isIngestNode()) {
645669
ingestNodesBuilder.put(nodeEntry.key, nodeEntry.value);
646670
}
647-
minNodeVersion = Version.smallest(minNodeVersion, nodeEntry.value.getVersion());
671+
minNodeVersion = Version.min(minNodeVersion, nodeEntry.value.getVersion());
672+
maxNodeVersion = Version.max(maxNodeVersion, nodeEntry.value.getVersion());
648673
}
649674

650675
return new DiscoveryNodes(
651676
nodes.build(), dataNodesBuilder.build(), masterNodesBuilder.build(), ingestNodesBuilder.build(),
652-
masterNodeId, localNodeId, minNonClientNodeVersion
677+
masterNodeId, localNodeId, minNonClientNodeVersion, maxNodeVersion, minNodeVersion
653678
);
654679
}
655680

core/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,16 @@
1919

2020
package org.elasticsearch.discovery.zen;
2121

22+
import org.elasticsearch.Version;
2223
import org.elasticsearch.cluster.ClusterState;
24+
import org.elasticsearch.cluster.metadata.IndexMetaData;
25+
import org.elasticsearch.cluster.metadata.MetaData;
2326
import org.elasticsearch.cluster.node.DiscoveryNode;
2427
import org.elasticsearch.common.component.AbstractComponent;
2528
import org.elasticsearch.common.io.stream.StreamInput;
2629
import org.elasticsearch.common.io.stream.StreamOutput;
2730
import org.elasticsearch.common.settings.Settings;
2831
import org.elasticsearch.common.unit.TimeValue;
29-
import org.elasticsearch.discovery.zen.DiscoveryNodesProvider;
3032
import org.elasticsearch.threadpool.ThreadPool;
3133
import org.elasticsearch.transport.EmptyTransportResponseHandler;
3234
import org.elasticsearch.transport.TransportChannel;
@@ -37,6 +39,7 @@
3739

3840
import java.io.IOException;
3941
import java.util.concurrent.TimeUnit;
42+
import java.util.function.Supplier;
4043

4144
public class MembershipAction extends AbstractComponent {
4245

@@ -58,21 +61,20 @@ public interface MembershipListener {
5861

5962
private final TransportService transportService;
6063

61-
private final DiscoveryNodesProvider nodesProvider;
62-
6364
private final MembershipListener listener;
6465

6566
public MembershipAction(Settings settings, TransportService transportService,
66-
DiscoveryNodesProvider nodesProvider, MembershipListener listener) {
67+
Supplier<DiscoveryNode> localNodeSupplier, MembershipListener listener) {
6768
super(settings);
6869
this.transportService = transportService;
69-
this.nodesProvider = nodesProvider;
7070
this.listener = listener;
7171

72+
7273
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
7374
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
74-
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME, ValidateJoinRequest::new,
75-
ThreadPool.Names.GENERIC, new ValidateJoinRequestRequestHandler());
75+
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
76+
() -> new ValidateJoinRequest(localNodeSupplier), ThreadPool.Names.GENERIC,
77+
new ValidateJoinRequestRequestHandler());
7678
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
7779
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
7880
}
@@ -152,20 +154,23 @@ public void onFailure(Exception e) {
152154
}
153155
}
154156

155-
class ValidateJoinRequest extends TransportRequest {
157+
static class ValidateJoinRequest extends TransportRequest {
158+
private final Supplier<DiscoveryNode> localNode;
156159
private ClusterState state;
157160

158-
ValidateJoinRequest() {
161+
ValidateJoinRequest(Supplier<DiscoveryNode> localNode) {
162+
this.localNode = localNode;
159163
}
160164

161165
ValidateJoinRequest(ClusterState state) {
162166
this.state = state;
167+
this.localNode = state.nodes()::getLocalNode;
163168
}
164169

165170
@Override
166171
public void readFrom(StreamInput in) throws IOException {
167172
super.readFrom(in);
168-
this.state = ClusterState.Builder.readFrom(in, nodesProvider.nodes().getLocalNode());
173+
this.state = ClusterState.Builder.readFrom(in, localNode.get());
169174
}
170175

171176
@Override
@@ -175,15 +180,31 @@ public void writeTo(StreamOutput out) throws IOException {
175180
}
176181
}
177182

178-
class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
183+
static class ValidateJoinRequestRequestHandler implements TransportRequestHandler<ValidateJoinRequest> {
179184

180185
@Override
181186
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
187+
ensureIndexCompatibility(Version.CURRENT.minimumIndexCompatibilityVersion(), request.state.getMetaData());
182188
// for now, the mere fact that we can serialize the cluster state acts as validation....
183189
channel.sendResponse(TransportResponse.Empty.INSTANCE);
184190
}
185191
}
186192

193+
/**
194+
* Ensures that all indices are compatible with the supported index version.
195+
* @throws IllegalStateException if any index is incompatible with the given version
196+
*/
197+
static void ensureIndexCompatibility(final Version supportedIndexVersion, MetaData metaData) {
198+
// we ensure that all indices in the cluster we join are compatible with us no matter if they are
199+
// closed or not we can't read mappings of these indices so we need to reject the join...
200+
for (IndexMetaData idxMetaData : metaData) {
201+
if (idxMetaData.getCreationVersion().before(supportedIndexVersion)) {
202+
throw new IllegalStateException("index " + idxMetaData.getIndex() + " version not supported: "
203+
+ idxMetaData.getCreationVersion() + " minimum compatible index version is: " + supportedIndexVersion);
204+
}
205+
}
206+
}
207+
187208
public static class LeaveRequest extends TransportRequest {
188209

189210
private DiscoveryNode node;

core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -410,7 +410,6 @@ class JoinTaskExecutor implements ClusterStateTaskExecutor<DiscoveryNode> {
410410
@Override
411411
public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<DiscoveryNode> joiningNodes) throws Exception {
412412
final BatchResult.Builder<DiscoveryNode> results = BatchResult.builder();
413-
414413
final DiscoveryNodes currentNodes = currentState.nodes();
415414
boolean nodesChanged = false;
416415
ClusterState.Builder newState;
@@ -435,8 +434,10 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
435434

436435
assert nodesBuilder.isLocalNodeElectedMaster();
437436

437+
Version minNodeVersion = Version.CURRENT;
438438
// processing any joins
439439
for (final DiscoveryNode node : joiningNodes) {
440+
minNodeVersion = Version.min(minNodeVersion, node.getVersion());
440441
if (node.equals(BECOME_MASTER_TASK) || node.equals(FINISH_ELECTION_TASK)) {
441442
// noop
442443
} else if (currentNodes.nodeExists(node)) {
@@ -452,7 +453,9 @@ public BatchResult<DiscoveryNode> execute(ClusterState currentState, List<Discov
452453
}
453454
results.success(node);
454455
}
455-
456+
// we do this validation quite late to prevent race conditions between nodes joining and importing dangling indices
457+
// we have to reject nodes that don't support all indices we have in this cluster
458+
MembershipAction.ensureIndexCompatibility(minNodeVersion.minimumIndexCompatibilityVersion(), currentState.getMetaData());
456459
if (nodesChanged) {
457460
newState.nodes(nodesBuilder);
458461
return results.build(allocationService.reroute(newState.build(), "node_join"));

0 commit comments

Comments
 (0)