diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index df0bcd9728fad..9b137070cb8cd 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -205,6 +205,13 @@ public void applyClusterState(ClusterChangedEvent event) { } try { + // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term + // that's higher than the last accepted term. + // TODO: can we get rid of this hack? + if (event.state().term() > getCurrentTerm()) { + innerSetCurrentTerm(event.state().term()); + } + updateClusterState(event.state(), event.previousState()); incrementalWrite = true; } catch (WriteStateException e) { @@ -225,17 +232,21 @@ public ClusterState getLastAcceptedState() { @Override public void setCurrentTerm(long currentTerm) { - Manifest manifest = new Manifest(currentTerm, previousManifest.getClusterStateVersion(), previousManifest.getGlobalGeneration(), - new HashMap<>(previousManifest.getIndexGenerations())); try { - metaStateService.writeManifestAndCleanup("current term changed", manifest); - previousManifest = manifest; + innerSetCurrentTerm(currentTerm); } catch (WriteStateException e) { logger.warn("Exception occurred when setting current term", e); //TODO re-throw exception } } + private void innerSetCurrentTerm(long currentTerm) throws WriteStateException { + Manifest manifest = new Manifest(currentTerm, previousManifest.getClusterStateVersion(), previousManifest.getGlobalGeneration(), + new HashMap<>(previousManifest.getIndexGenerations())); + metaStateService.writeManifestAndCleanup("current term changed", manifest); + previousManifest = manifest; + } + @Override public void setLastAcceptedState(ClusterState clusterState) { try { diff --git a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java index 815345f7535d1..41007b9e9fd02 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java +++ b/test/framework/src/main/java/org/elasticsearch/test/discovery/TestZenDiscovery.java @@ -21,8 +21,11 @@ import org.elasticsearch.cluster.coordination.CoordinationState; import org.elasticsearch.cluster.coordination.Coordinator; +import org.elasticsearch.cluster.coordination.InMemoryPersistedState; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterApplier; +import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.MasterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -80,6 +83,12 @@ public Map> getDiscoveryTypes(ThreadPool threadPool, if (USE_ZEN2.get(settings)) { Supplier persistedStateSupplier = () -> { gatewayMetaState.applyClusterStateUpdaters(); + if (DiscoveryNode.isMasterNode(settings) == false) { + // use Zen1 way of writing cluster state for non-master-eligible nodes + // this avoids concurrent manipulating of IndexMetadata with IndicesStore + ((ClusterApplierService) clusterApplier).addLowPriorityApplier(gatewayMetaState); + return new InMemoryPersistedState(gatewayMetaState.getCurrentTerm(), gatewayMetaState.getLastAcceptedState()); + } return gatewayMetaState; };