Skip to content

Commit bdef2ab

Browse files
authored
Use m_m_nodes from Zen1 master for Zen2 bootstrap (#37701)
Today we support a smooth rolling upgrade from Zen1 to Zen2 by automatically bootstrapping the cluster once all the Zen1 nodes have left, as long as the `minimum_master_nodes` count is satisfied. However this means that Zen2 nodes also require the `minimum_master_nodes` setting for this one specific and transient situation. Since nodes only perform this automatic bootstrapping if they previously belonged to a Zen1 cluster, they can keep track of the `minimum_master_nodes` setting from the previous master instead of requiring it to be set on the Zen2 node.
1 parent 2908ca1 commit bdef2ab

File tree

12 files changed

+168
-46
lines changed

12 files changed

+168
-46
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/state/TransportClusterStateAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ private void buildResponse(final ClusterStateRequest request,
127127
ClusterState.Builder builder = ClusterState.builder(currentState.getClusterName());
128128
builder.version(currentState.version());
129129
builder.stateUUID(currentState.stateUUID());
130+
builder.minimumMasterNodesOnPublishingMaster(currentState.getMinimumMasterNodesOnPublishingMaster());
130131

131132
if (request.nodes()) {
132133
builder.nodes(currentState.nodes());

server/src/main/java/org/elasticsearch/cluster/ClusterState.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.carrotsearch.hppc.cursors.ObjectCursor;
2424
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
2525

26+
import org.elasticsearch.Version;
2627
import org.elasticsearch.client.transport.TransportClient;
2728
import org.elasticsearch.cluster.block.ClusterBlock;
2829
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -178,17 +179,19 @@ default boolean isPrivate() {
178179

179180
private final boolean wasReadFromDiff;
180181

182+
private final int minimumMasterNodesOnPublishingMaster;
183+
181184
// built on demand
182185
private volatile RoutingNodes routingNodes;
183186

184187
public ClusterState(long version, String stateUUID, ClusterState state) {
185188
this(state.clusterName, version, stateUUID, state.metaData(), state.routingTable(), state.nodes(), state.blocks(),
186-
state.customs(), false);
189+
state.customs(), -1, false);
187190
}
188191

189192
public ClusterState(ClusterName clusterName, long version, String stateUUID, MetaData metaData, RoutingTable routingTable,
190193
DiscoveryNodes nodes, ClusterBlocks blocks, ImmutableOpenMap<String, Custom> customs,
191-
boolean wasReadFromDiff) {
194+
int minimumMasterNodesOnPublishingMaster, boolean wasReadFromDiff) {
192195
this.version = version;
193196
this.stateUUID = stateUUID;
194197
this.clusterName = clusterName;
@@ -197,6 +200,7 @@ public ClusterState(ClusterName clusterName, long version, String stateUUID, Met
197200
this.nodes = nodes;
198201
this.blocks = blocks;
199202
this.customs = customs;
203+
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
200204
this.wasReadFromDiff = wasReadFromDiff;
201205
}
202206

@@ -290,6 +294,17 @@ public Set<VotingConfigExclusion> getVotingConfigExclusions() {
290294
return coordinationMetaData().getVotingConfigExclusions();
291295
}
292296

297+
/**
298+
* The node-level `discovery.zen.minimum_master_nodes` setting on the master node that published this cluster state, for use in rolling
299+
* upgrades from 6.x to 7.x. Once all the 6.x master-eligible nodes have left the cluster, the 7.x nodes use this value to determine how
300+
* many master-eligible nodes must be discovered before the cluster can be bootstrapped. Note that this method returns the node-level
301+
* value of this setting, and ignores any cluster-level override that was set via the API. Callers are expected to combine this value
302+
* with any value set in the cluster-level settings. This should be removed once we no longer need support for {@link Version#V_6_7_0}.
303+
*/
304+
public int getMinimumMasterNodesOnPublishingMaster() {
305+
return minimumMasterNodesOnPublishingMaster;
306+
}
307+
293308
// Used for testing and logging to determine how this cluster state was send over the wire
294309
public boolean wasReadFromDiff() {
295310
return wasReadFromDiff;
@@ -644,7 +659,7 @@ public static class Builder {
644659
private ClusterBlocks blocks = ClusterBlocks.EMPTY_CLUSTER_BLOCK;
645660
private final ImmutableOpenMap.Builder<String, Custom> customs;
646661
private boolean fromDiff;
647-
662+
private int minimumMasterNodesOnPublishingMaster = -1;
648663

649664
public Builder(ClusterState state) {
650665
this.clusterName = state.clusterName;
@@ -655,6 +670,7 @@ public Builder(ClusterState state) {
655670
this.metaData = state.metaData();
656671
this.blocks = state.blocks();
657672
this.customs = ImmutableOpenMap.builder(state.customs());
673+
this.minimumMasterNodesOnPublishingMaster = state.minimumMasterNodesOnPublishingMaster;
658674
this.fromDiff = false;
659675
}
660676

@@ -715,6 +731,11 @@ public Builder stateUUID(String uuid) {
715731
return this;
716732
}
717733

734+
public Builder minimumMasterNodesOnPublishingMaster(int minimumMasterNodesOnPublishingMaster) {
735+
this.minimumMasterNodesOnPublishingMaster = minimumMasterNodesOnPublishingMaster;
736+
return this;
737+
}
738+
718739
public Builder putCustom(String type, Custom custom) {
719740
customs.put(type, custom);
720741
return this;
@@ -739,7 +760,8 @@ public ClusterState build() {
739760
if (UNKNOWN_UUID.equals(uuid)) {
740761
uuid = UUIDs.randomBase64UUID();
741762
}
742-
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(), fromDiff);
763+
return new ClusterState(clusterName, version, uuid, metaData, routingTable, nodes, blocks, customs.build(),
764+
minimumMasterNodesOnPublishingMaster, fromDiff);
743765
}
744766

745767
public static byte[] toBytes(ClusterState state) throws IOException {
@@ -782,6 +804,7 @@ public static ClusterState readFrom(StreamInput in, DiscoveryNode localNode) thr
782804
Custom customIndexMetaData = in.readNamedWriteable(Custom.class);
783805
builder.putCustom(customIndexMetaData.getWriteableName(), customIndexMetaData);
784806
}
807+
builder.minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
785808
return builder.build();
786809
}
787810

@@ -807,6 +830,9 @@ public void writeTo(StreamOutput out) throws IOException {
807830
out.writeNamedWriteable(cursor.value);
808831
}
809832
}
833+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
834+
out.writeVInt(minimumMasterNodesOnPublishingMaster);
835+
}
810836
}
811837

812838
private static class ClusterStateDiff implements Diff<ClusterState> {
@@ -829,6 +855,8 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
829855

830856
private final Diff<ImmutableOpenMap<String, Custom>> customs;
831857

858+
private final int minimumMasterNodesOnPublishingMaster;
859+
832860
ClusterStateDiff(ClusterState before, ClusterState after) {
833861
fromUuid = before.stateUUID;
834862
toUuid = after.stateUUID;
@@ -839,6 +867,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
839867
metaData = after.metaData.diff(before.metaData);
840868
blocks = after.blocks.diff(before.blocks);
841869
customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
870+
minimumMasterNodesOnPublishingMaster = after.minimumMasterNodesOnPublishingMaster;
842871
}
843872

844873
ClusterStateDiff(StreamInput in, DiscoveryNode localNode) throws IOException {
@@ -851,6 +880,7 @@ private static class ClusterStateDiff implements Diff<ClusterState> {
851880
metaData = MetaData.readDiffFrom(in);
852881
blocks = ClusterBlocks.readDiffFrom(in);
853882
customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_VALUE_SERIALIZER);
883+
minimumMasterNodesOnPublishingMaster = in.getVersion().onOrAfter(Version.V_7_0_0) ? in.readVInt() : -1;
854884
}
855885

856886
@Override
@@ -864,6 +894,9 @@ public void writeTo(StreamOutput out) throws IOException {
864894
metaData.writeTo(out);
865895
blocks.writeTo(out);
866896
customs.writeTo(out);
897+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
898+
out.writeVInt(minimumMasterNodesOnPublishingMaster);
899+
}
867900
}
868901

869902
@Override
@@ -883,9 +916,9 @@ public ClusterState apply(ClusterState state) {
883916
builder.metaData(metaData.apply(state.metaData));
884917
builder.blocks(blocks.apply(state.blocks));
885918
builder.customs(customs.apply(state.customs));
919+
builder.minimumMasterNodesOnPublishingMaster(minimumMasterNodesOnPublishingMaster);
886920
builder.fromDiff(true);
887921
return builder.build();
888922
}
889-
890923
}
891924
}

server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSe
168168
this.reconfigurator = new Reconfigurator(settings, clusterSettings);
169169
this.clusterBootstrapService = new ClusterBootstrapService(settings, transportService, this::getFoundPeers,
170170
this::isInitialConfigurationSet, this::setInitialConfiguration);
171-
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, clusterSettings, transportService,
171+
this.discoveryUpgradeService = new DiscoveryUpgradeService(settings, transportService,
172172
this::isInitialConfigurationSet, joinHelper, peerFinder::getFoundPeers, this::setInitialConfiguration);
173173
this.lagDetector = new LagDetector(settings, transportService.getThreadPool(), n -> removeNode(n, "lagging"),
174174
transportService::getLocalNode);
@@ -467,7 +467,7 @@ void becomeCandidate(String method) {
467467
clusterFormationFailureHelper.start();
468468

469469
if (getCurrentTerm() == ZEN1_BWC_TERM) {
470-
discoveryUpgradeService.activate(lastKnownLeader);
470+
discoveryUpgradeService.activate(lastKnownLeader, coordinationState.get().getLastAcceptedState());
471471
}
472472

473473
leaderChecker.setCurrentNodes(DiscoveryNodes.EMPTY_NODES);

server/src/main/java/org/elasticsearch/cluster/coordination/DiscoveryUpgradeService.java

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.elasticsearch.Version;
2525
import org.elasticsearch.action.ActionListener;
2626
import org.elasticsearch.cluster.ClusterName;
27+
import org.elasticsearch.cluster.ClusterState;
2728
import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration;
2829
import org.elasticsearch.cluster.node.DiscoveryNode;
2930
import org.elasticsearch.common.Nullable;
3031
import org.elasticsearch.common.io.stream.StreamInput;
31-
import org.elasticsearch.common.settings.ClusterSettings;
3232
import org.elasticsearch.common.settings.Setting;
3333
import org.elasticsearch.common.settings.Settings;
3434
import org.elasticsearch.common.unit.TimeValue;
@@ -60,6 +60,7 @@
6060
import static org.elasticsearch.cluster.ClusterName.CLUSTER_NAME_SETTING;
6161
import static org.elasticsearch.cluster.ClusterState.UNKNOWN_VERSION;
6262
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
63+
import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
6364
import static org.elasticsearch.discovery.zen.ZenDiscovery.PING_TIMEOUT_SETTING;
6465

6566
/**
@@ -80,7 +81,12 @@ public class DiscoveryUpgradeService {
8081
public static final Setting<Boolean> ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING =
8182
Setting.boolSetting("discovery.zen.unsafe_rolling_upgrades_enabled", true, Setting.Property.NodeScope);
8283

83-
private final ElectMasterService electMasterService;
84+
/**
85+
* Dummy {@link ElectMasterService} that is only used to choose the best 6.x master from the discovered nodes, ignoring the
86+
* `minimum_master_nodes` setting.
87+
*/
88+
private static final ElectMasterService electMasterService = new ElectMasterService(Settings.EMPTY);
89+
8490
private final TransportService transportService;
8591
private final BooleanSupplier isBootstrappedSupplier;
8692
private final JoinHelper joinHelper;
@@ -93,12 +99,11 @@ public class DiscoveryUpgradeService {
9399
@Nullable // null if no active joining round
94100
private volatile JoiningRound joiningRound;
95101

96-
public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSettings, TransportService transportService,
102+
public DiscoveryUpgradeService(Settings settings, TransportService transportService,
97103
BooleanSupplier isBootstrappedSupplier, JoinHelper joinHelper,
98104
Supplier<Iterable<DiscoveryNode>> peersSupplier,
99105
Consumer<VotingConfiguration> initialConfigurationConsumer) {
100106
assert Version.CURRENT.major == Version.V_6_6_0.major + 1 : "remove this service once unsafe upgrades are no longer needed";
101-
electMasterService = new ElectMasterService(settings);
102107
this.transportService = transportService;
103108
this.isBootstrappedSupplier = isBootstrappedSupplier;
104109
this.joinHelper = joinHelper;
@@ -107,12 +112,9 @@ public DiscoveryUpgradeService(Settings settings, ClusterSettings clusterSetting
107112
this.bwcPingTimeout = BWC_PING_TIMEOUT_SETTING.get(settings);
108113
this.enableUnsafeBootstrappingOnUpgrade = ENABLE_UNSAFE_BOOTSTRAPPING_ON_UPGRADE_SETTING.get(settings);
109114
this.clusterName = CLUSTER_NAME_SETTING.get(settings);
110-
111-
clusterSettings.addSettingsUpdateConsumer(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING,
112-
electMasterService::minimumMasterNodes); // TODO reject update if the new value is too large
113115
}
114116

115-
public void activate(Optional<DiscoveryNode> lastKnownLeader) {
117+
public void activate(Optional<DiscoveryNode> lastKnownLeader, ClusterState lastAcceptedClusterState) {
116118
// called under coordinator mutex
117119

118120
if (isBootstrappedSupplier.getAsBoolean()) {
@@ -122,8 +124,13 @@ public void activate(Optional<DiscoveryNode> lastKnownLeader) {
122124
assert lastKnownLeader.isPresent() == false || Coordinator.isZen1Node(lastKnownLeader.get()) : lastKnownLeader;
123125
// if there was a leader and it's not a old node then we must have been bootstrapped
124126

127+
final Settings dynamicSettings = lastAcceptedClusterState.metaData().settings();
128+
final int minimumMasterNodes = DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.exists(dynamicSettings)
129+
? DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(dynamicSettings)
130+
: lastAcceptedClusterState.getMinimumMasterNodesOnPublishingMaster();
131+
125132
assert joiningRound == null : joiningRound;
126-
joiningRound = new JoiningRound(lastKnownLeader.isPresent());
133+
joiningRound = new JoiningRound(enableUnsafeBootstrappingOnUpgrade && lastKnownLeader.isPresent(), minimumMasterNodes);
127134
joiningRound.scheduleNextAttempt();
128135
}
129136

@@ -160,15 +167,21 @@ void countDown() {
160167

161168
private class JoiningRound {
162169
private final boolean upgrading;
170+
private final int minimumMasterNodes;
163171

164-
JoiningRound(boolean upgrading) {
172+
JoiningRound(boolean upgrading, int minimumMasterNodes) {
165173
this.upgrading = upgrading;
174+
this.minimumMasterNodes = minimumMasterNodes;
166175
}
167176

168177
private boolean isRunning() {
169178
return joiningRound == this && isBootstrappedSupplier.getAsBoolean() == false;
170179
}
171180

181+
private boolean canBootstrap(Set<DiscoveryNode> discoveryNodes) {
182+
return upgrading && minimumMasterNodes <= discoveryNodes.stream().filter(DiscoveryNode::isMasterNode).count();
183+
}
184+
172185
void scheduleNextAttempt() {
173186
if (isRunning() == false) {
174187
return;
@@ -189,26 +202,22 @@ public void run() {
189202
// this set of nodes is reasonably fresh - the PeerFinder cleans up nodes to which the transport service is not
190203
// connected each time it wakes up (every second by default)
191204

192-
logger.debug("nodes: {}", discoveryNodes);
193-
194-
if (electMasterService.hasEnoughMasterNodes(discoveryNodes)) {
195-
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
196-
electBestOldMaster(discoveryNodes);
197-
} else if (upgrading && enableUnsafeBootstrappingOnUpgrade) {
198-
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
199-
transportService.getThreadPool().generic().execute(() -> {
200-
try {
201-
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
202-
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
203-
} catch (Exception e) {
204-
logger.debug("exception during bootstrapping upgrade, retrying", e);
205-
} finally {
206-
scheduleNextAttempt();
207-
}
208-
});
209-
} else {
210-
scheduleNextAttempt();
211-
}
205+
logger.debug("upgrading={}, minimumMasterNodes={}, nodes={}", upgrading, minimumMasterNodes, discoveryNodes);
206+
207+
if (discoveryNodes.stream().anyMatch(Coordinator::isZen1Node)) {
208+
electBestOldMaster(discoveryNodes);
209+
} else if (canBootstrap(discoveryNodes)) {
210+
// no Zen1 nodes found, but the last-known master was a Zen1 node, so this is a rolling upgrade
211+
transportService.getThreadPool().generic().execute(() -> {
212+
try {
213+
initialConfigurationConsumer.accept(new VotingConfiguration(discoveryNodes.stream()
214+
.map(DiscoveryNode::getId).collect(Collectors.toSet())));
215+
} catch (Exception e) {
216+
logger.debug("exception during bootstrapping upgrade, retrying", e);
217+
} finally {
218+
scheduleNextAttempt();
219+
}
220+
});
212221
} else {
213222
scheduleNextAttempt();
214223
}

server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public JoinHelper(Settings settings, AllocationService allocationService, Master
9090
this.masterService = masterService;
9191
this.transportService = transportService;
9292
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
93-
this.joinTaskExecutor = new JoinTaskExecutor(allocationService, logger) {
93+
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {
9494

9595
@Override
9696
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)

0 commit comments

Comments
 (0)