Skip to content

Commit 3c9f703

Browse files
authored
Enforce cluster UUIDs (#37775)
This commit adds join validation around cluster UUIDs, preventing a node from joining a cluster if it was previously part of another cluster. The commit introduces a new flag in the cluster state, clusterUUIDCommitted, which denotes whether the node has locked into a cluster with the given uuid. When a cluster state is committed, this flag is set to true, and subsequent cluster state updates will keep the information about committal. Note that coordinating-only nodes are still free to switch clusters at will (after restart), as they don't carry any persistent state.
1 parent 09a11a3 commit 3c9f703

File tree

10 files changed

+261
-30
lines changed

10 files changed

+261
-30
lines changed

Diff for: server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
422422
logger.trace("handleCommit: applying commit request for term [{}] and version [{}]", applyCommit.getTerm(),
423423
applyCommit.getVersion());
424424

425-
persistedState.markLastAcceptedConfigAsCommitted();
425+
persistedState.markLastAcceptedStateAsCommitted();
426426
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
427427
}
428428

@@ -471,16 +471,32 @@ public interface PersistedState {
471471
/**
472472
* Marks the last accepted cluster state as committed.
473473
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
474-
* with the last committed configuration now corresponding to the last accepted configuration.
474+
* with the last committed configuration now corresponding to the last accepted configuration, and the cluster uuid, if set,
475+
* marked as committed.
475476
*/
476-
default void markLastAcceptedConfigAsCommitted() {
477+
default void markLastAcceptedStateAsCommitted() {
477478
final ClusterState lastAcceptedState = getLastAcceptedState();
479+
MetaData.Builder metaDataBuilder = null;
478480
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
479481
final CoordinationMetaData coordinationMetaData = CoordinationMetaData.builder(lastAcceptedState.coordinationMetaData())
480482
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
481483
.build();
482-
final MetaData metaData = MetaData.builder(lastAcceptedState.metaData()).coordinationMetaData(coordinationMetaData).build();
483-
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaData).build());
484+
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
485+
metaDataBuilder.coordinationMetaData(coordinationMetaData);
486+
}
487+
// if we receive a commit from a Zen1 master that has not recovered its state yet, the cluster uuid might not be known yet.
488+
assert lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false ||
489+
lastAcceptedState.term() == ZEN1_BWC_TERM :
490+
"received cluster state with empty cluster uuid but not Zen1 BWC term: " + lastAcceptedState;
491+
if (lastAcceptedState.metaData().clusterUUID().equals(MetaData.UNKNOWN_CLUSTER_UUID) == false &&
492+
lastAcceptedState.metaData().clusterUUIDCommitted() == false) {
493+
if (metaDataBuilder == null) {
494+
metaDataBuilder = MetaData.builder(lastAcceptedState.metaData());
495+
}
496+
metaDataBuilder.clusterUUIDCommitted(true);
497+
}
498+
if (metaDataBuilder != null) {
499+
setLastAcceptedState(ClusterState.builder(lastAcceptedState).metaData(metaDataBuilder).build());
484500
}
485501
}
486502
}

Diff for: server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

+14-2
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
147147
this.masterService = masterService;
148148
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
149149
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
150-
this::getCurrentTerm, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
150+
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
151151
this.persistedStateSupplier = persistedStateSupplier;
152152
this.discoverySettings = new DiscoverySettings(settings, clusterSettings);
153153
this.lastKnownLeader = Optional.empty();
@@ -281,7 +281,18 @@ PublishWithJoinResponse handlePublishRequest(PublishRequest publishRequest) {
281281
+ lastKnownLeader + ", rejecting");
282282
}
283283

284-
if (publishRequest.getAcceptedState().term() > coordinationState.get().getLastAcceptedState().term()) {
284+
final ClusterState localState = coordinationState.get().getLastAcceptedState();
285+
286+
if (localState.metaData().clusterUUIDCommitted() &&
287+
localState.metaData().clusterUUID().equals(publishRequest.getAcceptedState().metaData().clusterUUID()) == false) {
288+
logger.warn("received cluster state from {} with a different cluster uuid {} than local cluster uuid {}, rejecting",
289+
sourceNode, publishRequest.getAcceptedState().metaData().clusterUUID(), localState.metaData().clusterUUID());
290+
throw new CoordinationStateRejectedException("received cluster state from " + sourceNode +
291+
" with a different cluster uuid " + publishRequest.getAcceptedState().metaData().clusterUUID() +
292+
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
293+
}
294+
295+
if (publishRequest.getAcceptedState().term() > localState.term()) {
285296
// only do join validation if we have not accepted state from this master yet
286297
onJoinValidators.forEach(a -> a.accept(getLocalNode(), publishRequest.getAcceptedState()));
287298
}
@@ -653,6 +664,7 @@ public void invariant() {
653664
assert followersChecker.getFastResponseState().term == getCurrentTerm() : followersChecker.getFastResponseState();
654665
assert followersChecker.getFastResponseState().mode == getMode() : followersChecker.getFastResponseState();
655666
assert (applierState.nodes().getMasterNodeId() == null) == applierState.blocks().hasGlobalBlockWithId(NO_MASTER_BLOCK_ID);
667+
assert applierState.nodes().getMasterNodeId() == null || applierState.metaData().clusterUUIDCommitted();
656668
assert preVoteCollector.getPreVoteResponse().equals(getPreVoteResponse())
657669
: preVoteCollector + " vs " + getPreVoteResponse();
658670

Diff for: server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import java.util.function.BiConsumer;
6363
import java.util.function.Function;
6464
import java.util.function.LongSupplier;
65+
import java.util.function.Supplier;
6566

6667
public class JoinHelper {
6768

@@ -84,7 +85,7 @@ public class JoinHelper {
8485
final Set<Tuple<DiscoveryNode, JoinRequest>> pendingOutgoingJoins = ConcurrentCollections.newConcurrentSet();
8586

8687
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
87-
TransportService transportService, LongSupplier currentTermSupplier,
88+
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
8889
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
8990
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
9091
this.masterService = masterService;
@@ -132,6 +133,13 @@ public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentSta
132133
transportService.registerRequestHandler(VALIDATE_JOIN_ACTION_NAME,
133134
MembershipAction.ValidateJoinRequest::new, ThreadPool.Names.GENERIC,
134135
(request, channel, task) -> {
136+
final ClusterState localState = currentStateSupplier.get();
137+
if (localState.metaData().clusterUUIDCommitted() &&
138+
localState.metaData().clusterUUID().equals(request.getState().metaData().clusterUUID()) == false) {
139+
throw new CoordinationStateRejectedException("join validation on cluster state" +
140+
" with a different cluster uuid " + request.getState().metaData().clusterUUID() +
141+
" than local cluster uuid " + localState.metaData().clusterUUID() + ", rejecting");
142+
}
135143
joinValidators.forEach(action -> action.accept(transportService.getLocalNode(), request.getState()));
136144
channel.sendResponse(Empty.INSTANCE);
137145
});

Diff for: server/src/main/java/org/elasticsearch/cluster/metadata/MetaData.java

+49-6
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public class MetaData implements Iterable<IndexMetaData>, Diffable<MetaData>, To
8888
private static final Logger logger = LogManager.getLogger(MetaData.class);
8989

9090
public static final String ALL = "_all";
91+
public static final String UNKNOWN_CLUSTER_UUID = "_na_";
9192

9293
public enum XContentContext {
9394
/* Custom metadata should be returned as part of API call */
@@ -159,6 +160,7 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
159160
private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
160161

161162
private final String clusterUUID;
163+
private final boolean clusterUUIDCommitted;
162164
private final long version;
163165

164166
private final CoordinationMetaData coordinationMetaData;
@@ -179,12 +181,13 @@ public interface Custom extends NamedDiffable<Custom>, ToXContentFragment, Clust
179181

180182
private final SortedMap<String, AliasOrIndex> aliasAndIndexLookup;
181183

182-
MetaData(String clusterUUID, long version, CoordinationMetaData coordinationMetaData,
184+
MetaData(String clusterUUID, boolean clusterUUIDCommitted, long version, CoordinationMetaData coordinationMetaData,
183185
Settings transientSettings, Settings persistentSettings,
184186
ImmutableOpenMap<String, IndexMetaData> indices, ImmutableOpenMap<String, IndexTemplateMetaData> templates,
185187
ImmutableOpenMap<String, Custom> customs, String[] allIndices, String[] allOpenIndices, String[] allClosedIndices,
186188
SortedMap<String, AliasOrIndex> aliasAndIndexLookup) {
187189
this.clusterUUID = clusterUUID;
190+
this.clusterUUIDCommitted = clusterUUIDCommitted;
188191
this.version = version;
189192
this.coordinationMetaData = coordinationMetaData;
190193
this.transientSettings = transientSettings;
@@ -218,6 +221,14 @@ public String clusterUUID() {
218221
return this.clusterUUID;
219222
}
220223

224+
/**
225+
* Whether the current node with the given cluster state is locked into the cluster with the UUID returned by {@link #clusterUUID()},
226+
* meaning that it will not accept any cluster state with a different clusterUUID.
227+
*/
228+
public boolean clusterUUIDCommitted() {
229+
return this.clusterUUIDCommitted;
230+
}
231+
221232
/**
222233
* Returns the merged transient and persistent settings.
223234
*/
@@ -757,6 +768,12 @@ public static boolean isGlobalStateEquals(MetaData metaData1, MetaData metaData2
757768
if (!metaData1.templates.equals(metaData2.templates())) {
758769
return false;
759770
}
771+
if (!metaData1.clusterUUID.equals(metaData2.clusterUUID)) {
772+
return false;
773+
}
774+
if (metaData1.clusterUUIDCommitted != metaData2.clusterUUIDCommitted) {
775+
return false;
776+
}
760777
// Check if any persistent metadata needs to be saved
761778
int customCount1 = 0;
762779
for (ObjectObjectCursor<String, Custom> cursor : metaData1.customs) {
@@ -798,6 +815,7 @@ private static class MetaDataDiff implements Diff<MetaData> {
798815

799816
private long version;
800817
private String clusterUUID;
818+
private boolean clusterUUIDCommitted;
801819
private CoordinationMetaData coordinationMetaData;
802820
private Settings transientSettings;
803821
private Settings persistentSettings;
@@ -807,6 +825,7 @@ private static class MetaDataDiff implements Diff<MetaData> {
807825

808826
MetaDataDiff(MetaData before, MetaData after) {
809827
clusterUUID = after.clusterUUID;
828+
clusterUUIDCommitted = after.clusterUUIDCommitted;
810829
version = after.version;
811830
coordinationMetaData = after.coordinationMetaData;
812831
transientSettings = after.transientSettings;
@@ -818,8 +837,11 @@ private static class MetaDataDiff implements Diff<MetaData> {
818837

819838
MetaDataDiff(StreamInput in) throws IOException {
820839
clusterUUID = in.readString();
840+
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
841+
clusterUUIDCommitted = in.readBoolean();
842+
}
821843
version = in.readLong();
822-
if (in.getVersion().onOrAfter(Version.V_7_0_0)) { //TODO revisit after Zen2 BWC is implemented
844+
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
823845
coordinationMetaData = new CoordinationMetaData(in);
824846
} else {
825847
coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
@@ -836,6 +858,9 @@ private static class MetaDataDiff implements Diff<MetaData> {
836858
@Override
837859
public void writeTo(StreamOutput out) throws IOException {
838860
out.writeString(clusterUUID);
861+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
862+
out.writeBoolean(clusterUUIDCommitted);
863+
}
839864
out.writeLong(version);
840865
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
841866
coordinationMetaData.writeTo(out);
@@ -851,6 +876,7 @@ public void writeTo(StreamOutput out) throws IOException {
851876
public MetaData apply(MetaData part) {
852877
Builder builder = builder();
853878
builder.clusterUUID(clusterUUID);
879+
builder.clusterUUIDCommitted(clusterUUIDCommitted);
854880
builder.version(version);
855881
builder.coordinationMetaData(coordinationMetaData);
856882
builder.transientSettings(transientSettings);
@@ -866,6 +892,9 @@ public static MetaData readFrom(StreamInput in) throws IOException {
866892
Builder builder = new Builder();
867893
builder.version = in.readLong();
868894
builder.clusterUUID = in.readString();
895+
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
896+
builder.clusterUUIDCommitted = in.readBoolean();
897+
}
869898
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
870899
builder.coordinationMetaData(new CoordinationMetaData(in));
871900
}
@@ -891,6 +920,9 @@ public static MetaData readFrom(StreamInput in) throws IOException {
891920
public void writeTo(StreamOutput out) throws IOException {
892921
out.writeLong(version);
893922
out.writeString(clusterUUID);
923+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
924+
out.writeBoolean(clusterUUIDCommitted);
925+
}
894926
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
895927
coordinationMetaData.writeTo(out);
896928
}
@@ -930,6 +962,7 @@ public static Builder builder(MetaData metaData) {
930962
public static class Builder {
931963

932964
private String clusterUUID;
965+
private boolean clusterUUIDCommitted;
933966
private long version;
934967

935968
private CoordinationMetaData coordinationMetaData = CoordinationMetaData.EMPTY_META_DATA;
@@ -941,7 +974,7 @@ public static class Builder {
941974
private final ImmutableOpenMap.Builder<String, Custom> customs;
942975

943976
public Builder() {
944-
clusterUUID = "_na_";
977+
clusterUUID = UNKNOWN_CLUSTER_UUID;
945978
indices = ImmutableOpenMap.builder();
946979
templates = ImmutableOpenMap.builder();
947980
customs = ImmutableOpenMap.builder();
@@ -950,6 +983,7 @@ public Builder() {
950983

951984
public Builder(MetaData metaData) {
952985
this.clusterUUID = metaData.clusterUUID;
986+
this.clusterUUIDCommitted = metaData.clusterUUIDCommitted;
953987
this.coordinationMetaData = metaData.coordinationMetaData;
954988
this.transientSettings = metaData.transientSettings;
955989
this.persistentSettings = metaData.persistentSettings;
@@ -1125,8 +1159,13 @@ public Builder clusterUUID(String clusterUUID) {
11251159
return this;
11261160
}
11271161

1162+
public Builder clusterUUIDCommitted(boolean clusterUUIDCommitted) {
1163+
this.clusterUUIDCommitted = clusterUUIDCommitted;
1164+
return this;
1165+
}
1166+
11281167
public Builder generateClusterUuidIfNeeded() {
1129-
if (clusterUUID.equals("_na_")) {
1168+
if (clusterUUID.equals(UNKNOWN_CLUSTER_UUID)) {
11301169
clusterUUID = UUIDs.randomBase64UUID();
11311170
}
11321171
return this;
@@ -1182,8 +1221,9 @@ public MetaData build() {
11821221
String[] allOpenIndicesArray = allOpenIndices.toArray(new String[allOpenIndices.size()]);
11831222
String[] allClosedIndicesArray = allClosedIndices.toArray(new String[allClosedIndices.size()]);
11841223

1185-
return new MetaData(clusterUUID, version, coordinationMetaData, transientSettings, persistentSettings, indices.build(),
1186-
templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray, aliasAndIndexLookup);
1224+
return new MetaData(clusterUUID, clusterUUIDCommitted, version, coordinationMetaData, transientSettings, persistentSettings,
1225+
indices.build(), templates.build(), customs.build(), allIndicesArray, allOpenIndicesArray, allClosedIndicesArray,
1226+
aliasAndIndexLookup);
11871227
}
11881228

11891229
private SortedMap<String, AliasOrIndex> buildAliasAndIndexLookup() {
@@ -1226,6 +1266,7 @@ public static void toXContent(MetaData metaData, XContentBuilder builder, ToXCon
12261266

12271267
builder.field("version", metaData.version());
12281268
builder.field("cluster_uuid", metaData.clusterUUID);
1269+
builder.field("cluster_uuid_committed", metaData.clusterUUIDCommitted);
12291270

12301271
builder.startObject("cluster_coordination");
12311272
metaData.coordinationMetaData().toXContent(builder, params);
@@ -1324,6 +1365,8 @@ public static MetaData fromXContent(XContentParser parser) throws IOException {
13241365
builder.version = parser.longValue();
13251366
} else if ("cluster_uuid".equals(currentFieldName) || "uuid".equals(currentFieldName)) {
13261367
builder.clusterUUID = parser.text();
1368+
} else if ("cluster_uuid_committed".equals(currentFieldName)) {
1369+
builder.clusterUUIDCommitted = parser.booleanValue();
13271370
} else {
13281371
throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");
13291372
}

0 commit comments

Comments
 (0)