diff --git a/build.gradle b/build.gradle index ee64ad599ea13..58302a75c8817 100644 --- a/build.gradle +++ b/build.gradle @@ -206,8 +206,8 @@ task verifyVersions { * after the backport of the backcompat code is complete. */ -boolean bwc_tests_enabled = true -final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */ +boolean bwc_tests_enabled = false +final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/issues/48701" /* place a PR link here when committing bwc changes */ if (bwc_tests_enabled == false) { if (bwc_tests_disabled_issue.isEmpty()) { throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false") diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java index 6973f165e7d4b..4862e9e4c0862 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/CoordinationState.java @@ -25,6 +25,8 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import java.io.Closeable; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -432,15 +434,14 @@ public void invariant() { assert publishVotes.isEmpty() || electionWon(); } - public void close() { + public void close() throws IOException { persistedState.close(); } /** * Pluggable persistence layer for {@link CoordinationState}. - * */ - public interface PersistedState { + public interface PersistedState extends Closeable { /** * Returns the current term @@ -497,7 +498,8 @@ default void markLastAcceptedStateAsCommitted() { } } - default void close() {} + default void close() throws IOException { + } } /** diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index d664a3de193e5..20b3637680fb9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -72,6 +72,7 @@ import org.elasticsearch.transport.TransportResponse.Empty; import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -702,7 +703,7 @@ protected void doStop() { } @Override - protected void doClose() { + protected void doClose() throws IOException { final CoordinationState coordinationState = this.coordinationState.get(); if (coordinationState != null) { // This looks like a race that might leak an unclosed CoordinationState if it's created while execution is here, but this method diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 160662a63e5b3..06ccc4633a672 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -49,6 +49,7 @@ import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.gateway.LucenePersistedStateFactory; import org.elasticsearch.gateway.MetaDataStateFormat; import 
org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -380,6 +381,16 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings // determine folders to move and check that there are no extra files/folders final Set<String> folderNames = new HashSet<>(); + final Set<String> expectedFolderNames = new HashSet<>(Arrays.asList( + + // node state directory, also containing MetaDataStateFormat-based global metadata + MetaDataStateFormat.STATE_DIR_NAME, + + // Lucene-based metadata folder + LucenePersistedStateFactory.METADATA_DIRECTORY_NAME, + + // indices + INDICES_FOLDER)); try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath.path)) { for (Path subFolderPath : stream) { @@ -387,8 +398,7 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings if (FileSystemUtils.isDesktopServicesStore(subFolderPath)) { // ignore } else if (FileSystemUtils.isAccessibleDirectory(subFolderPath, logger)) { - if (fileName.equals(INDICES_FOLDER) == false && // indices folder - fileName.equals(MetaDataStateFormat.STATE_DIR_NAME) == false) { // global metadata & node state folder + if (expectedFolderNames.contains(fileName) == false) { throw new IllegalStateException("unexpected folder encountered during data folder upgrade: " + subFolderPath); } @@ -406,7 +416,7 @@ private static boolean upgradeLegacyNodeFolders(Logger logger, Settings settings } } - assert Sets.difference(folderNames, Sets.newHashSet(INDICES_FOLDER, MetaDataStateFormat.STATE_DIR_NAME)).isEmpty() : + assert Sets.difference(folderNames, expectedFolderNames).isEmpty() : "expected indices and/or state dir folder but was " + folderNames; upgradeActions.add(() -> { diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java index c60f5d6b7d40c..64fce2e5115d1 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java @@ -22,7 +22,6 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.Version; @@ -43,10 +42,12 @@ import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.Index; import org.elasticsearch.plugins.MetaDataUpgrader; import org.elasticsearch.transport.TransportService; +import java.io.Closeable; import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -63,7 +64,7 @@ * ClusterState#metaData()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
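To see the upgraded folder-whitelist check above in isolation, here is a minimal, self-contained sketch against the JDK alone. The literal folder names are assumptions standing in for MetaDataStateFormat.STATE_DIR_NAME, LucenePersistedStateFactory.METADATA_DIRECTORY_NAME and INDICES_FOLDER, whose actual values the diff does not show.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class FolderWhitelistCheck {
    // Assumed stand-ins for STATE_DIR_NAME, METADATA_DIRECTORY_NAME and INDICES_FOLDER.
    private static final Set<String> EXPECTED_FOLDER_NAMES =
        new HashSet<>(Arrays.asList("_state", "_metadata", "indices"));

    // Fail the upgrade if the legacy node folder contains any directory we do not expect.
    static void checkNoUnexpectedFolders(Path legacyNodePath) throws IOException {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(legacyNodePath)) {
            for (Path subFolderPath : stream) {
                final String fileName = subFolderPath.getFileName().toString();
                if (Files.isDirectory(subFolderPath) && EXPECTED_FOLDER_NAMES.contains(fileName) == false) {
                    throw new IllegalStateException(
                        "unexpected folder encountered during data folder upgrade: " + subFolderPath);
                }
            }
        }
    }
}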
*/ -public class GatewayMetaState { +public class GatewayMetaState implements Closeable { private static final Logger logger = LogManager.getLogger(GatewayMetaState.class); // Set by calling start() @@ -81,49 +82,46 @@ public MetaData getMetaData() { public void start(Settings settings, TransportService transportService, ClusterService clusterService, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, - MetaDataUpgrader metaDataUpgrader) { + MetaDataUpgrader metaDataUpgrader, LucenePersistedStateFactory lucenePersistedStateFactory) { assert persistedState.get() == null : "should only start once, but already have " + persistedState.get(); - final Tuple manifestClusterStateTuple; - try { - upgradeMetaData(settings, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader); - manifestClusterStateTuple = loadStateAndManifest(ClusterName.CLUSTER_NAME_SETTING.get(settings), metaStateService); - } catch (IOException e) { - throw new ElasticsearchException("failed to load metadata", e); + if (DiscoveryNode.isMasterNode(settings)) { + try { + persistedState.set(lucenePersistedStateFactory.loadPersistedState((version, metadata) -> + prepareInitialClusterState(transportService, clusterService, + ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)) + .version(version) + .metaData(upgradeMetaDataForMasterEligibleNode(metadata, metaDataIndexUpgradeService, metaDataUpgrader)) + .build()))); + } catch (IOException e) { + throw new ElasticsearchException("failed to load metadata", e); + } } - final IncrementalClusterStateWriter incrementalClusterStateWriter - = new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService, + if (DiscoveryNode.isDataNode(settings)) { + final Tuple manifestClusterStateTuple; + try { + upgradeMetaData(settings, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader); + manifestClusterStateTuple = loadStateAndManifest(ClusterName.CLUSTER_NAME_SETTING.get(settings), metaStateService); + } catch (IOException e) { + throw new ElasticsearchException("failed to load metadata", e); + } + + final IncrementalClusterStateWriter incrementalClusterStateWriter + = new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService, manifestClusterStateTuple.v1(), prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()), transportService.getThreadPool()::relativeTimeInMillis); - if (DiscoveryNode.isMasterNode(settings) == false) { - if (DiscoveryNode.isDataNode(settings)) { - // Master-eligible nodes persist index metadata for all indices regardless of whether they hold any shards or not. It's - // vitally important to the safety of the cluster coordination system that master-eligible nodes persist this metadata when - // _accepting_ the cluster state (i.e. before it is committed). This persistence happens on the generic threadpool. - // - // In contrast, master-ineligible data nodes only persist the index metadata for shards that they hold. When all shards of - // an index are moved off such a node the IndicesStore is responsible for removing the corresponding index directory, - // including the metadata, and does so on the cluster applier thread. 
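The rewritten start() above boils down to a role-based choice of PersistedState implementation. A minimal model of that choice follows, using simplified stand-in types rather than the real Elasticsearch interfaces; it also shows the default no-op close() that PersistedState now inherits by extending Closeable.

import java.io.Closeable;
import java.io.IOException;

// Simplified stand-in for CoordinationState.PersistedState, now Closeable with a
// default no-op close() so resource-holding implementations can override it.
interface PersistedState extends Closeable {
    long getCurrentTerm();

    @Override
    default void close() throws IOException {
        // no-op by default, as in the interface change above
    }
}

final class PersistedStateWiring {
    // Models the branching in GatewayMetaState#start: the three candidates are
    // hypothetical arguments, not the real factory results.
    static PersistedState choose(boolean masterEligible, boolean dataNode,
                                 PersistedState luceneBacked,
                                 PersistedState inMemorySeededFromDisk,
                                 PersistedState inMemoryEmpty) {
        if (masterEligible) {
            return luceneBacked;            // must durably persist accepted states across restarts
        } else if (dataNode) {
            return inMemorySeededFromDisk;  // index metadata is written lazily on the applier thread
        } else {
            return inMemoryEmpty;           // coordinating-only nodes may restart with a stale view
        }
    }
}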
- // - // This presents a problem: if a shard is unassigned from a node and then reassigned back to it again then there is a race - // between the IndicesStore deleting the index folder and the CoordinationState concurrently trying to write the updated - // metadata into it. We could probably solve this with careful synchronization, but in fact there is no need. The persisted - // state on master-ineligible data nodes is mostly ignored - it's only there to support dangling index imports, which is - // inherently unsafe anyway. Thus we can safely delay metadata writes on master-ineligible data nodes until applying the - // cluster state, which is what this does: - clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter)); - } - // Master-ineligible nodes do not need to persist the cluster state when accepting it because they are not in the voting - // configuration, so it's ok if they have a stale or incomplete cluster state when restarted. We track the latest cluster state - // in memory instead. - persistedState.set(new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2())); - } else { - // Master-ineligible nodes must persist the cluster state when accepting it because they must reload the (complete, fresh) - // last-accepted cluster state when restarted. - persistedState.set(new GatewayPersistedState(incrementalClusterStateWriter)); + clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter)); + + if (DiscoveryNode.isMasterNode(settings) == false) { + persistedState.set( + new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2())); + } + } else if (DiscoveryNode.isMasterNode(settings) == false) { + persistedState.set( + new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build())); } } @@ -139,6 +137,13 @@ ClusterState prepareInitialClusterState(TransportService transportService, Clust .apply(clusterState); } + // exposed so it can be overridden by tests + MetaData upgradeMetaDataForMasterEligibleNode(MetaData metaData, + MetaDataIndexUpgradeService metaDataIndexUpgradeService, + MetaDataUpgrader metaDataUpgrader) { + return upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader); + } + // exposed so it can be overridden by tests void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataUpgrader metaDataUpgrader) throws IOException { @@ -252,6 +257,10 @@ private static boolean applyPluginUpgraders(ImmutableOpenMap clusterStateFromMetaData) + throws IOException { + + final OnDiskState onDiskState = loadBestOnDiskState(); + + final List metaDataIndexWriters = new ArrayList<>(); + final List closeables = new ArrayList<>(); + boolean success = false; + try { + for (final Path path : nodeEnvironment.nodeDataPaths()) { + final Directory directory = createDirectory(path.resolve(METADATA_DIRECTORY_NAME)); + closeables.add(directory); + + final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(new KeywordAnalyzer()); + // start empty since we re-write the whole cluster state to ensure it is all using the same format version + indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + // only commit when specifically instructed, we must not write any intermediate states + indexWriterConfig.setCommitOnClose(false); + // most of the data goes into stored fields 
which are not buffered, so we only really need a tiny buffer + indexWriterConfig.setRAMBufferSizeMB(1.0); + // merge on the write thread (e.g. while flushing) + indexWriterConfig.setMergeScheduler(new SerialMergeScheduler()); + + final IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig); + closeables.add(indexWriter); + metaDataIndexWriters.add(new MetaDataIndexWriter(directory, indexWriter)); + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(closeables); + } + } + + final ClusterState clusterState = clusterStateFromMetaData.apply(onDiskState.lastAcceptedVersion, onDiskState.metaData); + final LucenePersistedState lucenePersistedState + = new LucenePersistedState(nodeEnvironment.nodeId(), metaDataIndexWriters, onDiskState.currentTerm, clusterState, bigArrays); + success = false; + try { + lucenePersistedState.persistInitialState(); + success = true; + return lucenePersistedState; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(lucenePersistedState); + } + } + } + + // exposed for tests + Directory createDirectory(Path path) throws IOException { + // it is possible to disable the use of MMapDirectory for indices, and it may be surprising to users that have done so if we still + // use a MMapDirectory here, which might happen with FSDirectory.open(path). Concurrency is of no concern here so a + // SimpleFSDirectory is fine: + return new SimpleFSDirectory(path); + } + + private static class OnDiskState { + final String nodeId; + final Path dataPath; + final long currentTerm; + final long lastAcceptedVersion; + final MetaData metaData; + + private OnDiskState(String nodeId, Path dataPath, long currentTerm, long lastAcceptedVersion, MetaData metaData) { + this.nodeId = nodeId; + this.dataPath = dataPath; + this.currentTerm = currentTerm; + this.lastAcceptedVersion = lastAcceptedVersion; + this.metaData = metaData; + } + } + + private OnDiskState loadBestOnDiskState() throws IOException { + String committedClusterUuid = null; + Path committedClusterUuidPath = null; + OnDiskState bestOnDiskState = new OnDiskState(null, null, 0L, 0L, MetaData.EMPTY_META_DATA); + OnDiskState maxCurrentTermOnDiskState = bestOnDiskState; + + // We use a write-all-read-one strategy: metadata is written to every data path when accepting it, which means it is mostly + // sufficient to read _any_ copy. "Mostly" sufficient because the user can change the set of data paths when restarting, and may + // add a data path containing a stale copy of the metadata. We deal with this by using the freshest copy we can find. 
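The loop below applies that freshest-copy rule. A compact sketch of the ordering it encodes (last-accepted term first, then last-accepted version, then current term), using a simplified stand-in for OnDiskState:

class OnDiskStateFreshness {
    // Simplified stand-in for OnDiskState: just the three fields the ordering uses.
    static final class State {
        final long acceptedTerm;          // term of the last-accepted cluster state
        final long lastAcceptedVersion;   // version of the last-accepted cluster state
        final long currentTerm;           // current term recorded in the commit user data

        State(long acceptedTerm, long lastAcceptedVersion, long currentTerm) {
            this.acceptedTerm = acceptedTerm;
            this.lastAcceptedVersion = lastAcceptedVersion;
            this.currentTerm = currentTerm;
        }
    }

    // Mirrors the comparison in the loop below: accepted term, then last-accepted
    // version, then current term decides which on-disk copy is freshest.
    static boolean isFresher(State candidate, State best) {
        if (candidate.acceptedTerm != best.acceptedTerm) {
            return candidate.acceptedTerm > best.acceptedTerm;
        }
        if (candidate.lastAcceptedVersion != best.lastAcceptedVersion) {
            return candidate.lastAcceptedVersion > best.lastAcceptedVersion;
        }
        return candidate.currentTerm > best.currentTerm;
    }
}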
+ for (final Path dataPath : nodeEnvironment.nodeDataPaths()) { + final Path indexPath = dataPath.resolve(METADATA_DIRECTORY_NAME); + if (Files.exists(indexPath)) { + try (Directory directory = createDirectory(indexPath); + DirectoryReader directoryReader = DirectoryReader.open(directory)) { + final OnDiskState onDiskState = loadOnDiskState(dataPath, directoryReader); + + if (nodeEnvironment.nodeId().equals(onDiskState.nodeId) == false) { + throw new IllegalStateException("unexpected node ID in metadata, found [" + onDiskState.nodeId + + "] in [" + dataPath + "] but expected [" + nodeEnvironment.nodeId() + "]"); + } + + if (onDiskState.metaData.clusterUUIDCommitted()) { + if (committedClusterUuid == null) { + committedClusterUuid = onDiskState.metaData.clusterUUID(); + committedClusterUuidPath = dataPath; + } else if (committedClusterUuid.equals(onDiskState.metaData.clusterUUID()) == false) { + throw new IllegalStateException("mismatched cluster UUIDs in metadata, found [" + committedClusterUuid + + "] in [" + committedClusterUuidPath + "] and [" + onDiskState.metaData.clusterUUID() + "] in [" + + dataPath + "]"); + } + } + + if (maxCurrentTermOnDiskState.currentTerm < onDiskState.currentTerm || maxCurrentTermOnDiskState.dataPath == null) { + maxCurrentTermOnDiskState = onDiskState; + } + + long acceptedTerm = onDiskState.metaData.coordinationMetaData().term(); + long maxAcceptedTerm = bestOnDiskState.metaData.coordinationMetaData().term(); + if (acceptedTerm > maxAcceptedTerm + || (acceptedTerm == maxAcceptedTerm + && (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion + || (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion) + && onDiskState.currentTerm > bestOnDiskState.currentTerm))) { + bestOnDiskState = onDiskState; + } + } catch (IndexNotFoundException e) { + logger.debug(new ParameterizedMessage("no on-disk state at {}", indexPath), e); + } + } + } + + if (bestOnDiskState.currentTerm != maxCurrentTermOnDiskState.currentTerm) { + throw new IllegalStateException("inconsistent terms found: best state is from [" + bestOnDiskState.dataPath + + "] in term [" + bestOnDiskState.currentTerm + "] but there is a stale state in [" + maxCurrentTermOnDiskState.dataPath + + "] with greater term [" + maxCurrentTermOnDiskState.currentTerm + "]"); + } + + return bestOnDiskState; + } + + private OnDiskState loadOnDiskState(Path dataPath, DirectoryReader reader) throws IOException { + final IndexSearcher searcher = new IndexSearcher(reader); + searcher.setQueryCache(null); + + final SetOnce builderReference = new SetOnce<>(); + consumeFromType(searcher, GLOBAL_TYPE_NAME, bytes -> + { + final MetaData metaData = MetaData.fromXContent(XContentFactory.xContent(XContentType.SMILE) + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytes.bytes, bytes.offset, bytes.length)); + logger.trace("found global metadata with last-accepted term [{}]", metaData.coordinationMetaData().term()); + if (builderReference.get() != null) { + throw new IllegalStateException("duplicate global metadata found in [" + dataPath + "]"); + } + builderReference.set(MetaData.builder(metaData)); + }); + + final MetaData.Builder builder = builderReference.get(); + if (builder == null) { + throw new IllegalStateException("no global metadata found in [" + dataPath + "]"); + } + + logger.trace("got global metadata, now reading index metadata"); + + final Set indexUUIDs = new HashSet<>(); + consumeFromType(searcher, INDEX_TYPE_NAME, bytes -> + { + final IndexMetaData 
indexMetaData = IndexMetaData.fromXContent(XContentFactory.xContent(XContentType.SMILE) + .createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, bytes.bytes, bytes.offset, bytes.length)); + logger.trace("found index metadata for {}", indexMetaData.getIndex()); + if (indexUUIDs.add(indexMetaData.getIndexUUID()) == false) { + throw new IllegalStateException("duplicate metadata found for " + indexMetaData.getIndex() + " in [" + dataPath + "]"); + } + builder.put(indexMetaData, false); + }); + + final Map userData = reader.getIndexCommit().getUserData(); + logger.trace("loaded metadata [{}] from [{}]", userData, reader.directory()); + assert userData.size() == COMMIT_DATA_SIZE : userData; + assert userData.get(CURRENT_TERM_KEY) != null; + assert userData.get(LAST_ACCEPTED_VERSION_KEY) != null; + assert userData.get(NODE_ID_KEY) != null; + assert userData.get(NODE_VERSION_KEY) != null; + return new OnDiskState(userData.get(NODE_ID_KEY), dataPath, Long.parseLong(userData.get(CURRENT_TERM_KEY)), + Long.parseLong(userData.get(LAST_ACCEPTED_VERSION_KEY)), builder.build()); + } + + private static void consumeFromType(IndexSearcher indexSearcher, String type, + CheckedConsumer bytesRefConsumer) throws IOException { + + final Query query = new TermQuery(new Term(TYPE_FIELD_NAME, type)); + final Weight weight = indexSearcher.createWeight(query, ScoreMode.COMPLETE_NO_SCORES, 0.0f); + logger.trace("running query [{}]", query); + + for (LeafReaderContext leafReaderContext : indexSearcher.getIndexReader().leaves()) { + logger.trace("new leafReaderContext: {}", leafReaderContext); + final Scorer scorer = weight.scorer(leafReaderContext); + if (scorer != null) { + final Bits liveDocs = leafReaderContext.reader().getLiveDocs(); + final IntPredicate isLiveDoc = liveDocs == null ? i -> true : liveDocs::get; + final DocIdSetIterator docIdSetIterator = scorer.iterator(); + while (docIdSetIterator.nextDoc() != DocIdSetIterator.NO_MORE_DOCS) { + if (isLiveDoc.test(docIdSetIterator.docID())) { + logger.trace("processing doc {}", docIdSetIterator.docID()); + bytesRefConsumer.accept( + leafReaderContext.reader().document(docIdSetIterator.docID()).getBinaryValue(DATA_FIELD_NAME)); + } + } + } + } + } + + private static final ToXContent.Params FORMAT_PARAMS; + + static { + Map params = new HashMap<>(2); + params.put("binary", "true"); + params.put(MetaData.CONTEXT_MODE_PARAM, MetaData.CONTEXT_MODE_GATEWAY); + FORMAT_PARAMS = new ToXContent.MapParams(params); + } + + /** + * A {@link Document} with a stored field containing serialized metadata written to a {@link ReleasableBytesStreamOutput} which must be + * released when no longer needed. + */ + private static class ReleasableDocument implements Releasable { + private final Document document; + private final Releasable releasable; + + ReleasableDocument(Document document, Releasable releasable) { + this.document = document; + this.releasable = releasable; + } + + Document getDocument() { + return document; + } + + @Override + public void close() { + releasable.close(); + } + } + + /** + * Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these + * for each data path. 
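The document scheme that loadOnDiskState and consumeFromType read back pairs an indexed keyword type field with an unindexed stored binary data field. Here is a self-contained Lucene round trip of that scheme; the literal field names "type" and "data" are assumptions standing in for TYPE_FIELD_NAME and DATA_FIELD_NAME, whose values the diff does not show.

import java.io.IOException;
import java.nio.file.Path;
import org.apache.lucene.analysis.core.KeywordAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.apache.lucene.util.BytesRef;

class MetaDataDocRoundTrip {
    // Writes one metadata blob as a {type keyword, stored binary payload} document,
    // commits, reopens, and reads the payload back by type. Assumes the write succeeds.
    static BytesRef writeAndReadBack(Path path, String type, byte[] payload) throws IOException {
        try (Directory directory = new SimpleFSDirectory(path)) {
            try (IndexWriter writer = new IndexWriter(directory,
                    new IndexWriterConfig(new KeywordAnalyzer()).setOpenMode(IndexWriterConfig.OpenMode.CREATE))) {
                final Document document = new Document();
                document.add(new StringField("type", type, Field.Store.NO)); // indexed, not stored
                document.add(new StoredField("data", new BytesRef(payload))); // stored, not indexed
                writer.addDocument(document);
                writer.commit();
            }
            try (DirectoryReader reader = DirectoryReader.open(directory)) {
                final IndexSearcher searcher = new IndexSearcher(reader);
                final ScoreDoc hit = searcher.search(new TermQuery(new Term("type", type)), 1).scoreDocs[0];
                return searcher.doc(hit.doc).getBinaryValue("data");
            }
        }
    }
}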
+ */ + private static class MetaDataIndexWriter implements Closeable { + + private final Logger logger; + private final Directory directory; + private final IndexWriter indexWriter; + + MetaDataIndexWriter(Directory directory, IndexWriter indexWriter) { + this.directory = directory; + this.indexWriter = indexWriter; + this.logger = Loggers.getLogger(MetaDataIndexWriter.class, directory.toString()); + } + + void deleteAll() throws IOException { + this.logger.trace("clearing existing metadata"); + this.indexWriter.deleteAll(); + } + + void updateIndexMetaDataDocument(Document indexMetaDataDocument, Index index) throws IOException { + this.logger.trace("updating metadata for [{}]", index); + indexWriter.updateDocument(new Term(INDEX_UUID_FIELD_NAME, index.getUUID()), indexMetaDataDocument); + } + + void updateGlobalMetaData(Document globalMetaDataDocument) throws IOException { + this.logger.trace("updating global metadata doc"); + indexWriter.updateDocument(new Term(TYPE_FIELD_NAME, GLOBAL_TYPE_NAME), globalMetaDataDocument); + } + + void deleteIndexMetaData(String indexUUID) throws IOException { + this.logger.trace("removing metadata for [{}]", indexUUID); + indexWriter.deleteDocuments(new Term(INDEX_UUID_FIELD_NAME, indexUUID)); + } + + void flush() throws IOException { + this.logger.trace("flushing"); + this.indexWriter.flush(); + } + + void commit(String nodeId, long currentTerm, long lastAcceptedVersion) throws IOException { + final Map commitData = new HashMap<>(COMMIT_DATA_SIZE); + commitData.put(CURRENT_TERM_KEY, Long.toString(currentTerm)); + commitData.put(LAST_ACCEPTED_VERSION_KEY, Long.toString(lastAcceptedVersion)); + commitData.put(NODE_VERSION_KEY, Integer.toString(Version.CURRENT.id)); + commitData.put(NODE_ID_KEY, nodeId); + indexWriter.setLiveCommitData(commitData.entrySet()); + indexWriter.commit(); + } + + @Override + public void close() throws IOException { + IOUtils.close(indexWriter, directory); + } + } + + /** + * Encapsulates the incremental writing of metadata to a collection of {@link MetaDataIndexWriter}s. + */ + private static class LucenePersistedState implements CoordinationState.PersistedState, Closeable { + + private long currentTerm; + private ClusterState lastAcceptedState; + private final List metaDataIndexWriters; + private final String nodeId; + private final BigArrays bigArrays; + + LucenePersistedState(String nodeId, List metaDataIndexWriters, long currentTerm, + ClusterState lastAcceptedState, BigArrays bigArrays) { + this.currentTerm = currentTerm; + this.lastAcceptedState = lastAcceptedState; + this.metaDataIndexWriters = metaDataIndexWriters; + this.nodeId = nodeId; + this.bigArrays = bigArrays; + } + + @Override + public long getCurrentTerm() { + return currentTerm; + } + + @Override + public ClusterState getLastAcceptedState() { + return lastAcceptedState; + } + + void persistInitialState() throws IOException { + // Write the whole state out to be sure it's fresh and using the latest format. Called during initialisation, so that + // (1) throwing an IOException is enough to halt the node, and + // (2) the index is currently empty since it was opened with IndexWriterConfig.OpenMode.CREATE + + // In the common case it's actually sufficient to commit() the existing state and not do any indexing. For instance, this is + // true if there's only one data path on this master node, and the commit we just loaded was already written out by this + // version of Elasticsearch. TODO TBD should we avoid indexing when possible? 
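persistInitialState() continues below with addMetaData() followed by commit(), and commit() (via MetaDataIndexWriter#commit above) publishes currentTerm and lastAcceptedVersion through the Lucene commit's user data rather than through documents, so both land atomically alongside the indexed metadata. A self-contained sketch of that round trip; the literal key strings are assumptions standing in for CURRENT_TERM_KEY, LAST_ACCEPTED_VERSION_KEY, NODE_VERSION_KEY and NODE_ID_KEY, whose values the diff does not show.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;

class CommitUserData {
    // Attach the coordination state to the next commit; the commit atomically
    // publishes both the documents and this user data.
    static void commitWithState(IndexWriter writer, String nodeId, long currentTerm,
                                long lastAcceptedVersion, int nodeVersionId) throws IOException {
        final Map<String, String> commitData = new HashMap<>(4);
        commitData.put("current_term", Long.toString(currentTerm));
        commitData.put("last_accepted_version", Long.toString(lastAcceptedVersion));
        commitData.put("node_version", Integer.toString(nodeVersionId));
        commitData.put("node_id", nodeId);
        writer.setLiveCommitData(commitData.entrySet());
        writer.commit();
    }

    // Read the term back from the latest commit, as loadOnDiskState does.
    static long readCurrentTerm(Directory directory) throws IOException {
        try (DirectoryReader reader = DirectoryReader.open(directory)) {
            return Long.parseLong(reader.getIndexCommit().getUserData().get("current_term"));
        }
    }
}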
+ addMetaData(lastAcceptedState); + commit(currentTerm, lastAcceptedState.getVersion()); + } + + @Override + public void setCurrentTerm(long currentTerm) { + commit(currentTerm, lastAcceptedState.version()); + this.currentTerm = currentTerm; + } + + @Override + public void setLastAcceptedState(ClusterState clusterState) { + try { + if (clusterState.term() != lastAcceptedState.term()) { + assert clusterState.term() > lastAcceptedState.term() : clusterState.term() + " vs " + lastAcceptedState.term(); + // In a new currentTerm, we cannot compare the persisted metadata's lastAcceptedVersion to those in the new state, so + // it's simplest to write everything again. + overwriteMetaData(clusterState); + } else { + // Within the same currentTerm, we _can_ use metadata versions to skip unnecessary writing. + updateMetaData(clusterState); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + + commit(currentTerm, clusterState.version()); + lastAcceptedState = clusterState; + } + + /** + * Update the persisted metadata to match the given cluster state by removing any stale or unnecessary documents and adding any + * updated documents. + */ + private void updateMetaData(ClusterState clusterState) throws IOException { + assert lastAcceptedState.term() == clusterState.term(); + logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only", clusterState.term()); + + try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(clusterState)) { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument()); + } + } + + final Map indexMetaDataVersionByUUID = new HashMap<>(lastAcceptedState.metaData().indices().size()); + for (ObjectCursor cursor : lastAcceptedState.metaData().indices().values()) { + final IndexMetaData indexMetaData = cursor.value; + final Long previousValue = indexMetaDataVersionByUUID.putIfAbsent(indexMetaData.getIndexUUID(), indexMetaData.getVersion()); + assert previousValue == null : indexMetaData.getIndexUUID() + " already mapped to " + previousValue; + } + + for (ObjectCursor cursor : clusterState.metaData().indices().values()) { + final IndexMetaData indexMetaData = cursor.value; + final Long previousVersion = indexMetaDataVersionByUUID.get(indexMetaData.getIndexUUID()); + if (previousVersion == null || indexMetaData.getVersion() != previousVersion) { + logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", + indexMetaData.getIndex(), previousVersion, indexMetaData.getVersion()); + try (ReleasableDocument indexMetaDataDocument = makeIndexMetaDataDocument(indexMetaData)) { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.updateIndexMetaDataDocument(indexMetaDataDocument.getDocument(), indexMetaData.getIndex()); + } + } + } else { + logger.trace("no action required for [{}]", indexMetaData.getIndex()); + } + indexMetaDataVersionByUUID.remove(indexMetaData.getIndexUUID()); + } + + for (String removedIndexUUID : indexMetaDataVersionByUUID.keySet()) { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.deleteIndexMetaData(removedIndexUUID); + } + } + + // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more + // gracefully than one that occurs during the commit process. 
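Before the flush below, updateMetaData's incremental logic reduces to a map computation: within one term, an index document is rewritten only when its metadata version changed, and whatever UUIDs remain unseen afterwards are deleted. A sketch of that computation, with plain maps standing in for the IndexMetaData collections:

import java.util.HashMap;
import java.util.Map;

class IndexMetaDataDiff {
    // Returns the UUIDs (with new versions) that must be rewritten. Every UUID still
    // present in the new state is removed from previousVersions as a side effect, so
    // what remains in previousVersions afterwards is exactly the set to delete.
    static Map<String, Long> changedIndices(Map<String, Long> previousVersions,
                                            Map<String, Long> newVersions) {
        final Map<String, Long> toWrite = new HashMap<>();
        for (Map.Entry<String, Long> entry : newVersions.entrySet()) {
            final Long previous = previousVersions.remove(entry.getKey());
            if (previous == null || previous.longValue() != entry.getValue().longValue()) {
                toWrite.put(entry.getKey(), entry.getValue()); // new index or changed version
            }
        }
        return toWrite;
    }
}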
+ for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.flush(); + } + } + + /** + * Update the persisted metadata to match the given cluster state by removing all existing documents and then adding new documents. + */ + private void overwriteMetaData(ClusterState clusterState) throws IOException { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.deleteAll(); + } + addMetaData(clusterState); + } + + /** + * Add documents for the metadata of the given cluster state, assuming that there are currently no documents. + */ + private void addMetaData(ClusterState clusterState) throws IOException { + try (ReleasableDocument globalMetaDataDocument = makeGlobalMetaDataDocument(clusterState)) { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.updateGlobalMetaData(globalMetaDataDocument.getDocument()); + } + } + + for (ObjectCursor cursor : clusterState.metaData().indices().values()) { + final IndexMetaData indexMetaData = cursor.value; + try (ReleasableDocument indexMetaDataDocument = makeIndexMetaDataDocument(indexMetaData)) { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.updateIndexMetaDataDocument(indexMetaDataDocument.getDocument(), indexMetaData.getIndex()); + } + } + } + + // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more + // gracefully than one that occurs during the commit process. + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.flush(); + } + } + + private void commit(long currentTerm, long lastAcceptedVersion) { + try { + for (MetaDataIndexWriter metaDataIndexWriter : metaDataIndexWriters) { + metaDataIndexWriter.commit(nodeId, currentTerm, lastAcceptedVersion); + } + } catch (IOException e) { + // The commit() call has similar semantics to a fsync(): although it's atomic, if it fails then we've no idea whether the + // data on disk is now the old version or the new version, and this is a disaster. It's safest to fail the whole node and + // retry from the beginning. 
+ throw new IOError(e); + } + } + + @Override + public void close() throws IOException { + logger.trace("closing"); + IOUtils.close(metaDataIndexWriters); + } + + private ReleasableDocument makeIndexMetaDataDocument(IndexMetaData indexMetaData) throws IOException { + final ReleasableDocument indexMetaDataDocument = makeDocument(INDEX_TYPE_NAME, indexMetaData); + boolean success = false; + try { + final String indexUUID = indexMetaData.getIndexUUID(); + assert indexUUID.equals(IndexMetaData.INDEX_UUID_NA_VALUE) == false; + indexMetaDataDocument.getDocument().add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO)); + success = true; + return indexMetaDataDocument; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(indexMetaDataDocument); + } + } + } + + private ReleasableDocument makeGlobalMetaDataDocument(ClusterState clusterState) throws IOException { + return makeDocument(GLOBAL_TYPE_NAME, clusterState.metaData()); + } + + private ReleasableDocument makeDocument(String typeName, ToXContent metaData) throws IOException { + final Document document = new Document(); + document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); + + boolean success = false; + final ReleasableBytesStreamOutput releasableBytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); + try { + final FilterOutputStream outputStream = new FilterOutputStream(releasableBytesStreamOutput) { + @Override + public void close() { + // closing the XContentBuilder should not release the bytes yet + } + }; + try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE, outputStream)) { + xContentBuilder.startObject(); + metaData.toXContent(xContentBuilder, FORMAT_PARAMS); + xContentBuilder.endObject(); + } + document.add(new StoredField(DATA_FIELD_NAME, releasableBytesStreamOutput.bytes().toBytesRef())); + final ReleasableDocument releasableDocument = new ReleasableDocument(document, releasableBytesStreamOutput); + success = true; + return releasableDocument; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(releasableBytesStreamOutput); + } + } + } + } +} diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index e4c1e7766af40..912fbf604109e 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -93,6 +93,7 @@ import org.elasticsearch.gateway.GatewayMetaState; import org.elasticsearch.gateway.GatewayModule; import org.elasticsearch.gateway.GatewayService; +import org.elasticsearch.gateway.LucenePersistedStateFactory; import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.http.HttpServerTransport; import org.elasticsearch.index.IndexSettings; @@ -405,6 +406,8 @@ protected Node( ClusterModule.getNamedXWriteables().stream()) .flatMap(Function.identity()).collect(toList())); final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry); + final LucenePersistedStateFactory lucenePersistedStateFactory + = new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry, bigArrays); // collect engine factory providers from server and from plugins final Collection enginePlugins = pluginsService.filterPlugins(EnginePlugin.class); @@ -542,6 +545,7 @@ protected Node( b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry); b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader); 
b.bind(MetaStateService.class).toInstance(metaStateService); + b.bind(LucenePersistedStateFactory.class).toInstance(lucenePersistedStateFactory); b.bind(IndicesService.class).toInstance(indicesService); b.bind(AliasValidator.class).toInstance(aliasValidator); b.bind(MetaDataCreateIndexService.class).toInstance(metaDataCreateIndexService); @@ -688,7 +692,8 @@ public Node start() throws NodeValidationException { // Load (and maybe upgrade) the metadata stored on disk final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class); gatewayMetaState.start(settings(), transportService, clusterService, injector.getInstance(MetaStateService.class), - injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class)); + injector.getInstance(MetaDataIndexUpgradeService.class), injector.getInstance(MetaDataUpgrader.class), + injector.getInstance(LucenePersistedStateFactory.class)); // we load the global state here (the persistent part of the cluster state stored on disk) to // pass it to the bootstrap checks to allow plugins to enforce certain preconditions based on the recovered state. final MetaData onDiskMetadata = gatewayMetaState.getPersistedState().getLastAcceptedState().metaData(); @@ -845,6 +850,8 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(SearchService.class)); toClose.add(() -> stopWatch.stop().start("transport")); toClose.add(injector.getInstance(TransportService.class)); + toClose.add(() -> stopWatch.stop().start("gateway_meta_state")); + toClose.add(injector.getInstance(GatewayMetaState.class)); for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java index e4560d0613ccd..5e1bc3b9473d0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/CoordinatorTests.java @@ -986,6 +986,7 @@ public void testClusterCannotFormWithFailingJoinValidation() { } } + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // TODO implement cluster detaching public void testCannotJoinClusterWithDifferentUUID() throws IllegalAccessException { try (Cluster cluster1 = new Cluster(randomIntBetween(1, 3))) { cluster1.runRandomly(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java index 55fc91b943640..b772e290a6ead 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/UnsafeBootstrapAndDetachCommandIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.coordination; import joptsimple.OptionSet; +import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; import org.elasticsearch.cli.MockTerminal; @@ -51,6 +52,7 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // TODO 
unsafe bootstrapping and cluster detach @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, autoManageMasterNodes = false) public class UnsafeBootstrapAndDetachCommandIT extends ESIntegTestCase { diff --git a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java index 7520a214f9e20..1d864201c1d76 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java +++ b/server/src/test/java/org/elasticsearch/env/NodeRepurposeCommandIT.java @@ -33,6 +33,7 @@ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) public class NodeRepurposeCommandIT extends ESIntegTestCase { + @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/48701") // TODO node repurposing public void testRepurpose() throws Exception { final String indexName = "test-repurpose"; diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java index e9cbd912202a0..5524d3d8c4ff7 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayIndexStateIT.java @@ -29,7 +29,9 @@ import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.client.Client; import org.elasticsearch.client.Requests; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.CoordinationState; import org.elasticsearch.cluster.metadata.IndexGraveyard; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -39,7 +41,6 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.UnassignedInfo; -import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentFactory; @@ -371,14 +372,13 @@ public void testRecoverBrokenIndexMetadata() throws Exception { ClusterState state = client().admin().cluster().prepareState().get().getState(); final IndexMetaData metaData = state.getMetaData().index("test"); - final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings()) + final IndexMetaData.Builder brokenMeta = IndexMetaData.builder(metaData).settings(Settings.builder().put(metaData.getSettings()) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT.minimumIndexCompatibilityVersion().id) // this is invalid but should be archived .put("index.similarity.BM25.type", "boolean") // this one is not validated ahead of time and breaks allocation - .put("index.analysis.filter.myCollator.type", "icu_collation") - ).build(); - writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta)); + .put("index.analysis.filter.myCollator.type", "icu_collation")); + restartNodesOnBrokenClusterState(ClusterState.builder(state).metaData(MetaData.builder(state.getMetaData()).put(brokenMeta))); // check that the cluster does not keep reallocating shards assertBusy(() -> { @@ -443,9 +443,9 @@ public void testRecoverMissingAnalyzer() throws Exception { ClusterState state = client().admin().cluster().prepareState().get().getState(); final 
IndexMetaData metaData = state.getMetaData().index("test"); - final IndexMetaData brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings() - .filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)).build(); - writeBrokenMeta(metaStateService -> metaStateService.writeIndexAndUpdateManifest("broken metadata", brokenMeta)); + final IndexMetaData.Builder brokenMeta = IndexMetaData.builder(metaData).settings(metaData.getSettings() + .filter((s) -> "index.analysis.analyzer.test.tokenizer".equals(s) == false)); + restartNodesOnBrokenClusterState(ClusterState.builder(state).metaData(MetaData.builder(state.getMetaData()).put(brokenMeta))); // check that the cluster does not keep reallocating shards assertBusy(() -> { @@ -490,7 +490,7 @@ public void testArchiveBrokenClusterSettings() throws Exception { final MetaData brokenMeta = MetaData.builder(metaData).persistentSettings(Settings.builder() .put(metaData.persistentSettings()).put("this.is.unknown", true) .put(MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey(), "broken").build()).build(); - writeBrokenMeta(metaStateService -> metaStateService.writeGlobalStateAndUpdateManifest("broken metadata", brokenMeta)); + restartNodesOnBrokenClusterState(ClusterState.builder(state).metaData(brokenMeta)); ensureYellow("test"); // wait for state recovery state = client().admin().cluster().prepareState().get().getState(); @@ -508,14 +508,19 @@ public void testArchiveBrokenClusterSettings() throws Exception { assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), 1L); } - private void writeBrokenMeta(CheckedConsumer writer) throws Exception { - Map metaStateServices = Stream.of(internalCluster().getNodeNames()) - .collect(Collectors.toMap(Function.identity(), nodeName -> internalCluster().getInstance(MetaStateService.class, nodeName))); + private void restartNodesOnBrokenClusterState(ClusterState.Builder clusterStateBuilder) throws Exception { + Map lucenePersistedStateFactories = Stream.of(internalCluster().getNodeNames()) + .collect(Collectors.toMap(Function.identity(), + nodeName -> internalCluster().getInstance(LucenePersistedStateFactory.class, nodeName))); + final ClusterState clusterState = clusterStateBuilder.build(); internalCluster().fullRestart(new RestartCallback(){ @Override public Settings onNodeStopped(String nodeName) throws Exception { - final MetaStateService metaStateService = metaStateServices.get(nodeName); - writer.accept(metaStateService); + final LucenePersistedStateFactory lucenePersistedStateFactory = lucenePersistedStateFactories.get(nodeName); + try (CoordinationState.PersistedState persistedState = lucenePersistedStateFactory.loadPersistedState( + (v, m) -> ClusterState.builder(ClusterName.DEFAULT).version(v).metaData(m).build())) { + persistedState.setLastAcceptedState(clusterState); + } return super.onNodeStopped(nodeName); } }); diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index e723d08d7352c..ddac6bb98a5be 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -34,9 +34,11 @@ import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.internal.io.IOUtils; import 
org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.test.ESTestCase; +import java.io.IOException; import java.util.Collections; import static org.hamcrest.Matchers.equalTo; @@ -53,7 +55,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { public void setUp() throws Exception { nodeEnvironment = newNodeEnvironment(); localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(), - Sets.newHashSet(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); + Sets.newHashSet(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); clusterName = new ClusterName(randomAlphaOfLength(10)); settings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), clusterName.value()).build(); super.setUp(); @@ -73,53 +75,64 @@ private CoordinationState.PersistedState newGatewayPersistedState() { return persistedState; } - private CoordinationState.PersistedState maybeNew(CoordinationState.PersistedState persistedState) { + private CoordinationState.PersistedState maybeNew(CoordinationState.PersistedState persistedState) throws IOException { if (randomBoolean()) { + persistedState.close(); return newGatewayPersistedState(); } return persistedState; } - public void testInitialState() { - CoordinationState.PersistedState gateway = newGatewayPersistedState(); - ClusterState state = gateway.getLastAcceptedState(); - assertThat(state.getClusterName(), equalTo(clusterName)); - assertTrue(MetaData.isGlobalStateEquals(state.metaData(), MetaData.EMPTY_META_DATA)); - assertThat(state.getVersion(), equalTo(Manifest.empty().getClusterStateVersion())); - assertThat(state.getNodes().getLocalNode(), equalTo(localNode)); - - long currentTerm = gateway.getCurrentTerm(); - assertThat(currentTerm, equalTo(Manifest.empty().getCurrentTerm())); + public void testInitialState() throws IOException { + CoordinationState.PersistedState gateway = null; + try { + gateway = newGatewayPersistedState(); + ClusterState state = gateway.getLastAcceptedState(); + assertThat(state.getClusterName(), equalTo(clusterName)); + assertTrue(MetaData.isGlobalStateEquals(state.metaData(), MetaData.EMPTY_META_DATA)); + assertThat(state.getVersion(), equalTo(Manifest.empty().getClusterStateVersion())); + assertThat(state.getNodes().getLocalNode(), equalTo(localNode)); + + long currentTerm = gateway.getCurrentTerm(); + assertThat(currentTerm, equalTo(Manifest.empty().getCurrentTerm())); + } finally { + IOUtils.close(gateway); + } } - public void testSetCurrentTerm() { - CoordinationState.PersistedState gateway = newGatewayPersistedState(); - - for (int i = 0; i < randomIntBetween(1, 5); i++) { - final long currentTerm = randomNonNegativeLong(); - gateway.setCurrentTerm(currentTerm); - gateway = maybeNew(gateway); - assertThat(gateway.getCurrentTerm(), equalTo(currentTerm)); + public void testSetCurrentTerm() throws IOException { + CoordinationState.PersistedState gateway = null; + try { + gateway = newGatewayPersistedState(); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + final long currentTerm = randomNonNegativeLong(); + gateway.setCurrentTerm(currentTerm); + gateway = maybeNew(gateway); + assertThat(gateway.getCurrentTerm(), equalTo(currentTerm)); + } + } finally { + IOUtils.close(gateway); } } private ClusterState createClusterState(long version, MetaData metaData) { return ClusterState.builder(clusterName). - nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build()). - version(version). - metaData(metaData). 
- build(); + nodes(DiscoveryNodes.builder().add(localNode).localNodeId(localNode.getId()).build()). + version(version). + metaData(metaData). + build(); } private CoordinationMetaData createCoordinationMetaData(long term) { CoordinationMetaData.Builder builder = CoordinationMetaData.builder(); builder.term(term); builder.lastAcceptedConfiguration( - new CoordinationMetaData.VotingConfiguration( - Sets.newHashSet(generateRandomStringArray(10, 10, false)))); + new CoordinationMetaData.VotingConfiguration( + Sets.newHashSet(generateRandomStringArray(10, 10, false)))); builder.lastCommittedConfiguration( - new CoordinationMetaData.VotingConfiguration( - Sets.newHashSet(generateRandomStringArray(10, 10, false)))); + new CoordinationMetaData.VotingConfiguration( + Sets.newHashSet(generateRandomStringArray(10, 10, false)))); for (int i = 0; i < randomIntBetween(0, 5); i++) { builder.addVotingConfigExclusion(new VotingConfigExclusion(randomAlphaOfLength(10), randomAlphaOfLength(10))); } @@ -129,12 +142,12 @@ private CoordinationMetaData createCoordinationMetaData(long term) { private IndexMetaData createIndexMetaData(String indexName, int numberOfShards, long version) { return IndexMetaData.builder(indexName).settings( - Settings.builder() - .put(IndexMetaData.SETTING_INDEX_UUID, indexName) - .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) - .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) - .build() + Settings.builder() + .put(IndexMetaData.SETTING_INDEX_UUID, indexName) + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, numberOfShards) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) + .build() ).version(version).build(); } @@ -146,98 +159,119 @@ private void assertClusterStateEqual(ClusterState expected, ClusterState actual) } } - public void testSetLastAcceptedState() { - CoordinationState.PersistedState gateway = newGatewayPersistedState(); - final long term = randomNonNegativeLong(); - - for (int i = 0; i < randomIntBetween(1, 5); i++) { - final long version = randomNonNegativeLong(); - final String indexName = randomAlphaOfLength(10); - final IndexMetaData indexMetaData = createIndexMetaData(indexName, randomIntBetween(1,5), randomNonNegativeLong()); - final MetaData metaData = MetaData.builder(). + public void testSetLastAcceptedState() throws IOException { + CoordinationState.PersistedState gateway = null; + try { + gateway = newGatewayPersistedState(); + final long term = randomNonNegativeLong(); + + for (int i = 0; i < randomIntBetween(1, 5); i++) { + final long version = randomNonNegativeLong(); + final String indexName = randomAlphaOfLength(10); + final IndexMetaData indexMetaData = createIndexMetaData(indexName, randomIntBetween(1, 5), randomNonNegativeLong()); + final MetaData metaData = MetaData.builder(). persistentSettings(Settings.builder().put(randomAlphaOfLength(10), randomAlphaOfLength(10)).build()). coordinationMetaData(createCoordinationMetaData(term)). put(indexMetaData, false). 
build(); - ClusterState state = createClusterState(version, metaData); + ClusterState state = createClusterState(version, metaData); - gateway.setLastAcceptedState(state); - gateway = maybeNew(gateway); + gateway.setLastAcceptedState(state); + gateway = maybeNew(gateway); - ClusterState lastAcceptedState = gateway.getLastAcceptedState(); - assertClusterStateEqual(state, lastAcceptedState); + ClusterState lastAcceptedState = gateway.getLastAcceptedState(); + assertClusterStateEqual(state, lastAcceptedState); + } + } finally { + IOUtils.close(gateway); } } - public void testSetLastAcceptedStateTermChanged() { - CoordinationState.PersistedState gateway = newGatewayPersistedState(); + public void testSetLastAcceptedStateTermChanged() throws IOException { + CoordinationState.PersistedState gateway = null; + try { + gateway = newGatewayPersistedState(); - final String indexName = randomAlphaOfLength(10); - final int numberOfShards = randomIntBetween(1, 5); - final long version = randomNonNegativeLong(); - final long term = randomNonNegativeLong(); - final IndexMetaData indexMetaData = createIndexMetaData(indexName, numberOfShards, version); - final ClusterState state = createClusterState(randomNonNegativeLong(), + final String indexName = randomAlphaOfLength(10); + final int numberOfShards = randomIntBetween(1, 5); + final long version = randomNonNegativeLong(); + final long term = randomValueOtherThan(Long.MAX_VALUE, ESTestCase::randomNonNegativeLong); + final IndexMetaData indexMetaData = createIndexMetaData(indexName, numberOfShards, version); + final ClusterState state = createClusterState(randomNonNegativeLong(), MetaData.builder().coordinationMetaData(createCoordinationMetaData(term)).put(indexMetaData, false).build()); - gateway.setLastAcceptedState(state); + gateway.setLastAcceptedState(state); - gateway = maybeNew(gateway); - final long newTerm = randomValueOtherThan(term, ESTestCase::randomNonNegativeLong); - final int newNumberOfShards = randomValueOtherThan(numberOfShards, () -> randomIntBetween(1,5)); - final IndexMetaData newIndexMetaData = createIndexMetaData(indexName, newNumberOfShards, version); - final ClusterState newClusterState = createClusterState(randomNonNegativeLong(), + gateway = maybeNew(gateway); + final long newTerm = randomLongBetween(term + 1, Long.MAX_VALUE); + final int newNumberOfShards = randomValueOtherThan(numberOfShards, () -> randomIntBetween(1, 5)); + final IndexMetaData newIndexMetaData = createIndexMetaData(indexName, newNumberOfShards, version); + final ClusterState newClusterState = createClusterState(randomNonNegativeLong(), MetaData.builder().coordinationMetaData(createCoordinationMetaData(newTerm)).put(newIndexMetaData, false).build()); - gateway.setLastAcceptedState(newClusterState); + gateway.setLastAcceptedState(newClusterState); - gateway = maybeNew(gateway); - assertThat(gateway.getLastAcceptedState().metaData().index(indexName), equalTo(newIndexMetaData)); + gateway = maybeNew(gateway); + assertThat(gateway.getLastAcceptedState().metaData().index(indexName), equalTo(newIndexMetaData)); + } finally { + IOUtils.close(gateway); + } } - public void testCurrentTermAndTermAreDifferent() { - CoordinationState.PersistedState gateway = newGatewayPersistedState(); + public void testCurrentTermAndTermAreDifferent() throws IOException { + CoordinationState.PersistedState gateway = null; + try { + gateway = newGatewayPersistedState(); - long currentTerm = randomNonNegativeLong(); - long term = randomValueOtherThan(currentTerm, 
ESTestCase::randomNonNegativeLong); + long currentTerm = randomNonNegativeLong(); + long term = randomValueOtherThan(currentTerm, ESTestCase::randomNonNegativeLong); - gateway.setCurrentTerm(currentTerm); - gateway.setLastAcceptedState(createClusterState(randomNonNegativeLong(), + gateway.setCurrentTerm(currentTerm); + gateway.setLastAcceptedState(createClusterState(randomNonNegativeLong(), MetaData.builder().coordinationMetaData(CoordinationMetaData.builder().term(term).build()).build())); - gateway = maybeNew(gateway); - assertThat(gateway.getCurrentTerm(), equalTo(currentTerm)); - assertThat(gateway.getLastAcceptedState().coordinationMetaData().term(), equalTo(term)); + gateway = maybeNew(gateway); + assertThat(gateway.getCurrentTerm(), equalTo(currentTerm)); + assertThat(gateway.getLastAcceptedState().coordinationMetaData().term(), equalTo(term)); + } finally { + IOUtils.close(gateway); + } } - public void testMarkAcceptedConfigAsCommitted() { - CoordinationState.PersistedState gateway = newGatewayPersistedState(); + public void testMarkAcceptedConfigAsCommitted() throws IOException { + CoordinationState.PersistedState gateway = null; + try { + gateway = newGatewayPersistedState(); - //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration - CoordinationMetaData coordinationMetaData; - do { - coordinationMetaData = createCoordinationMetaData(randomNonNegativeLong()); - } while (coordinationMetaData.getLastAcceptedConfiguration().equals(coordinationMetaData.getLastCommittedConfiguration())); + //generate random coordinationMetaData with different lastAcceptedConfiguration and lastCommittedConfiguration + CoordinationMetaData coordinationMetaData; + do { + coordinationMetaData = createCoordinationMetaData(randomNonNegativeLong()); + } while (coordinationMetaData.getLastAcceptedConfiguration().equals(coordinationMetaData.getLastCommittedConfiguration())); - ClusterState state = createClusterState(randomNonNegativeLong(), + ClusterState state = createClusterState(randomNonNegativeLong(), MetaData.builder().coordinationMetaData(coordinationMetaData) .clusterUUID(randomAlphaOfLength(10)).build()); - gateway.setLastAcceptedState(state); + gateway.setLastAcceptedState(state); - gateway = maybeNew(gateway); - assertThat(gateway.getLastAcceptedState().getLastAcceptedConfiguration(), + gateway = maybeNew(gateway); + assertThat(gateway.getLastAcceptedState().getLastAcceptedConfiguration(), not(equalTo(gateway.getLastAcceptedState().getLastCommittedConfiguration()))); - gateway.markLastAcceptedStateAsCommitted(); + gateway.markLastAcceptedStateAsCommitted(); - CoordinationMetaData expectedCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData) + CoordinationMetaData expectedCoordinationMetaData = CoordinationMetaData.builder(coordinationMetaData) .lastCommittedConfiguration(coordinationMetaData.getLastAcceptedConfiguration()).build(); - ClusterState expectedClusterState = + ClusterState expectedClusterState = ClusterState.builder(state).metaData(MetaData.builder().coordinationMetaData(expectedCoordinationMetaData) .clusterUUID(state.metaData().clusterUUID()).clusterUUIDCommitted(true).build()).build(); - gateway = maybeNew(gateway); - assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState()); - gateway.markLastAcceptedStateAsCommitted(); + gateway = maybeNew(gateway); + assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState()); + gateway.markLastAcceptedStateAsCommitted(); - gateway = 
maybeNew(gateway); - assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState()); + gateway = maybeNew(gateway); + assertClusterStateEqual(expectedClusterState, gateway.getLastAcceptedState()); + } finally { + IOUtils.close(gateway); + } } + } diff --git a/server/src/test/java/org/elasticsearch/gateway/LucenePersistedStateFactoryTests.java b/server/src/test/java/org/elasticsearch/gateway/LucenePersistedStateFactoryTests.java new file mode 100644 index 0000000000000..677c552143af2 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/gateway/LucenePersistedStateFactoryTests.java @@ -0,0 +1,745 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.gateway; + +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.Term; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.SimpleFSDirectory; +import org.elasticsearch.Version; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.coordination.CoordinationMetaData; +import org.elasticsearch.cluster.coordination.CoordinationState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.core.internal.io.IOUtils; +import org.elasticsearch.env.Environment; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.index.Index; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOError; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.Matchers.allOf; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class LucenePersistedStateFactoryTests extends ESTestCase { + + private LucenePersistedStateFactory newPersistedStateFactory(NodeEnvironment nodeEnvironment) { + return new 
LucenePersistedStateFactory(nodeEnvironment, xContentRegistry(), + usually() + ? BigArrays.NON_RECYCLING_INSTANCE + : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService())); + } + + public void testPersistsAndReloadsTerm() throws IOException { + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + final long newTerm = randomNonNegativeLong(); + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + assertThat(persistedState.getCurrentTerm(), equalTo(0L)); + persistedState.setCurrentTerm(newTerm); + assertThat(persistedState.getCurrentTerm(), equalTo(newTerm)); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + assertThat(persistedState.getCurrentTerm(), equalTo(newTerm)); + } + } + } + + public void testPersistsAndReloadsGlobalMetadata() throws IOException { + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + final String clusterUUID = UUIDs.randomBase64UUID(random()); + final long version = randomLongBetween(1L, Long.MAX_VALUE); + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(clusterUUID) + .clusterUUIDCommitted(true) + .version(version)) + .incrementVersion().build()); + assertThat(persistedState.getLastAcceptedState().metaData().clusterUUID(), equalTo(clusterUUID)); + assertTrue(persistedState.getLastAcceptedState().metaData().clusterUUIDCommitted()); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + assertThat(clusterState.metaData().clusterUUID(), equalTo(clusterUUID)); + assertTrue(clusterState.metaData().clusterUUIDCommitted()); + assertThat(clusterState.metaData().version(), equalTo(version)); + + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(clusterUUID) + .clusterUUIDCommitted(true) + .version(version + 1)) + .incrementVersion().build()); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + assertThat(clusterState.metaData().clusterUUID(), equalTo(clusterUUID)); + assertTrue(clusterState.metaData().clusterUUIDCommitted()); + assertThat(clusterState.metaData().version(), equalTo(version + 1)); + } + } + } + + public void testLoadsFreshestState() throws IOException { + final Path[] dataPaths = createDataPaths(); + final long freshTerm = randomLongBetween(1L, Long.MAX_VALUE); + final long staleTerm = randomBoolean() ? freshTerm : randomLongBetween(1L, freshTerm); + final long freshVersion = randomLongBetween(2L, Long.MAX_VALUE); + final long staleVersion = staleTerm == freshTerm ? 
randomLongBetween(1L, freshVersion - 1) : randomLongBetween(1L, Long.MAX_VALUE); + + final HashSet<Path> unimportantPaths = Arrays.stream(dataPaths).collect(Collectors.toCollection(HashSet::new)); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setCurrentTerm(randomLongBetween(1L, Long.MAX_VALUE)); + persistedState.setLastAcceptedState( + ClusterState.builder(clusterState).version(staleVersion) + .metaData(MetaData.builder(clusterState.metaData()).coordinationMetaData( + CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(staleTerm).build())).build()); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[]{randomFrom(dataPaths)})) { + unimportantPaths.remove(nodeEnvironment.nodeDataPaths()[0]); + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState( + ClusterState.builder(clusterState).version(freshVersion) + .metaData(MetaData.builder(clusterState.metaData()).coordinationMetaData( + CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(freshTerm).build())).build()); + } + } + + if (randomBoolean() && unimportantPaths.isEmpty() == false) { + IOUtils.rm(randomFrom(unimportantPaths)); + } + + // verify that the freshest state is chosen + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + assertThat(persistedState.getLastAcceptedState().term(), equalTo(freshTerm)); + assertThat(persistedState.getLastAcceptedState().version(), equalTo(freshVersion)); + } + } + + // verify that the freshest state was rewritten to each data path + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(new Path[]{randomFrom(dataPaths)})) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + assertThat(persistedState.getLastAcceptedState().term(), equalTo(freshTerm)); + assertThat(persistedState.getLastAcceptedState().version(), equalTo(freshVersion)); + } + } + } + + public void testFailsOnMismatchedNodeIds() throws IOException { + final Path[] dataPaths1 = createDataPaths(); + final Path[] dataPaths2 = createDataPaths(); + + final String[] nodeIds = new String[2]; + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths1)) { + nodeIds[0] = nodeEnvironment.nodeId(); + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + persistedState.setLastAcceptedState( + ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) { + nodeIds[1] = nodeEnvironment.nodeId(); + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + persistedState.setLastAcceptedState( + ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + } + } + + for (Path dataPath : dataPaths2) {
IOUtils.rm(dataPath.resolve(MetaDataStateFormat.STATE_DIR_NAME)); + } + + final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { + final String message = expectThrows(IllegalStateException.class, + () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + assertThat(message, + allOf(containsString("unexpected node ID in metadata"), containsString(nodeIds[0]), containsString(nodeIds[1]))); + assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2), + Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString()))); + } + } + + public void testFailsOnMismatchedCommittedClusterUUIDs() throws IOException { + final Path[] dataPaths1 = createDataPaths(); + final Path[] dataPaths2 = createDataPaths(); + final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new); + + final String clusterUUID1 = UUIDs.randomBase64UUID(random()); + final String clusterUUID2 = UUIDs.randomBase64UUID(random()); + + // first establish consistent node IDs and write initial metadata + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + assertFalse(persistedState.getLastAcceptedState().metaData().clusterUUIDCommitted()); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths1)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(clusterUUID1) + .clusterUUIDCommitted(true) + .version(1)) + .incrementVersion().build()); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(clusterUUID2) + .clusterUUIDCommitted(true) + .version(1)) + .incrementVersion().build()); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { + final String message = expectThrows(IllegalStateException.class, + () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + assertThat(message, + allOf(containsString("mismatched cluster UUIDs in metadata"), containsString(clusterUUID1), containsString(clusterUUID2))); + assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths1), + Arrays.stream(dataPaths1).anyMatch(p -> message.contains(p.toString()))); + assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2), + Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString()))); + } + } + + public void testFailsIfFreshestStateIsInStaleTerm() throws IOException { + final Path[] dataPaths1 = createDataPaths(); + final Path[] dataPaths2 = createDataPaths(); + final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new); + + final 
long staleCurrentTerm = randomLongBetween(1L, Long.MAX_VALUE - 1); + final long freshCurrentTerm = randomLongBetween(staleCurrentTerm + 1, Long.MAX_VALUE); + + final long freshTerm = randomLongBetween(1L, Long.MAX_VALUE); + final long staleTerm = randomBoolean() ? freshTerm : randomLongBetween(1L, freshTerm); + final long freshVersion = randomLongBetween(2L, Long.MAX_VALUE); + final long staleVersion = staleTerm == freshTerm ? randomLongBetween(1L, freshVersion - 1) : randomLongBetween(1L, Long.MAX_VALUE); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setCurrentTerm(staleCurrentTerm); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()).version(1) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(staleTerm).build())) + .version(staleVersion) + .build()); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths1)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + persistedState.setCurrentTerm(freshCurrentTerm); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(dataPaths2)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()).version(2) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(freshTerm).build())) + .version(freshVersion) + .build()); + } + } + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { + final String message = expectThrows(IllegalStateException.class, + () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + assertThat(message, allOf( + containsString("inconsistent terms found"), + containsString(Long.toString(staleCurrentTerm)), + containsString(Long.toString(freshCurrentTerm)))); + assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths1), + Arrays.stream(dataPaths1).anyMatch(p -> message.contains(p.toString()))); + assertTrue("[" + message + "] should match " + Arrays.toString(dataPaths2), + Arrays.stream(dataPaths2).anyMatch(p -> message.contains(p.toString()))); + } + } + + public void testFailsGracefullyOnExceptionDuringFlush() throws IOException { + final AtomicBoolean throwException = new AtomicBoolean(); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final LucenePersistedStateFactory persistedStateFactory + = new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + @Override + Directory createDirectory(Path path) throws IOException { + return new FilterDirectory(super.createDirectory(path)) { + @Override + public IndexOutput createOutput(String name, IOContext context) throws IOException { + if (throwException.get()) { + throw new IOException("simulated"); + } + return super.createOutput(name, context); + } + }; + } + }; + final long newTerm = randomNonNegativeLong(); + + try (CoordinationState.PersistedState 
persistedState = loadPersistedState(persistedStateFactory)) { + persistedState.setCurrentTerm(newTerm); + final ClusterState clusterState = persistedState.getLastAcceptedState(); + final ClusterState newState = ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(UUIDs.randomBase64UUID(random())) + .clusterUUIDCommitted(true) + .version(randomLongBetween(1L, Long.MAX_VALUE))) + .incrementVersion().build(); + throwException.set(true); + assertThat(expectThrows(UncheckedIOException.class, () -> persistedState.setLastAcceptedState(newState)).getMessage(), + containsString("simulated")); + } + } + } + + public void testThrowsIOErrorOnExceptionDuringCommit() throws IOException { + final AtomicBoolean throwException = new AtomicBoolean(); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final LucenePersistedStateFactory persistedStateFactory + = new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE) { + @Override + Directory createDirectory(Path path) throws IOException { + return new FilterDirectory(super.createDirectory(path)) { + @Override + public void sync(Collection<String> names) throws IOException { + if (throwException.get() && names.stream().anyMatch(n -> n.startsWith("pending_segments_"))) { + throw new IOException("simulated"); + } + } + }; + } + }; + final long newTerm = randomNonNegativeLong(); + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + persistedState.setCurrentTerm(newTerm); + final ClusterState clusterState = persistedState.getLastAcceptedState(); + final ClusterState newState = ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .clusterUUID(UUIDs.randomBase64UUID(random())) + .clusterUUIDCommitted(true) + .version(randomLongBetween(1L, Long.MAX_VALUE))) + .incrementVersion().build(); + throwException.set(true); + assertThat(expectThrows(IOError.class, () -> persistedState.setLastAcceptedState(newState)).getMessage(), + containsString("simulated")); + } + } + }
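[Editorial note — illustrative sketch, not part of the patch] The two failure-injection tests above pin down a deliberate asymmetry: an IOException thrown while writing the new state surfaces as an UncheckedIOException (the previous commit point is still intact, so the failure is recoverable), whereas an IOException thrown while committing surfaces as an IOError (the fate of the commit is unknown, so carrying on would risk inconsistency). A minimal sketch of that convention, assuming only the standard Lucene IndexWriter API and the imports already present in this test file; the writeAndCommit method below is hypothetical, not the factory's actual code:

    void writeAndCommit(IndexWriter indexWriter) {
        try {
            indexWriter.flush(); // write out the new state as segment files
        } catch (IOException e) {
            throw new UncheckedIOException(e); // recoverable: the last commit is still readable
        }
        try {
            indexWriter.commit(); // fsync and publish the new commit point
        } catch (IOException e) {
            throw new IOError(e); // fatal: unknown whether the commit took effect
        }
    }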
+ + public void testFailsIfGlobalMetadataIsMissing() throws IOException { + // if someone attempted surgery on the metadata index by hand, e.g. deleting broken segments, then maybe the global metadata + // isn't there any more + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + persistedState.setLastAcceptedState( + ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + } + + final Path brokenPath = randomFrom(nodeEnvironment.nodeDataPaths()); + try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME))) { + final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(); + indexWriterConfig.setOpenMode(IndexWriterConfig.OpenMode.CREATE); + try (IndexWriter indexWriter = new IndexWriter(directory, indexWriterConfig)) { + indexWriter.commit(); + } + } + + final String message = expectThrows(IllegalStateException.class, + () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + assertThat(message, allOf(containsString("no global metadata found"), containsString(brokenPath.toString()))); + } + } + + public void testFailsIfGlobalMetadataIsDuplicated() throws IOException { + // if someone attempted surgery on the metadata index by hand, e.g. deleting broken segments, then maybe the global metadata + // is duplicated + + final Path[] dataPaths1 = createDataPaths(); + final Path[] dataPaths2 = createDataPaths(); + final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + persistedState.setLastAcceptedState( + ClusterState.builder(persistedState.getLastAcceptedState()).version(randomLongBetween(1L, Long.MAX_VALUE)).build()); + } + + final Path brokenPath = randomFrom(nodeEnvironment.nodeDataPaths()); + final Path dupPath = randomValueOtherThan(brokenPath, () -> randomFrom(nodeEnvironment.nodeDataPaths())); + try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME)); + Directory dupDirectory = new SimpleFSDirectory(dupPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME))) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + indexWriter.addIndexes(dupDirectory); + indexWriter.commit(); + } + } + + final String message = expectThrows(IllegalStateException.class, + () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + assertThat(message, allOf(containsString("duplicate global metadata found"), containsString(brokenPath.toString()))); + } + } + + public void testFailsIfIndexMetadataIsDuplicated() throws IOException { + // if someone attempted surgery on the metadata index by hand, e.g.
deleting broken segments, then maybe some index metadata + // is duplicated + + final Path[] dataPaths1 = createDataPaths(); + final Path[] dataPaths2 = createDataPaths(); + final Path[] combinedPaths = Stream.concat(Arrays.stream(dataPaths1), Arrays.stream(dataPaths2)).toArray(Path[]::new); + + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(combinedPaths)) { + final String indexUUID = UUIDs.randomBase64UUID(random()); + final String indexName = randomAlphaOfLength(10); + + try (CoordinationState.PersistedState persistedState + = loadPersistedState(newPersistedStateFactory(nodeEnvironment))) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .version(1L) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(1L).build()) + .put(IndexMetaData.builder(indexName) + .version(1L) + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, indexUUID)))) + .incrementVersion().build()); + } + + final Path brokenPath = randomFrom(nodeEnvironment.nodeDataPaths()); + final Path dupPath = randomValueOtherThan(brokenPath, () -> randomFrom(nodeEnvironment.nodeDataPaths())); + try (Directory directory = new SimpleFSDirectory(brokenPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME)); + Directory dupDirectory = new SimpleFSDirectory(dupPath.resolve(LucenePersistedStateFactory.METADATA_DIRECTORY_NAME))) { + try (IndexWriter indexWriter = new IndexWriter(directory, new IndexWriterConfig())) { + indexWriter.deleteDocuments(new Term("type", "global")); // do not duplicate global metadata + indexWriter.addIndexes(dupDirectory); + indexWriter.commit(); + } + } + + final String message = expectThrows(IllegalStateException.class, + () -> loadPersistedState(newPersistedStateFactory(nodeEnvironment))).getMessage(); + assertThat(message, allOf( + containsString("duplicate metadata found"), + containsString(brokenPath.toString()), + containsString(indexName), + containsString(indexUUID))); + } + } + + public void testPersistsAndReloadsIndexMetadataIffVersionOrTermChanges() throws IOException { + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + final long globalVersion = randomLongBetween(1L, Long.MAX_VALUE); + final String indexUUID = UUIDs.randomBase64UUID(random()); + final long indexMetaDataVersion = randomLongBetween(1L, Long.MAX_VALUE); + + final long oldTerm = randomLongBetween(1L, Long.MAX_VALUE - 1); + final long newTerm = randomLongBetween(oldTerm + 1, Long.MAX_VALUE); + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .version(globalVersion) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(oldTerm).build()) + .put(IndexMetaData.builder("test") + .version(indexMetaDataVersion - 1) // -1 because it's incremented in .put() + 
.settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, indexUUID)))) + .incrementVersion().build()); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + assertThat(indexMetaData.getIndexUUID(), equalTo(indexUUID)); + assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(0)); + + // ensure we do not wastefully persist the same index metadata version by making a bad update with the same version + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() + .put(indexMetaData.getSettings()) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1)).build(), false)) + .incrementVersion().build()); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(0)); + + // ensure that we do persist the updated index metadata by making an update with a higher version + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() + .put(indexMetaData.getSettings()) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2)).build(), true)) + .incrementVersion().build()); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + assertThat(indexMetaData.getVersion(), equalTo(indexMetaDataVersion + 1)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(2)); + + // ensure that we also persist the index metadata when the term changes + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(newTerm).build()) + .put(IndexMetaData.builder(indexMetaData).settings(Settings.builder() + .put(indexMetaData.getSettings()) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 3)).build(), false)) + .incrementVersion().build()); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + final IndexMetaData indexMetaData = clusterState.metaData().index("test"); + assertThat(indexMetaData.getIndexUUID(), equalTo(indexUUID)); + assertThat(indexMetaData.getVersion(),
equalTo(indexMetaDataVersion + 1)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexMetaData.getSettings()), equalTo(3)); + } + } + } + + public void testPersistsAndReloadsIndexMetadataForMultipleIndices() throws IOException { + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + + final long term = randomLongBetween(1L, Long.MAX_VALUE); + final String addedIndexUuid = UUIDs.randomBase64UUID(random()); + final String updatedIndexUuid = UUIDs.randomBase64UUID(random()); + final String deletedIndexUuid = UUIDs.randomBase64UUID(random()); + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .version(clusterState.metaData().version() + 1) + .coordinationMetaData(CoordinationMetaData.builder(clusterState.coordinationMetaData()).term(term).build()) + .put(IndexMetaData.builder("updated") + .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, updatedIndexUuid))) + .put(IndexMetaData.builder("deleted") + .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, deletedIndexUuid)))) + .incrementVersion().build()); + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + + assertThat(clusterState.metaData().indices().size(), equalTo(2)); + assertThat(clusterState.metaData().index("updated").getIndexUUID(), equalTo(updatedIndexUuid)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(clusterState.metaData().index("updated").getSettings()), + equalTo(1)); + assertThat(clusterState.metaData().index("deleted").getIndexUUID(), equalTo(deletedIndexUuid)); + + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .version(clusterState.metaData().version() + 1) + .remove("deleted") + .put(IndexMetaData.builder("updated") + .settings(Settings.builder() + .put(clusterState.metaData().index("updated").getSettings()) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 2))) + .put(IndexMetaData.builder("added") + .version(randomLongBetween(0L, Long.MAX_VALUE - 1) - 1) // -1 because it's incremented in .put() + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, addedIndexUuid)))) + .incrementVersion().build()); + } + + try 
(CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + + assertThat(clusterState.metaData().indices().size(), equalTo(2)); + assertThat(clusterState.metaData().index("updated").getIndexUUID(), equalTo(updatedIndexUuid)); + assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(clusterState.metaData().index("updated").getSettings()), + equalTo(2)); + assertThat(clusterState.metaData().index("added").getIndexUUID(), equalTo(addedIndexUuid)); + assertThat(clusterState.metaData().index("deleted"), nullValue()); + } + } + } + + public void testReloadsMetadataAcrossMultipleSegments() throws IOException { + try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { + final LucenePersistedStateFactory persistedStateFactory = newPersistedStateFactory(nodeEnvironment); + + final int writes = between(5, 20); + final List<Index> indices = new ArrayList<>(writes); + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + for (int i = 0; i < writes; i++) { + final Index index = new Index("test-" + i, UUIDs.randomBase64UUID(random())); + indices.add(index); + final ClusterState clusterState = persistedState.getLastAcceptedState(); + persistedState.setLastAcceptedState(ClusterState.builder(clusterState) + .metaData(MetaData.builder(clusterState.metaData()) + .version(i + 2) + .put(IndexMetaData.builder(index.getName()) + .settings(Settings.builder() + .put(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 1) + .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0) + .put(IndexMetaData.SETTING_INDEX_VERSION_CREATED.getKey(), Version.CURRENT) + .put(IndexMetaData.SETTING_INDEX_UUID, index.getUUID())))) + .incrementVersion().build()); + } + } + + try (CoordinationState.PersistedState persistedState = loadPersistedState(persistedStateFactory)) { + final ClusterState clusterState = persistedState.getLastAcceptedState(); + for (Index index : indices) { + final IndexMetaData indexMetaData = clusterState.metaData().index(index.getName()); + assertThat(indexMetaData.getIndexUUID(), equalTo(index.getUUID())); + } + } + } + } + + @Override + public Settings buildEnvSettings(Settings settings) { + assertTrue(settings.hasValue(Environment.PATH_DATA_SETTING.getKey())); + return Settings.builder() + .put(settings) + .put(Environment.PATH_HOME_SETTING.getKey(), createTempDir().toAbsolutePath()).build(); + } + + public static Path[] createDataPaths() { + final Path[] dataPaths = new Path[randomIntBetween(1, 4)]; + for (int i = 0; i < dataPaths.length; i++) { + dataPaths[i] = createTempDir(); + } + return dataPaths; + } + + private NodeEnvironment newNodeEnvironment(Path[] dataPaths) throws IOException { + return newNodeEnvironment(Settings.builder() + .putList(Environment.PATH_DATA_SETTING.getKey(), Arrays.stream(dataPaths).map(Path::toString).collect(Collectors.toList())) + .build()); + } + + private static CoordinationState.PersistedState loadPersistedState(LucenePersistedStateFactory persistedStateFactory) + throws IOException { + + return persistedStateFactory.loadPersistedState(LucenePersistedStateFactoryTests::clusterStateFromMetadata); + } + + private static ClusterState clusterStateFromMetadata(long version, MetaData metaData) { + return ClusterState.builder(ClusterName.DEFAULT).version(version).metaData(metaData).build(); + } + + +}
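[Editorial note — illustrative sketch, not part of the patch] Taken together, the helpers above describe the lifecycle the new factory expects: build it over the node's data paths, load the freshest on-disk state once, keep the returned PersistedState open while the node runs, and close it to release the underlying Lucene index. A condensed usage sketch, assuming a nodeEnvironment and xContentRegistry are in scope; only the class and method names are taken from this patch:

    final LucenePersistedStateFactory factory =
        new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE);
    try (CoordinationState.PersistedState persistedState = factory.loadPersistedState(
            (version, metaData) -> ClusterState.builder(ClusterName.DEFAULT).version(version).metaData(metaData).build())) {
        persistedState.setCurrentTerm(persistedState.getCurrentTerm() + 1); // durably bumps the term
        persistedState.setLastAcceptedState(persistedState.getLastAcceptedState()); // durably rewrites the state
    } // PersistedState extends Closeable, so try-with-resources releases the Lucene index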
diff --git a/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java b/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java index a5b644ed5cc2a..6587e6cfa92f2 100644 --- a/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/MetaDataWriteDataNodesIT.java @@ -21,10 +21,11 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.cluster.coordination.Coordinator; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESIntegTestCase; @@ -190,8 +191,7 @@ private boolean indexDirectoryExists(String nodeName, Index index) { } private ImmutableOpenMap<String, IndexMetaData> getIndicesMetaDataOnNode(String nodeName) { - GatewayMetaState nodeMetaState = ((InternalTestCluster) cluster()).getInstance(GatewayMetaState.class, nodeName); - MetaData nodeMetaData = nodeMetaState.getMetaData(); - return nodeMetaData.getIndices(); + final Coordinator coordinator = (Coordinator) internalCluster().getInstance(Discovery.class, nodeName); + return coordinator.getApplierState().getMetaData().getIndices(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index df9638d17835a..ed12f948dcd98 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -35,7 +35,6 @@ import org.elasticsearch.cluster.coordination.CoordinationMetaData.VotingConfiguration; import org.elasticsearch.cluster.coordination.LinearizabilityChecker.History; import org.elasticsearch.cluster.coordination.LinearizabilityChecker.SequentialSpec; -import org.elasticsearch.cluster.metadata.Manifest; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeRole; @@ -63,7 +62,6 @@ import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.gateway.ClusterStateUpdaters; import org.elasticsearch.gateway.GatewayService; -import org.elasticsearch.gateway.MetaStateService; import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; @@ -741,17 +739,13 @@ class MockPersistedState implements CoordinationState.PersistedState { try { if (oldState.nodeEnvironment != null) { nodeEnvironment = oldState.nodeEnvironment; - final MetaStateService metaStateService = new MetaStateService(nodeEnvironment, xContentRegistry()); final MetaData updatedMetaData = adaptGlobalMetaData.apply(oldState.getLastAcceptedState().metaData()); if (updatedMetaData != oldState.getLastAcceptedState().metaData()) { - metaStateService.writeGlobalStateAndUpdateManifest("update global state", updatedMetaData); + throw new AssertionError("TODO adapting persistent metadata is not supported yet"); } final long updatedTerm =
adaptCurrentTerm.apply(oldState.getCurrentTerm()); if (updatedTerm != oldState.getCurrentTerm()) { - final Manifest manifest = metaStateService.loadManifestOrEmpty(); - metaStateService.writeManifestAndCleanup("update term", - new Manifest(updatedTerm, manifest.getClusterStateVersion(), manifest.getGlobalGeneration(), - manifest.getIndexGenerations())); + throw new AssertionError("TODO adapting persistent current term is not supported yet"); } final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode); gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); @@ -854,6 +848,11 @@ public void setLastAcceptedState(ClusterState clusterState) { @Override public void close() { assertTrue(openPersistedStates.remove(this)); + try { + delegate.close(); + } catch (IOException e) { + throw new AssertionError("unexpected", e); + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java index b73a90b428485..8e2e5f0da4ca4 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -20,11 +20,13 @@ package org.elasticsearch.gateway; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.plugins.MetaDataUpgrader; @@ -53,6 +55,13 @@ void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaD // MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier } + @Override + MetaData upgradeMetaDataForMasterEligibleNode(MetaData metaData, MetaDataIndexUpgradeService metaDataIndexUpgradeService, + MetaDataUpgrader metaDataUpgrader) { + // MetaData upgrade is tested in GatewayMetaStateTests, we override this method to NOP to make mocking easier + return metaData; + } + @Override ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) { // Just set localNode here, not to mess with ClusterService and IndicesService mocking @@ -66,6 +75,6 @@ public void start(Settings settings, NodeEnvironment nodeEnvironment, NamedXCont when(clusterService.getClusterSettings()) .thenReturn(new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); start(settings, transportService, clusterService, new MetaStateService(nodeEnvironment, xContentRegistry), - null, null); + null, null, new LucenePersistedStateFactory(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE)); } }
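[Editorial note — illustrative sketch, not part of the patch] With PersistedState now Closeable, every test touched here uses the same cleanup idiom: allocate inside try and release in finally via the null-tolerant IOUtils.close, or use try-with-resources where the state is scoped to a single block. For reference, the pattern as it appears in the gateway tests at the top of this section; newGatewayPersistedState is that suite's own helper:

    CoordinationState.PersistedState gateway = null;
    try {
        gateway = newGatewayPersistedState(); // helper from the test suite above
        // ... exercise the persisted state and assert on it ...
    } finally {
        IOUtils.close(gateway); // safe even if allocation failed and gateway is still null
    }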