Add async dangling indices support #50642

Merged
@@ -649,7 +649,7 @@ private static IndexService validateActiveShardCountAndCreateIndexService(String
"]: cannot be greater than number of shard copies [" +
(tmpImd.getNumberOfReplicas() + 1) + "]");
}
return indicesService.createIndex(tmpImd, Collections.emptyList());
return indicesService.createIndex(tmpImd, Collections.emptyList(), false);
}

private void validate(CreateIndexClusterStateUpdateRequest request, ClusterState state) {
@@ -140,7 +140,7 @@ public ClusterState applyAliasActions(ClusterState currentState, Iterable<AliasA
if (indexService == null) {
// temporarily create the index and add mappings so we can parse the filter
try {
indexService = indicesService.createIndex(index, emptyList());
indexService = indicesService.createIndex(index, emptyList(), false);
indicesToClose.add(index.getIndex());
} catch (IOException e) {
throw new ElasticsearchException("Failed to create temporary index for parsing the alias", e);
@@ -232,7 +232,7 @@ private static void validateAndAddTemplate(final PutRequest request, IndexTempla
.build();

final IndexMetaData tmpIndexMetadata = IndexMetaData.builder(temporaryIndexName).settings(dummySettings).build();
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList());
IndexService dummyIndexService = indicesService.createIndex(tmpIndexMetadata, Collections.emptyList(), false);
createdIndex = dummyIndexService.index();

templateBuilder.order(request.order);
@@ -143,7 +143,7 @@ ClusterState executeRefresh(final ClusterState currentState, final List<RefreshT
IndexService indexService = indicesService.indexService(indexMetaData.getIndex());
if (indexService == null) {
// we need to create the index here, and add the current mapping to it, so we can merge
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList());
indexService = indicesService.createIndex(indexMetaData, Collections.emptyList(), false);
removeIndex = true;
indexService.mapperService().merge(indexMetaData, MergeReason.MAPPING_RECOVERY);
}
@@ -199,6 +199,7 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
IndicesService.INDICES_ID_FIELD_DATA_ENABLED_SETTING,
IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING,
MappingUpdatedAction.INDICES_MAPPING_DYNAMIC_TIMEOUT_SETTING,
MetaData.SETTING_READ_ONLY_SETTING,
MetaData.SETTING_READ_ONLY_ALLOW_DELETE_SETTING,
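
Note: the line added above registers IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING (gateway.write_dangling_indices_info, default true) as a node-scoped setting. A minimal sketch of how a node could opt out of the new writer; the class and method names here are illustrative and not part of this PR:

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.IndicesService;

class DanglingIndicesSettingsExample {
    // Illustrative only: node settings that disable the async dangling-indices
    // writer introduced by this PR (the setting defaults to true).
    static Settings withoutDanglingIndicesInfo() {
        return Settings.builder()
            .put(IndicesService.WRITE_DANGLING_INDICES_INFO_SETTING.getKey(), false)
            .build();
    }
}

With the setting disabled, IndicesService skips creating the dangling-indices executor and instead deletes any previously written per-index dangling state when an index is created (see the IndicesService hunk further down).
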
@@ -109,7 +109,11 @@ public void start(Settings settings, TransportService transportService, ClusterS
.build());
lucenePersistedState = new LucenePersistedState(
persistenceWriter, currentTerm, clusterState);
metaStateService.deleteAll(); // delete legacy files
if (DiscoveryNode.isDataNode(settings)) {
metaStateService.unreferenceAll(); // unreference legacy files (only keep them for dangling indices functionality)
} else {
metaStateService.deleteAll(); // delete legacy files
}
success = true;
} finally {
if (success == false) {
@@ -264,15 +264,27 @@ public void cleanupIndex(Index index, long currentGeneration) {
INDEX_META_DATA_FORMAT.cleanupOldFiles(currentGeneration, nodeEnv.indexPaths(index));
}

/**
* Creates empty cluster state file on disk, deleting global metadata and unreferencing all index metadata
* (only used for dangling indices at that point).
*/
public void unreferenceAll() throws IOException {
MANIFEST_FORMAT.writeAndCleanup(Manifest.empty(), nodeEnv.nodeDataPaths()); // write empty file so that indices become unreferenced
META_DATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths());
}

/**
* Removes manifest file, global metadata and all index metadata
*/
public void deleteAll() throws IOException {
MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths());
META_DATA_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths());
// To ensure that the metadata is never reimported by loadFullStateBWC in case the deletions here fail mid-way through,
// we first write an empty manifest file so that the indices become unreferenced, then clean up the indices, and only then delete
// the manifest file.
unreferenceAll();
for (String indexFolderName : nodeEnv.availableIndexFolders()) {
// delete meta state directories of indices
MetaDataStateFormat.deleteMetaState(nodeEnv.resolveIndexFolder(indexFolderName));
}
MANIFEST_FORMAT.cleanupOldFiles(Long.MAX_VALUE, nodeEnv.nodeDataPaths()); // finally delete manifest
}
}
42 changes: 38 additions & 4 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
@@ -48,6 +48,8 @@
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.env.ShardLock;
import org.elasticsearch.env.ShardLockObtainFailedException;
import org.elasticsearch.gateway.MetaDataStateFormat;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.analysis.IndexAnalyzers;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
@@ -90,6 +92,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
@@ -324,6 +327,29 @@ public synchronized void close(final String reason, boolean delete) throws IOExc
}
}

// method is synchronized so that IndexService can't be closed while we're writing out dangling indices information
public synchronized void writeDanglingIndicesInfo() {
if (closed.get()) {
return;
}
try {
IndexMetaData.FORMAT.writeAndCleanup(getMetaData(), nodeEnv.indexPaths(index()));
} catch (WriteStateException e) {
logger.warn(() -> new ParameterizedMessage("failed to write dangling indices state for index {}", index()), e);
}
}

// method is synchronized so that IndexService can't be closed while we're deleting dangling indices information
public synchronized void deleteDanglingIndicesInfo() {
if (closed.get()) {
return;
}
try {
MetaDataStateFormat.deleteMetaState(nodeEnv.indexPaths(index()));
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("failed to delete dangling indices state for index {}", index()), e);
}
}

public String indexUUID() {
return indexSettings.getUUID();
@@ -669,24 +695,30 @@ public IndexMetaData getMetaData() {
return indexSettings.getIndexMetaData();
}

private final CopyOnWriteArrayList<Consumer<IndexMetaData>> metaDataListeners = new CopyOnWriteArrayList<>();

public void addMetaDataListener(Consumer<IndexMetaData> listener) {
metaDataListeners.add(listener);
}

@Override
public synchronized void updateMetaData(final IndexMetaData currentIndexMetaData, final IndexMetaData newIndexMetaData) {
final boolean updateIndexMetaData = indexSettings.updateIndexMetaData(newIndexMetaData);
final boolean updateIndexSettings = indexSettings.updateIndexMetaData(newIndexMetaData);

if (Assertions.ENABLED && currentIndexMetaData != null) {
final long currentSettingsVersion = currentIndexMetaData.getSettingsVersion();
final long newSettingsVersion = newIndexMetaData.getSettingsVersion();
if (currentSettingsVersion == newSettingsVersion) {
assert updateIndexMetaData == false;
assert updateIndexSettings == false;
} else {
assert updateIndexMetaData;
assert updateIndexSettings;
assert currentSettingsVersion < newSettingsVersion :
"expected current settings version [" + currentSettingsVersion + "] "
+ "to be less than new settings version [" + newSettingsVersion + "]";
}
}

if (updateIndexMetaData) {
if (updateIndexSettings) {
for (final IndexShard shard : this.shards.values()) {
try {
shard.onSettingsChanged();
@@ -722,6 +754,8 @@ public boolean isForceExecution() {
}
updateFsyncTaskIfNecessary();
}

metaDataListeners.forEach(c -> c.accept(newIndexMetaData));
}

private void updateFsyncTaskIfNecessary() {
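
The IndexService changes above add a small metadata-listener facility: addMetaDataListener registers a consumer, and the forEach at the end of updateMetaData replays every applied IndexMetaData to the registered listeners. IndicesService (next file) uses exactly this hook to schedule asynchronous dangling-indices writes. A stand-alone sketch of the pattern, purely illustrative and not taken from the codebase:

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Minimal illustration of the listener pattern used above: registration is
// lock-free thanks to CopyOnWriteArrayList, and each update simply replays the
// new value to every registered consumer.
class MetaDataListeners<T> {
    private final List<Consumer<T>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<T> listener) {
        listeners.add(listener);
    }

    void onUpdate(T newValue) {
        listeners.forEach(c -> c.accept(newValue));
    }
}
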
94 changes: 92 additions & 2 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -41,6 +41,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
@@ -65,8 +66,12 @@
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -116,6 +121,7 @@
import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.node.Node;
import org.elasticsearch.plugins.IndexStorePlugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
@@ -158,6 +164,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
import static org.elasticsearch.common.util.concurrent.EsExecutors.daemonThreadFactory;
import static org.elasticsearch.index.IndexService.IndexCreationContext.CREATE_INDEX;
import static org.elasticsearch.index.IndexService.IndexCreationContext.META_DATA_VERIFICATION;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
@@ -172,6 +179,11 @@ public class IndicesService extends AbstractLifecycleComponent
public static final Setting<Boolean> INDICES_ID_FIELD_DATA_ENABLED_SETTING =
Setting.boolSetting("indices.id_field_data.enabled", false, Property.Dynamic, Property.NodeScope);

public static final Setting<Boolean> WRITE_DANGLING_INDICES_INFO_SETTING = Setting.boolSetting(
"gateway.write_dangling_indices_info",
true,
Setting.Property.NodeScope
);

/**
* The node's settings.
@@ -209,6 +221,12 @@ public class IndicesService extends AbstractLifecycleComponent
private final CountDownLatch closeLatch = new CountDownLatch(1);
private volatile boolean idFieldDataEnabled;

@Nullable
private final EsThreadPoolExecutor danglingIndicesThreadPoolExecutor;
private final Set<Index> danglingIndicesToWrite = Sets.newConcurrentHashSet();
private final boolean nodeWriteDanglingIndicesInfo;


@Override
protected void doStart() {
// Start thread that will manage cleaning the field data cache periodically
@@ -289,12 +307,25 @@ protected void closeInternal() {
}
}
};

final String nodeName = Objects.requireNonNull(Node.NODE_NAME_SETTING.get(settings));
nodeWriteDanglingIndicesInfo = WRITE_DANGLING_INDICES_INFO_SETTING.get(settings);
danglingIndicesThreadPoolExecutor = nodeWriteDanglingIndicesInfo ? EsExecutors.newScaling(
nodeName + "/" + DANGLING_INDICES_UPDATE_THREAD_NAME,
1, 1,
0, TimeUnit.MILLISECONDS,
daemonThreadFactory(nodeName, DANGLING_INDICES_UPDATE_THREAD_NAME),
threadPool.getThreadContext()) : null;
}

private static final String DANGLING_INDICES_UPDATE_THREAD_NAME = "DanglingIndices#updateTask";

@Override
protected void doStop() {
ThreadPool.terminate(danglingIndicesThreadPoolExecutor, 10, TimeUnit.SECONDS);

ExecutorService indicesStopExecutor =
Executors.newFixedThreadPool(5, EsExecutors.daemonThreadFactory(settings, "indices_shutdown"));
Executors.newFixedThreadPool(5, daemonThreadFactory(settings, "indices_shutdown"));

// Copy indices because we modify it asynchronously in the body of the loop
final Set<Index> indices = this.indices.values().stream().map(s -> s.index()).collect(Collectors.toSet());
@@ -455,6 +486,7 @@ public boolean hasIndex(Index index) {
public IndexService indexService(Index index) {
return indices.get(index.getUUID());
}

/**
* Returns an IndexService for the specified index if exists otherwise a {@link IndexNotFoundException} is thrown.
*/
@@ -478,7 +510,8 @@ public IndexService indexServiceSafe(Index index) {
*/
@Override
public synchronized IndexService createIndex(
final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners) throws IOException {
final IndexMetaData indexMetaData, final List<IndexEventListener> builtInListeners,
final boolean writeDanglingIndices) throws IOException {
ensureChangesAllowed();
if (indexMetaData.getIndexUUID().equals(IndexMetaData.INDEX_UUID_NA_VALUE)) {
throw new IllegalArgumentException("index must have a real UUID found value: [" + indexMetaData.getIndexUUID() + "]");
@@ -514,8 +547,18 @@ public void onStoreClosed(ShardId shardId) {
indexingMemoryController);
boolean success = false;
try {
if (writeDanglingIndices && nodeWriteDanglingIndicesInfo) {
indexService.addMetaDataListener(imd -> updateDanglingIndicesInfo(index));
}
indexService.getIndexEventListener().afterIndexCreated(indexService);
indices = Maps.copyMapWithAddedEntry(indices, index.getUUID(), indexService);
if (writeDanglingIndices) {
if (nodeWriteDanglingIndicesInfo) {
updateDanglingIndicesInfo(index);
} else {
indexService.deleteDanglingIndicesInfo();
}
}
success = true;
return indexService;
} finally {
@@ -1487,4 +1530,51 @@ public static Optional<String> checkShardLimit(int newShards, ClusterState state
}
return Optional.empty();
}

private void updateDanglingIndicesInfo(Index index) {
assert DiscoveryNode.isDataNode(settings) : "dangling indices information should only be persisted on data nodes";
assert nodeWriteDanglingIndicesInfo : "writing dangling indices info is not enabled";
assert danglingIndicesThreadPoolExecutor != null : "executor for dangling indices info is not available";
if (danglingIndicesToWrite.add(index)) {
logger.trace("triggered dangling indices update for {}", index);
final long triggeredTimeMillis = threadPool.relativeTimeInMillis();
try {
danglingIndicesThreadPoolExecutor.execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to write dangling indices state for index {}", index), e);
}

@Override
protected void doRun() {
final boolean exists = danglingIndicesToWrite.remove(index);
assert exists : "removed non-existing item for " + index;
final IndexService indexService = indices.get(index.getUUID());
if (indexService != null) {
final long executedTimeMillis = threadPool.relativeTimeInMillis();
logger.trace("writing out dangling indices state for index {}, triggered {} ago", index,
TimeValue.timeValueMillis(Math.max(0L, executedTimeMillis - triggeredTimeMillis)));
indexService.writeDanglingIndicesInfo();
final long completedTimeMillis = threadPool.relativeTimeInMillis();
logger.trace("writing out of dangling indices state for index {} completed after {}", index,
TimeValue.timeValueMillis(Math.max(0L, completedTimeMillis - executedTimeMillis)));
} else {
logger.trace("omit writing dangling indices state for index {} as index is deallocated on this node", index);
}
}
});
} catch (EsRejectedExecutionException e) {
// ignore cases where we are shutting down..., there is really nothing interesting to be done here...
assert danglingIndicesThreadPoolExecutor.isShutdown();
}
} else {
logger.trace("dangling indices update already pending for {}", index);
}
}

// visible for testing
public boolean allPendingDanglingIndicesWritten() {
return nodeWriteDanglingIndicesInfo == false ||
(danglingIndicesToWrite.isEmpty() && danglingIndicesThreadPoolExecutor.getActiveCount() == 0);
}
}
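
The visible-for-testing method above lets tests wait for the background writer to drain before making assertions about on-disk state. A hedged sketch of such a check; the test method, the assertBusy helper and the internalCluster() accessor are the usual Elasticsearch integration-test scaffolding and are assumed here, not part of this diff:

// Hypothetical integration-test fragment (inside an ESIntegTestCase subclass):
// block until the dangling-indices executor has no queued or running writes,
// then inspect the per-index state files on disk.
public void testDanglingIndicesInfoIsWritten() throws Exception {
    final IndicesService indicesService = internalCluster().getInstance(IndicesService.class);
    assertBusy(() -> assertTrue(indicesService.allPendingDanglingIndicesWritten()));
    // ... assertions on the on-disk index metadata would go here ...
}
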