Skip to content

Use Lucene exclusively for metadata storage #50144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ task verifyVersions {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/issues/48701" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
139 changes: 6 additions & 133 deletions server/src/main/java/org/elasticsearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,22 @@
package org.elasticsearch.gateway;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.coordination.CoordinationState.PersistedState;
import org.elasticsearch.cluster.coordination.InMemoryPersistedState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.Index;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.transport.TransportService;

Expand All @@ -65,7 +57,6 @@
* non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
*/
public class GatewayMetaState implements Closeable {
private static final Logger logger = LogManager.getLogger(GatewayMetaState.class);

// Set by calling start()
private final SetOnce<PersistedState> persistedState = new SetOnce<>();
Expand All @@ -81,45 +72,23 @@ public MetaData getMetaData() {
}

public void start(Settings settings, TransportService transportService, ClusterService clusterService,
MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader, LucenePersistedStateFactory lucenePersistedStateFactory) {
assert persistedState.get() == null : "should only start once, but already have " + persistedState.get();

if (DiscoveryNode.isMasterNode(settings)) {
if (DiscoveryNode.isMasterNode(settings) || DiscoveryNode.isDataNode(settings)) {
try {
persistedState.set(lucenePersistedStateFactory.loadPersistedState((version, metadata) ->
PersistedState ps = lucenePersistedStateFactory.loadPersistedState((version, metadata) ->
prepareInitialClusterState(transportService, clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
.version(version)
.metaData(upgradeMetaDataForMasterEligibleNode(metadata, metaDataIndexUpgradeService, metaDataUpgrader))
.build())));
.build()));
persistedState.set(ps);
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
}
}

if (DiscoveryNode.isDataNode(settings)) {
final Tuple<Manifest, ClusterState> manifestClusterStateTuple;
try {
upgradeMetaData(settings, metaStateService, metaDataIndexUpgradeService, metaDataUpgrader);
manifestClusterStateTuple = loadStateAndManifest(ClusterName.CLUSTER_NAME_SETTING.get(settings), metaStateService);
} catch (IOException e) {
throw new ElasticsearchException("failed to load metadata", e);
}

final IncrementalClusterStateWriter incrementalClusterStateWriter
= new IncrementalClusterStateWriter(settings, clusterService.getClusterSettings(), metaStateService,
manifestClusterStateTuple.v1(),
prepareInitialClusterState(transportService, clusterService, manifestClusterStateTuple.v2()),
transportService.getThreadPool()::relativeTimeInMillis);

clusterService.addLowPriorityApplier(new GatewayClusterApplier(incrementalClusterStateWriter));

if (DiscoveryNode.isMasterNode(settings) == false) {
persistedState.set(
new InMemoryPersistedState(manifestClusterStateTuple.v1().getCurrentTerm(), manifestClusterStateTuple.v2()));
}
} else if (DiscoveryNode.isMasterNode(settings) == false) {
} else {
persistedState.set(
new InMemoryPersistedState(0L, ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings)).build()));
}
Expand All @@ -144,70 +113,6 @@ MetaData upgradeMetaDataForMasterEligibleNode(MetaData metaData,
return upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);
}

// exposed so it can be overridden by tests
void upgradeMetaData(Settings settings, MetaStateService metaStateService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
MetaDataUpgrader metaDataUpgrader) throws IOException {
if (isMasterOrDataNode(settings)) {
try {
final Tuple<Manifest, MetaData> metaStateAndData = metaStateService.loadFullState();
final Manifest manifest = metaStateAndData.v1();
final MetaData metaData = metaStateAndData.v2();

// We finished global state validation and successfully checked all indices for backward compatibility
// and found no non-upgradable indices, which means the upgrade can continue.
// Now it's safe to overwrite global and index metadata.
// We don't re-write metadata if it's not upgraded by upgrade plugins, because
// if there is manifest file, it means metadata is properly persisted to all data paths
// if there is no manifest file (upgrade from 6.x to 7.x) metadata might be missing on some data paths,
// but anyway we will re-write it as soon as we receive first ClusterState
final IncrementalClusterStateWriter.AtomicClusterStateWriter writer
= new IncrementalClusterStateWriter.AtomicClusterStateWriter(metaStateService, manifest);
final MetaData upgradedMetaData = upgradeMetaData(metaData, metaDataIndexUpgradeService, metaDataUpgrader);

final long globalStateGeneration;
if (MetaData.isGlobalStateEquals(metaData, upgradedMetaData) == false) {
globalStateGeneration = writer.writeGlobalState("upgrade", upgradedMetaData);
} else {
globalStateGeneration = manifest.getGlobalGeneration();
}

Map<Index, Long> indices = new HashMap<>(manifest.getIndexGenerations());
for (IndexMetaData indexMetaData : upgradedMetaData) {
if (metaData.hasIndexMetaData(indexMetaData) == false) {
final long generation = writer.writeIndex("upgrade", indexMetaData);
indices.put(indexMetaData.getIndex(), generation);
}
}

final Manifest newManifest = new Manifest(manifest.getCurrentTerm(), manifest.getClusterStateVersion(),
globalStateGeneration, indices);
writer.writeManifestAndCleanup("startup", newManifest);
} catch (Exception e) {
logger.error("failed to read or upgrade local state, exiting...", e);
throw e;
}
}
}

/**
 * Loads the full on-disk state (manifest plus metadata) and packages the metadata as a
 * {@link ClusterState} whose version is taken from the manifest. Logs how long loading took.
 */
private static Tuple<Manifest,ClusterState> loadStateAndManifest(ClusterName clusterName,
                                                                 MetaStateService metaStateService) throws IOException {
    final long loadStartNanos = System.nanoTime();

    final Tuple<Manifest, MetaData> loaded = metaStateService.loadFullState();
    final Manifest loadedManifest = loaded.v1();
    final MetaData loadedMetaData = loaded.v2();

    final ClusterState.Builder stateBuilder = ClusterState.builder(clusterName);
    stateBuilder.version(loadedManifest.getClusterStateVersion());
    stateBuilder.metaData(loadedMetaData);
    final ClusterState restoredState = stateBuilder.build();

    final long elapsedNanos = System.nanoTime() - loadStartNanos;
    logger.debug("took {} to load state", TimeValue.timeValueMillis(TimeValue.nsecToMSec(elapsedNanos)));

    return Tuple.tuple(loadedManifest, restoredState);
}

/** Returns {@code true} when the node is master-eligible, a data node, or both. */
private static boolean isMasterOrDataNode(Settings settings) {
    if (DiscoveryNode.isMasterNode(settings)) {
        return true;
    }
    return DiscoveryNode.isDataNode(settings);
}

/**
* Elasticsearch 2.0 removed several deprecated features and as well as support for Lucene 3.x. This method calls
* {@link MetaDataIndexUpgradeService} to makes sure that indices are compatible with the current version. The
Expand Down Expand Up @@ -262,36 +167,4 @@ public void close() throws IOException {
IOUtils.close(persistedState.get());
}

/**
 * Cluster-state applier that persists each applied cluster state to disk via an
 * {@link IncrementalClusterStateWriter}. Persistence is best-effort: write failures are
 * logged and swallowed rather than failing cluster-state application.
 */
private static class GatewayClusterApplier implements ClusterStateApplier {

    private final IncrementalClusterStateWriter incrementalClusterStateWriter;

    private GatewayClusterApplier(IncrementalClusterStateWriter incrementalClusterStateWriter) {
        this.incrementalClusterStateWriter = incrementalClusterStateWriter;
    }

    @Override
    public void applyClusterState(ClusterChangedEvent event) {
        if (event.state().blocks().disableStatePersistence()) {
            // A cluster block has disabled state persistence: skip writing and switch the writer
            // out of incremental mode (presumably forcing a full re-write once persistence
            // resumes — NOTE(review): confirm against IncrementalClusterStateWriter).
            incrementalClusterStateWriter.setIncrementalWrite(false);
            return;
        }

        try {
            // Hack: This is to ensure that non-master-eligible Zen2 nodes always store a current term
            // that's higher than the last accepted term.
            // TODO: can we get rid of this hack?
            if (event.state().term() > incrementalClusterStateWriter.getPreviousManifest().getCurrentTerm()) {
                incrementalClusterStateWriter.setCurrentTerm(event.state().term());
            }

            incrementalClusterStateWriter.updateClusterState(event.state());
            // Only re-enable incremental mode after a successful write; on WriteStateException
            // this line is skipped, so the mode stays whatever it was before the failed write.
            incrementalClusterStateWriter.setIncrementalWrite(true);
        } catch (WriteStateException e) {
            // Best-effort persistence: log and continue applying cluster states.
            logger.warn("Exception occurred when storing new meta data", e);
        }
    }

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.CoordinationState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.Manifest;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.logging.Loggers;
Expand Down Expand Up @@ -125,11 +127,43 @@ public class LucenePersistedStateFactory {
private final NodeEnvironment nodeEnvironment;
private final NamedXContentRegistry namedXContentRegistry;
private final BigArrays bigArrays;
private final LegacyLoader legacyLoader;

public LucenePersistedStateFactory(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays) {
/**
 * Allows interacting with legacy metadata, i.e. cluster state persisted in the older
 * manifest-based on-disk format, so it can be migrated into the Lucene-backed storage
 * and then removed.
 */
public interface LegacyLoader {
    /**
     * Loads legacy state
     *
     * @return the legacy manifest and metadata; an empty manifest indicates that no
     *         legacy state is present
     * @throws IOException if reading the legacy state from disk fails
     */
    Tuple<Manifest, MetaData> loadClusterState() throws IOException;

    /**
     * Cleans legacy state
     * (removes the legacy on-disk files so they are not picked up again)
     *
     * @throws IOException if deleting the legacy files fails
     */
    void clean() throws IOException;
}

// Package-private convenience constructor for callers (e.g. tests) with no legacy state to
// migrate: wires in a no-op LegacyLoader that reports an empty manifest and cleans nothing.
LucenePersistedStateFactory(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays) {
    this(nodeEnvironment, namedXContentRegistry, bigArrays, new LegacyLoader() {
        @Override
        public Tuple<Manifest, MetaData> loadClusterState() {
            // An empty manifest signals "no legacy state" to the factory.
            return new Tuple<>(Manifest.empty(), MetaData.EMPTY_META_DATA);
        }

        @Override
        public void clean() {
            // intentionally a no-op: there is no legacy state to remove
        }
    });
}

/**
 * @param legacyLoader used to load metadata written in the legacy on-disk format (and to
 *                     clean it up afterwards) when no Lucene-based state exists yet
 */
public LucenePersistedStateFactory(NodeEnvironment nodeEnvironment, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays,
                                   LegacyLoader legacyLoader) {
    this.nodeEnvironment = nodeEnvironment;
    this.namedXContentRegistry = namedXContentRegistry;
    this.bigArrays = bigArrays;
    this.legacyLoader = legacyLoader;
}

CoordinationState.PersistedState loadPersistedState(BiFunction<Long, MetaData, ClusterState> clusterStateFromMetaData)
Expand Down Expand Up @@ -175,7 +209,9 @@ CoordinationState.PersistedState loadPersistedState(BiFunction<Long, MetaData, C
success = true;
return lucenePersistedState;
} finally {
if (success == false) {
if (success) {
legacyLoader.clean();
} else {
IOUtils.closeWhileHandlingException(lucenePersistedState);
}
}
Expand Down Expand Up @@ -205,10 +241,12 @@ private OnDiskState(String nodeId, Path dataPath, long currentTerm, long lastAcc
}
}

private static final OnDiskState NO_ON_DISK_STATE = new OnDiskState(null, null, 0L, 0L, MetaData.EMPTY_META_DATA);

private OnDiskState loadBestOnDiskState() throws IOException {
String committedClusterUuid = null;
Path committedClusterUuidPath = null;
OnDiskState bestOnDiskState = new OnDiskState(null, null, 0L, 0L, MetaData.EMPTY_META_DATA);
OnDiskState bestOnDiskState = NO_ON_DISK_STATE;
OnDiskState maxCurrentTermOnDiskState = bestOnDiskState;

// We use a write-all-read-one strategy: metadata is written to every data path when accepting it, which means it is mostly
Expand Down Expand Up @@ -243,7 +281,8 @@ private OnDiskState loadBestOnDiskState() throws IOException {

long acceptedTerm = onDiskState.metaData.coordinationMetaData().term();
long maxAcceptedTerm = bestOnDiskState.metaData.coordinationMetaData().term();
if (acceptedTerm > maxAcceptedTerm
if (bestOnDiskState == NO_ON_DISK_STATE
|| acceptedTerm > maxAcceptedTerm
|| (acceptedTerm == maxAcceptedTerm
&& (onDiskState.lastAcceptedVersion > bestOnDiskState.lastAcceptedVersion
|| (onDiskState.lastAcceptedVersion == bestOnDiskState.lastAcceptedVersion)
Expand All @@ -262,6 +301,14 @@ private OnDiskState loadBestOnDiskState() throws IOException {
"] with greater term [" + maxCurrentTermOnDiskState.currentTerm + "]");
}

if (bestOnDiskState == NO_ON_DISK_STATE) {
final Tuple<Manifest, MetaData> legacyState = legacyLoader.loadClusterState();
if (legacyState.v1().isEmpty() == false) {
return new OnDiskState(nodeEnvironment.nodeId(), null, legacyState.v1().getCurrentTerm(),
legacyState.v1().getClusterStateVersion(), legacyState.v2());
}
}

return bestOnDiskState;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public MetaStateService(NodeEnvironment nodeEnv, NamedXContentRegistry namedXCon
* meta state with globalGeneration -1 and empty meta data is returned.
* @throws IOException if some IOException when loading files occurs or there is no metadata referenced by manifest file.
*/
Tuple<Manifest, MetaData> loadFullState() throws IOException {
public Tuple<Manifest, MetaData> loadFullState() throws IOException {
final Manifest manifest = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
if (manifest == null) {
return loadFullStateBWC();
Expand Down Expand Up @@ -184,17 +184,6 @@ List<IndexMetaData> loadIndicesStates(Predicate<String> excludeIndexPathIdsPredi
return indexMetaDataList;
}

/**
 * Reads the most recent manifest file from the node's data paths, falling back to
 * {@code Manifest.empty()} when no manifest has been written yet.
 */
public Manifest loadManifestOrEmpty() throws IOException {
    final Manifest onDisk = MANIFEST_FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.nodeDataPaths());
    return onDisk == null ? Manifest.empty() : onDisk;
}

/**
* Loads the global state, *without* index state, see {@link #loadFullState()} for that.
*/
Expand Down Expand Up @@ -276,28 +265,13 @@ public void cleanupIndex(Index index, long currentGeneration) {
}

/**
* Writes index metadata and updates manifest file accordingly.
* Used by tests.
* Removes manifest file, global metadata and all index metadata
*/
/**
 * Writes the given index metadata as a new generation, records that generation in the
 * manifest, persists the updated manifest, and finally removes stale generations of this
 * index's metadata. The manifest is written before cleanup so it never references files
 * that have already been deleted.
 */
public void writeIndexAndUpdateManifest(String reason, IndexMetaData metaData) throws IOException {
    long generation = writeIndex(reason, metaData);
    Manifest manifest = loadManifestOrEmpty();
    Map<Index, Long> indices = new HashMap<>(manifest.getIndexGenerations());
    indices.put(metaData.getIndex(), generation);
    manifest = new Manifest(manifest.getCurrentTerm(), manifest.getClusterStateVersion(), manifest.getGlobalGeneration(), indices);
    writeManifestAndCleanup(reason, manifest);
    cleanupIndex(metaData.getIndex(), generation);
}

/**
* Writes global metadata and updates manifest file accordingly.
* Used by tests.
*/
public void writeGlobalStateAndUpdateManifest(String reason, MetaData metaData) throws IOException {
long generation = writeGlobalState(reason, metaData);
Manifest manifest = loadManifestOrEmpty();
manifest = new Manifest(manifest.getCurrentTerm(), manifest.getClusterStateVersion(), generation, manifest.getIndexGenerations());
writeManifestAndCleanup(reason, manifest);
cleanupGlobalState(generation);
/**
 * Removes the manifest file, the global metadata and all index metadata from every data path.
 * Passing {@code Long.MAX_VALUE} as the "current" generation to cleanupOldFiles makes every
 * existing generation count as old, i.e. deletes everything — NOTE(review): confirm against
 * the MetaDataStateFormat.cleanupOldFiles contract.
 */
public void deleteAll() throws IOException {
    // The manifest is removed first so no surviving manifest can reference metadata files
    // deleted below — NOTE(review): assumption based on ordering, confirm.
    MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths());
    META_DATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths());
    for (String indexFolderName : nodeEnv.availableIndexFolders()) {
        INDEX_META_DATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.resolveIndexFolder(indexFolderName));
    }
}
}
Loading