diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java index 6efb9c8e89bed..f692fb6046850 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterShardHealth; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -43,6 +44,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.util.concurrent.CountDown; @@ -100,7 +102,7 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu final RoutingTable routingTables = state.routingTable(); final RoutingNodes routingNodes = state.getRoutingNodes(); final String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request); - final Set shardIdsToFetch = new HashSet<>(); + final Set> shardsToFetch = new HashSet<>(); logger.trace("using cluster state version [{}] to determine shards", state.version()); // collect relevant shard ids of the requested indices for fetching store infos @@ -109,11 +111,12 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu if (indexShardRoutingTables == null) { continue; } + final String customDataPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get(state.metaData().index(index).getSettings()); for (IndexShardRoutingTable routing : indexShardRoutingTables) { final int shardId = routing.shardId().id(); ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, routing); if (request.shardStatuses().contains(shardHealth.getStatus())) { - shardIdsToFetch.add(routing.shardId()); + shardsToFetch.add(Tuple.tuple(routing.shardId(), customDataPath)); } } } @@ -123,7 +126,7 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu // we could fetch all shard store info from every node once (nNodes requests) // we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards // for fetching shard stores info, that operates on a list of shards instead of a single shard - new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardIdsToFetch, listener).start(); + new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener).start(); } @Override @@ -135,46 +138,46 @@ protected ClusterBlockException checkBlock(IndicesShardStoresRequest request, Cl private class AsyncShardStoresInfoFetches { private final DiscoveryNodes nodes; private final RoutingNodes routingNodes; - private final Set shardIds; + private final Set> shards; private final ActionListener listener; private CountDown expectedOps; private final Queue fetchResponses; - AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set shardIds, + AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set> shards, ActionListener listener) { this.nodes = nodes; this.routingNodes = routingNodes; - this.shardIds = shardIds; + this.shards = shards; this.listener = listener; this.fetchResponses = new ConcurrentLinkedQueue<>(); - this.expectedOps = new CountDown(shardIds.size()); + this.expectedOps = new CountDown(shards.size()); } void start() { - if (shardIds.isEmpty()) { + if (shards.isEmpty()) { listener.onResponse(new IndicesShardStoresResponse()); } else { // explicitely type lister, some IDEs (Eclipse) are not able to correctly infer the function type Lister, NodeGatewayStartedShards> lister = this::listStartedShards; - for (ShardId shardId : shardIds) { - InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shardId, lister); + for (Tuple shard : shards) { + InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), lister); fetch.fetchData(nodes, Collections.emptySet()); } } } - private void listStartedShards(ShardId shardId, DiscoveryNode[] nodes, + private void listStartedShards(ShardId shardId, String customDataPath, DiscoveryNode[] nodes, ActionListener> listener) { - var request = new TransportNodesListGatewayStartedShards.Request(shardId, nodes); + var request = new TransportNodesListGatewayStartedShards.Request(shardId, customDataPath, nodes); client.executeLocally(TransportNodesListGatewayStartedShards.TYPE, request, ActionListener.wrap(listener::onResponse, listener::onFailure)); } private class InternalAsyncFetch extends AsyncShardFetch { - InternalAsyncFetch(Logger logger, String type, ShardId shardId, + InternalAsyncFetch(Logger logger, String type, ShardId shardId, String customDataPath, Lister, NodeGatewayStartedShards> action) { - super(logger, type, shardId, action); + super(logger, type, shardId, customDataPath, action); } @Override diff --git a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java index 160662a63e5b3..ec5b4fe43c8fe 100644 --- a/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java +++ b/server/src/main/java/org/elasticsearch/env/NodeEnvironment.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.Strings; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.Directory; @@ -610,7 +611,7 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet acquireFSLockForPaths(indexSettings, paths); IOUtils.rm(paths); if (indexSettings.hasCustomDataPath()) { - Path customLocation = resolveCustomLocation(indexSettings, shardId); + Path customLocation = resolveCustomLocation(indexSettings.customDataPath(), shardId); logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation); acquireFSLockForPaths(indexSettings, customLocation); logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation); @@ -687,7 +688,7 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths); IOUtils.rm(indexPaths); if (indexSettings.hasCustomDataPath()) { - Path customLocation = resolveIndexCustomLocation(indexSettings); + Path customLocation = resolveIndexCustomLocation(indexSettings.customDataPath(), index.getUUID()); logger.trace("deleting custom index {} directory [{}]", index, customLocation); IOUtils.rm(customLocation); } @@ -933,7 +934,7 @@ public Path[] indexPaths(Index index) { * returned paths. The returned array may contain paths to non-existing directories. * * @see IndexSettings#hasCustomDataPath() - * @see #resolveCustomLocation(IndexSettings, ShardId) + * @see #resolveCustomLocation(String, ShardId) * */ public Path[] availableShardPaths(ShardId shardId) { @@ -1233,17 +1234,12 @@ private static boolean isIndexMetaDataPath(Path path) { /** * Resolve the custom path for a index's shard. - * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine - * the root path for the index. - * - * @param indexSettings settings for the index */ - public static Path resolveBaseCustomLocation(IndexSettings indexSettings, Path sharedDataPath) { - String customDataDir = indexSettings.customDataPath(); - if (customDataDir != null) { + public static Path resolveBaseCustomLocation(String customDataPath, Path sharedDataPath) { + if (Strings.isNotEmpty(customDataPath)) { // This assert is because this should be caught by MetaDataCreateIndexService assert sharedDataPath != null; - return sharedDataPath.resolve(customDataDir).resolve("0"); + return sharedDataPath.resolve(customDataPath).resolve("0"); } else { throw new IllegalArgumentException("no custom " + IndexMetaData.SETTING_DATA_PATH + " setting available"); } @@ -1254,14 +1250,14 @@ public static Path resolveBaseCustomLocation(IndexSettings indexSettings, Path s * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine * the root path for the index. * - * @param indexSettings settings for the index + * @param customDataPath the custom data path */ - private Path resolveIndexCustomLocation(IndexSettings indexSettings) { - return resolveIndexCustomLocation(indexSettings, sharedDataPath); + private Path resolveIndexCustomLocation(String customDataPath, String indexUUID) { + return resolveIndexCustomLocation(customDataPath, indexUUID, sharedDataPath); } - private static Path resolveIndexCustomLocation(IndexSettings indexSettings, Path sharedDataPath) { - return resolveBaseCustomLocation(indexSettings, sharedDataPath).resolve(indexSettings.getUUID()); + private static Path resolveIndexCustomLocation(String customDataPath, String indexUUID, Path sharedDataPath) { + return resolveBaseCustomLocation(customDataPath, sharedDataPath).resolve(indexUUID); } /** @@ -1269,15 +1265,16 @@ private static Path resolveIndexCustomLocation(IndexSettings indexSettings, Path * Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine * the root path for the index. * - * @param indexSettings settings for the index + * @param customDataPath the custom data path * @param shardId shard to resolve the path to */ - public Path resolveCustomLocation(IndexSettings indexSettings, final ShardId shardId) { - return resolveCustomLocation(indexSettings, shardId, sharedDataPath); + public Path resolveCustomLocation(String customDataPath, final ShardId shardId) { + return resolveCustomLocation(customDataPath, shardId, sharedDataPath); } - public static Path resolveCustomLocation(IndexSettings indexSettings, final ShardId shardId, Path sharedDataPath) { - return resolveIndexCustomLocation(indexSettings, sharedDataPath).resolve(Integer.toString(shardId.id())); + public static Path resolveCustomLocation(String customDataPath, final ShardId shardId, Path sharedDataPath) { + return resolveIndexCustomLocation(customDataPath, shardId.getIndex().getUUID(), + sharedDataPath).resolve(Integer.toString(shardId.id())); } /** diff --git a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java index 007357ee54a75..43f7a29cbcb95 100644 --- a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java +++ b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java @@ -42,6 +42,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -61,22 +62,25 @@ public abstract class AsyncShardFetch implements Rel * An action that lists the relevant shard data that needs to be fetched. */ public interface Lister, NodeResponse extends BaseNodeResponse> { - void list(ShardId shardId, DiscoveryNode[] nodes, ActionListener listener); + void list(ShardId shardId, @Nullable String customDataPath, DiscoveryNode[] nodes, ActionListener listener); } protected final Logger logger; protected final String type; protected final ShardId shardId; + protected final String customDataPath; private final Lister, T> action; private final Map> cache = new HashMap<>(); private final Set nodesToIgnore = new HashSet<>(); private final AtomicLong round = new AtomicLong(); private boolean closed; - protected AsyncShardFetch(Logger logger, String type, ShardId shardId, Lister, T> action) { + protected AsyncShardFetch(Logger logger, String type, ShardId shardId, String customDataPath, + Lister, T> action) { this.logger = logger; this.type = type; - this.shardId = shardId; + this.shardId = Objects.requireNonNull(shardId); + this.customDataPath = Objects.requireNonNull(customDataPath); this.action = (Lister, T>) action; } @@ -285,7 +289,7 @@ private boolean hasAnyNodeFetching(Map> shardCache) { // visible for testing void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) { logger.trace("{} fetching [{}] from {}", shardId, type, nodes); - action.list(shardId, nodes, new ActionListener>() { + action.list(shardId, customDataPath, nodes, new ActionListener>() { @Override public void onResponse(BaseNodesResponse response) { processAsyncFetch(response.getNodes(), response.failures(), fetchingRound); diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index a83e3b4cfd782..a1319bcd59031 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RerouteService; @@ -189,8 +190,9 @@ private boolean hasNewNodes(DiscoveryNodes nodes) { class InternalAsyncFetch extends AsyncShardFetch { - InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister, T> action) { - super(logger, type, shardId, action); + InternalAsyncFetch(Logger logger, String type, ShardId shardId, String customDataPath, + Lister, T> action) { + super(logger, type, shardId, customDataPath, action); } @Override @@ -217,7 +219,9 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR Lister, NodeGatewayStartedShards> lister = this::listStartedShards; AsyncShardFetch fetch = asyncFetchStarted.computeIfAbsent(shard.shardId(), - shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, lister)); + shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, + IndexMetaData.INDEX_DATA_PATH_SETTING.get(allocation.metaData().index(shard.index()).getSettings()), + lister)); AsyncShardFetch.FetchResult shardState = fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId())); @@ -227,9 +231,9 @@ protected AsyncShardFetch.FetchResult fetchData(ShardR return shardState; } - private void listStartedShards(ShardId shardId, DiscoveryNode[] nodes, + private void listStartedShards(ShardId shardId, String customDataPath, DiscoveryNode[] nodes, ActionListener> listener) { - var request = new TransportNodesListGatewayStartedShards.Request(shardId, nodes); + var request = new TransportNodesListGatewayStartedShards.Request(shardId, customDataPath, nodes); client.executeLocally(TransportNodesListGatewayStartedShards.TYPE, request, ActionListener.wrap(listener::onResponse, listener::onFailure)); } @@ -244,12 +248,12 @@ class InternalReplicaShardAllocator extends ReplicaShardAllocator { } @Override - protected AsyncShardFetch.FetchResult - fetchData(ShardRouting shard, RoutingAllocation allocation) { + protected AsyncShardFetch.FetchResult fetchData(ShardRouting shard, RoutingAllocation allocation) { // explicitely type lister, some IDEs (Eclipse) are not able to correctly infer the function type Lister, NodeStoreFilesMetaData> lister = this::listStoreFilesMetaData; AsyncShardFetch fetch = asyncFetchStore.computeIfAbsent(shard.shardId(), - shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), lister)); + shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), + IndexMetaData.INDEX_DATA_PATH_SETTING.get(allocation.metaData().index(shard.index()).getSettings()), lister)); AsyncShardFetch.FetchResult shardStores = fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId())); if (shardStores.hasData()) { @@ -258,9 +262,9 @@ class InternalReplicaShardAllocator extends ReplicaShardAllocator { return shardStores; } - private void listStoreFilesMetaData(ShardId shardId, DiscoveryNode[] nodes, + private void listStoreFilesMetaData(ShardId shardId, String customDataPath, DiscoveryNode[] nodes, ActionListener> listener) { - var request = new TransportNodesListShardStoreMetaData.Request(shardId, nodes); + var request = new TransportNodesListShardStoreMetaData.Request(shardId, customDataPath, nodes); client.executeLocally(TransportNodesListShardStoreMetaData.TYPE, request, ActionListener.wrap(listener::onResponse, listener::onFailure)); } diff --git a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index ca68dfc9c1ccc..a99ba6ab2b323 100644 --- a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; @@ -33,6 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -109,26 +111,24 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request, Task task) ShardStateMetaData shardStateMetaData = ShardStateMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, nodeEnv.availableShardPaths(request.shardId)); if (shardStateMetaData != null) { - IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex()); - if (metaData == null) { - // we may send this requests while processing the cluster state that recovered the index - // sometimes the request comes in before the local node processed that cluster state - // in such cases we can load it from disk - metaData = IndexMetaData.FORMAT.loadLatestState(logger, namedXContentRegistry, - nodeEnv.indexPaths(shardId.getIndex())); - } - if (metaData == null) { - ElasticsearchException e = new ElasticsearchException("failed to find local IndexMetaData"); - e.setShard(request.shardId); - throw e; - } - if (indicesService.getShardOrNull(shardId) == null) { + final String customDataPath; + if (request.getCustomDataPath() != null) { + customDataPath = request.getCustomDataPath(); + } else { + // TODO: Fallback for BWC with older ES versions. Remove once request.getCustomDataPath() always returns non-null + final IndexMetaData metaData = clusterService.state().metaData().index(shardId.getIndex()); + if (metaData != null) { + customDataPath = new IndexSettings(metaData, settings).customDataPath(); + } else { + logger.trace("{} node doesn't have meta data for the requests index", shardId); + throw new ElasticsearchException("node doesn't have meta data for index " + shardId.getIndex()); + } + } // we don't have an open shard on the store, validate the files on disk are openable ShardPath shardPath = null; try { - IndexSettings indexSettings = new IndexSettings(metaData, settings); - shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, indexSettings); + shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath); if (shardPath == null) { throw new IllegalStateException(shardId + " no shard path found"); } @@ -162,27 +162,47 @@ protected NodeGatewayStartedShards nodeOperation(NodeRequest request, Task task) public static class Request extends BaseNodesRequest { - private ShardId shardId; + private final ShardId shardId; + @Nullable + private final String customDataPath; public Request(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + customDataPath = in.readString(); + } else { + customDataPath = null; + } } - public Request(ShardId shardId, DiscoveryNode[] nodes) { + public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) { super(nodes); - this.shardId = shardId; + this.shardId = Objects.requireNonNull(shardId); + this.customDataPath = Objects.requireNonNull(customDataPath); } - public ShardId shardId() { - return this.shardId; + return shardId; + } + + /** + * Returns the custom data path that is used to look up information for this shard. + * Returns an empty string if no custom data path is used for this index. + * Returns null if custom data path information is not available (due to BWC). + */ + @Nullable + public String getCustomDataPath() { + return customDataPath; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeString(customDataPath); + } } } @@ -211,33 +231,55 @@ protected void writeNodesTo(StreamOutput out, List nod public static class NodeRequest extends BaseNodeRequest { - private ShardId shardId; + private final ShardId shardId; + @Nullable + private final String customDataPath; public NodeRequest(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + customDataPath = in.readString(); + } else { + customDataPath = null; + } } public NodeRequest(Request request) { - this.shardId = request.shardId(); + this.shardId = Objects.requireNonNull(request.shardId()); + this.customDataPath = Objects.requireNonNull(request.getCustomDataPath()); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + assert customDataPath != null; + out.writeString(customDataPath); + } } public ShardId getShardId() { return shardId; } + + /** + * Returns the custom data path that is used to look up information for this shard. + * Returns an empty string if no custom data path is used for this index. + * Returns null if custom data path information is not available (due to BWC). + */ + @Nullable + public String getCustomDataPath() { + return customDataPath; + } } public static class NodeGatewayStartedShards extends BaseNodeResponse { - private String allocationId = null; - private boolean primary = false; - private Exception storeException = null; + private final String allocationId; + private final boolean primary; + private final Exception storeException; public NodeGatewayStartedShards(StreamInput in) throws IOException { super(in); @@ -245,6 +287,8 @@ public NodeGatewayStartedShards(StreamInput in) throws IOException { primary = in.readBoolean(); if (in.readBoolean()) { storeException = in.readException(); + } else { + storeException = null; } } diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 47c3bfb35fee7..481ad3f5a7f6f 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -368,12 +368,12 @@ public synchronized IndexShard createShard( eventListener.beforeIndexShardCreated(shardId, indexSettings); ShardPath path; try { - path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings); + path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); } catch (IllegalStateException ex) { logger.warn("{} failed to load shard path, trying to remove leftover", shardId); try { ShardPath.deleteLeftoverShardDirectory(logger, nodeEnv, lock, this.indexSettings); - path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings); + path = ShardPath.loadShardPath(logger, nodeEnv, shardId, this.indexSettings.customDataPath()); } catch (Exception inner) { ex.addSuppressed(inner); throw ex; diff --git a/server/src/main/java/org/elasticsearch/index/IndexSettings.java b/server/src/main/java/org/elasticsearch/index/IndexSettings.java index 99076f812b96e..cce17b8f441e7 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexSettings.java +++ b/server/src/main/java/org/elasticsearch/index/IndexSettings.java @@ -19,6 +19,7 @@ package org.elasticsearch.index; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.util.Strings; import org.apache.lucene.index.MergePolicy; import org.elasticsearch.Version; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -631,14 +632,14 @@ public String getUUID() { * Returns true if the index has a custom data path */ public boolean hasCustomDataPath() { - return customDataPath() != null; + return Strings.isNotEmpty(customDataPath()); } /** - * Returns the customDataPath for this index, if configured. null o.w. + * Returns the customDataPath for this index, if configured. "" o.w. */ public String customDataPath() { - return settings.get(IndexMetaData.SETTING_DATA_PATH); + return IndexMetaData.INDEX_DATA_PATH_SETTING.get(settings); } /** diff --git a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java index 674124637113f..b7a178972fa72 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java +++ b/server/src/main/java/org/elasticsearch/index/shard/RemoveCorruptedShardDataCommand.java @@ -192,7 +192,7 @@ protected void findAndProcessShardPath(OptionSet options, Environment environmen if (Files.exists(shardPathLocation) == false) { continue; } - final ShardPath shardPath = ShardPath.loadShardPath(logger, shId, indexSettings, + final ShardPath shardPath = ShardPath.loadShardPath(logger, shId, indexSettings.customDataPath(), new Path[]{shardPathLocation}, nodePath.path); if (shardPath != null) { consumer.accept(shardPath); diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java index ac865704d51bb..aab86d64ba2b6 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.shard; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.util.Strings; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.xcontent.NamedXContentRegistry; @@ -114,13 +115,13 @@ public boolean isCustomDataPath() { /** * This method walks through the nodes shard paths to find the data and state path for the given shard. If multiple * directories with a valid shard state exist the one with the highest version will be used. - * Note: this method resolves custom data locations for the shard. + * Note: this method resolves custom data locations for the shard if such a custom data path is provided. */ public static ShardPath loadShardPath(Logger logger, NodeEnvironment env, - ShardId shardId, IndexSettings indexSettings) throws IOException { + ShardId shardId, String customDataPath) throws IOException { final Path[] paths = env.availableShardPaths(shardId); final Path sharedDataPath = env.sharedDataPath(); - return loadShardPath(logger, shardId, indexSettings, paths, sharedDataPath); + return loadShardPath(logger, shardId, customDataPath, paths, sharedDataPath); } /** @@ -128,9 +129,9 @@ public static ShardPath loadShardPath(Logger logger, NodeEnvironment env, * directories with a valid shard state exist the one with the highest version will be used. * Note: this method resolves custom data locations for the shard. */ - public static ShardPath loadShardPath(Logger logger, ShardId shardId, IndexSettings indexSettings, Path[] availableShardPaths, + public static ShardPath loadShardPath(Logger logger, ShardId shardId, String customDataPath, Path[] availableShardPaths, Path sharedDataPath) throws IOException { - final String indexUUID = indexSettings.getUUID(); + final String indexUUID = shardId.getIndex().getUUID(); Path loadedPath = null; for (Path path : availableShardPaths) { // EMPTY is safe here because we never call namedObject @@ -156,13 +157,14 @@ public static ShardPath loadShardPath(Logger logger, ShardId shardId, IndexSetti } else { final Path dataPath; final Path statePath = loadedPath; - if (indexSettings.hasCustomDataPath()) { - dataPath = NodeEnvironment.resolveCustomLocation(indexSettings, shardId, sharedDataPath); + final boolean hasCustomDataPath = Strings.isNotEmpty(customDataPath); + if (hasCustomDataPath) { + dataPath = NodeEnvironment.resolveCustomLocation(customDataPath, shardId, sharedDataPath); } else { dataPath = statePath; } logger.debug("{} loaded data path [{}], state path [{}]", shardId, dataPath, statePath); - return new ShardPath(indexSettings.hasCustomDataPath(), dataPath, statePath, shardId); + return new ShardPath(hasCustomDataPath, dataPath, statePath, shardId); } } @@ -195,7 +197,7 @@ public static ShardPath selectNewPathForShard(NodeEnvironment env, ShardId shard final Path statePath; if (indexSettings.hasCustomDataPath()) { - dataPath = env.resolveCustomLocation(indexSettings, shardId); + dataPath = env.resolveCustomLocation(indexSettings.customDataPath(), shardId); statePath = env.nodePaths()[0].resolve(shardId); } else { BigInteger totFreeSpace = BigInteger.ZERO; diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 4810f9f00e8e5..deba07bc76139 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -971,7 +971,7 @@ public ShardDeletionCheckResult canDeleteShardContent(ShardId shardId, IndexSett } else if (indexSettings.hasCustomDataPath()) { // lets see if it's on a custom path (return false if the shared doesn't exist) // we don't need to delete anything that is not there - return Files.exists(nodeEnv.resolveCustomLocation(indexSettings, shardId)) ? + return Files.exists(nodeEnv.resolveCustomLocation(indexSettings.customDataPath(), shardId)) ? ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE : ShardDeletionCheckResult.NO_FOLDER_FOUND; } else { diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index 97deeefa74da3..1e4ef6781a761 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -34,13 +34,13 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; @@ -60,6 +60,7 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.concurrent.TimeUnit; public class TransportNodesListShardStoreMetaData extends TransportNodesAction { - private ShardId shardId; + private final ShardId shardId; + @Nullable + private final String customDataPath; public Request(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + customDataPath = in.readString(); + } else { + customDataPath = null; + } } - public Request(ShardId shardId, DiscoveryNode[] nodes) { + public Request(ShardId shardId, String customDataPath, DiscoveryNode[] nodes) { super(nodes); - this.shardId = shardId; + this.shardId = Objects.requireNonNull(shardId); + this.customDataPath = Objects.requireNonNull(customDataPath); + } + + public ShardId shardId() { + return shardId; + } + + /** + * Returns the custom data path that is used to look up information for this shard. + * Returns an empty string if no custom data path is used for this index. + * Returns null if custom data path information is not available (due to BWC). + */ + @Nullable + public String getCustomDataPath() { + return customDataPath; } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeString(customDataPath); + } } } @@ -302,21 +329,47 @@ protected void writeNodesTo(StreamOutput out, List nodes public static class NodeRequest extends BaseNodeRequest { - private ShardId shardId; + private final ShardId shardId; + @Nullable + private final String customDataPath; public NodeRequest(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + customDataPath = in.readString(); + } else { + customDataPath = null; + } } - NodeRequest(TransportNodesListShardStoreMetaData.Request request) { - this.shardId = request.shardId; + public NodeRequest(Request request) { + this.shardId = Objects.requireNonNull(request.shardId()); + this.customDataPath = Objects.requireNonNull(request.getCustomDataPath()); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + assert customDataPath != null; + out.writeString(customDataPath); + } + } + + public ShardId getShardId() { + return shardId; + } + + /** + * Returns the custom data path that is used to look up information for this shard. + * Returns an empty string if no custom data path is used for this index. + * Returns null if custom data path information is not available (due to BWC). + */ + @Nullable + public String getCustomDataPath() { + return customDataPath; } } diff --git a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java index 5fddb2ee4c38d..19100343d2b15 100644 --- a/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java +++ b/server/src/test/java/org/elasticsearch/env/NodeEnvironmentTests.java @@ -19,14 +19,13 @@ package org.elasticsearch.env; import org.apache.lucene.index.SegmentInfos; -import org.elasticsearch.common.util.set.Sets; -import org.elasticsearch.core.internal.io.IOUtils; import org.apache.lucene.util.LuceneTestCase; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.SuppressForbidden; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.set.Sets; +import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.gateway.MetaDataStateFormat; import org.elasticsearch.index.Index; import org.elasticsearch.index.IndexSettings; @@ -357,19 +356,11 @@ public void testCustomDataPaths() throws Exception { String[] dataPaths = tmpPaths(); NodeEnvironment env = newNodeEnvironment(dataPaths, "/tmp", Settings.EMPTY); - final Settings indexSettings = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "myindexUUID").build(); - IndexSettings s1 = IndexSettingsModule.newIndexSettings("myindex", indexSettings); - IndexSettings s2 = IndexSettingsModule.newIndexSettings("myindex", Settings.builder() - .put(indexSettings) - .put(IndexMetaData.SETTING_DATA_PATH, "/tmp/foo").build()); Index index = new Index("myindex", "myindexUUID"); ShardId sid = new ShardId(index, 0); - assertFalse("no settings should mean no custom data path", s1.hasCustomDataPath()); - assertTrue("settings with path_data should have a custom data path", s2.hasCustomDataPath()); - assertThat(env.availableShardPaths(sid), equalTo(env.availableShardPaths(sid))); - assertThat(env.resolveCustomLocation(s2, sid).toAbsolutePath(), + assertThat(env.resolveCustomLocation("/tmp/foo", sid).toAbsolutePath(), equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0").toAbsolutePath())); assertThat("shard paths with a custom data_path should contain only regular paths", @@ -379,10 +370,8 @@ public void testCustomDataPaths() throws Exception { assertThat("index paths uses the regular template", env.indexPaths(index), equalTo(stringsToPaths(dataPaths, "indices/" + index.getUUID()))); - IndexSettings s3 = new IndexSettings(s2.getIndexMetaData(), Settings.builder().build()); - assertThat(env.availableShardPaths(sid), equalTo(env.availableShardPaths(sid))); - assertThat(env.resolveCustomLocation(s3, sid).toAbsolutePath(), + assertThat(env.resolveCustomLocation("/tmp/foo", sid).toAbsolutePath(), equalTo(PathUtils.get("/tmp/foo/0/" + index.getUUID() + "/0").toAbsolutePath())); assertThat("shard paths with a custom data_path should contain only regular paths", diff --git a/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java b/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java index b4ce705735b7c..259c806bea06a 100644 --- a/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/AsyncShardFetchTests.java @@ -373,7 +373,7 @@ static class Entry { private AtomicInteger reroute = new AtomicInteger(); TestFetch(ThreadPool threadPool) { - super(LogManager.getLogger(TestFetch.class), "test", new ShardId("test", "_na_", 1), null); + super(LogManager.getLogger(TestFetch.class), "test", new ShardId("test", "_na_", 1), "", null); this.threadPool = threadPool; } diff --git a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java index 126c7eeb21ca7..e3a754f0003db 100644 --- a/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java @@ -542,7 +542,9 @@ public void assertSyncIdsNotNull() { public void testStartedShardFoundIfStateNotYetProcessed() throws Exception { // nodes may need to report the shards they processed the initial recovered cluster state from the master final String nodeName = internalCluster().startNode(); - assertAcked(prepareCreate("test").setSettings(Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1))); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).build()); + final String customDataPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get( + client().admin().indices().prepareGetSettings("test").get().getIndexToSettings().get("test")); final Index index = resolveIndex("test"); final ShardId shardId = new ShardId(index, 0); indexDoc("test", "1"); @@ -579,7 +581,7 @@ public Settings onNodeStopped(String nodeName) throws Exception { TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response; response = ActionTestUtils.executeBlocking(internalCluster().getInstance(TransportNodesListGatewayStartedShards.class), - new TransportNodesListGatewayStartedShards.Request(shardId, new DiscoveryNode[]{node})); + new TransportNodesListGatewayStartedShards.Request(shardId, customDataPath, new DiscoveryNode[]{node})); assertThat(response.getNodes(), hasSize(1)); assertThat(response.getNodes().get(0).allocationId(), notNullValue()); diff --git a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java index cbaae21476855..721e5435172a3 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/ShardPathTests.java @@ -18,15 +18,12 @@ */ package org.elasticsearch.index.shard; -import org.elasticsearch.Version; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.routing.AllocationId; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.Index; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.IndexSettingsModule; import java.io.IOException; import java.nio.file.Path; @@ -37,16 +34,12 @@ public class ShardPathTests extends ESTestCase { public void testLoadShardPath() throws IOException { try (NodeEnvironment env = newNodeEnvironment(Settings.builder().build())) { - Settings.Builder builder = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "0xDEADBEEF") - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT); - Settings settings = builder.build(); ShardId shardId = new ShardId("foo", "0xDEADBEEF", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); ShardStateMetaData.FORMAT.writeAndCleanup( new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); - ShardPath shardPath = - ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings)); + ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, ""); assertEquals(path, shardPath.getDataPath()); assertEquals("0xDEADBEEF", shardPath.getShardId().getIndex().getUUID()); assertEquals("foo", shardPath.getShardId().getIndexName()); @@ -58,32 +51,24 @@ public void testLoadShardPath() throws IOException { public void testFailLoadShardPathOnMultiState() throws IOException { try (NodeEnvironment env = newNodeEnvironment(Settings.builder().build())) { final String indexUUID = "0xDEADBEEF"; - Settings.Builder builder = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, indexUUID) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT); - Settings settings = builder.build(); ShardId shardId = new ShardId("foo", indexUUID, 0); Path[] paths = env.availableShardPaths(shardId); assumeTrue("This test tests multi data.path but we only got one", paths.length > 1); ShardStateMetaData.FORMAT.writeAndCleanup( new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), paths); - Exception e = expectThrows(IllegalStateException.class, () -> - ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); + Exception e = expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, "")); assertThat(e.getMessage(), containsString("more than one shard state found")); } } public void testFailLoadShardPathIndexUUIDMissmatch() throws IOException { try (NodeEnvironment env = newNodeEnvironment(Settings.builder().build())) { - Settings.Builder builder = Settings.builder().put(IndexMetaData.SETTING_INDEX_UUID, "foobar") - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT); - Settings settings = builder.build(); ShardId shardId = new ShardId("foo", "foobar", 0); Path[] paths = env.availableShardPaths(shardId); Path path = randomFrom(paths); ShardStateMetaData.FORMAT.writeAndCleanup( new ShardStateMetaData(true, "0xDEADBEEF", AllocationId.newInitializing()), path); - Exception e = expectThrows(IllegalStateException.class, () -> - ShardPath.loadShardPath(logger, env, shardId, IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings))); + Exception e = expectThrows(IllegalStateException.class, () -> ShardPath.loadShardPath(logger, env, shardId, "")); assertThat(e.getMessage(), containsString("expected: foobar on shard path")); } } @@ -107,22 +92,19 @@ public void testValidCtor() { public void testGetRootPaths() throws IOException { boolean useCustomDataPath = randomBoolean(); - final Settings indexSettings; final Settings nodeSettings; final String indexUUID = "0xDEADBEEF"; - Settings.Builder indexSettingsBuilder = Settings.builder() - .put(IndexMetaData.SETTING_INDEX_UUID, indexUUID) - .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT); final Path customPath; + final String customDataPath; if (useCustomDataPath) { final Path path = createTempDir(); - indexSettings = indexSettingsBuilder.put(IndexMetaData.SETTING_DATA_PATH, "custom").build(); + customDataPath = "custom"; nodeSettings = Settings.builder().put(Environment.PATH_SHARED_DATA_SETTING.getKey(), path.toAbsolutePath().toAbsolutePath()) .build(); customPath = path.resolve("custom").resolve("0"); } else { customPath = null; - indexSettings = indexSettingsBuilder.build(); + customDataPath = ""; nodeSettings = Settings.EMPTY; } try (NodeEnvironment env = newNodeEnvironment(nodeSettings)) { @@ -131,8 +113,7 @@ public void testGetRootPaths() throws IOException { Path path = randomFrom(paths); ShardStateMetaData.FORMAT.writeAndCleanup( new ShardStateMetaData(true, indexUUID, AllocationId.newInitializing()), path); - ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, - IndexSettingsModule.newIndexSettings(shardId.getIndex(), indexSettings, nodeSettings)); + ShardPath shardPath = ShardPath.loadShardPath(logger, env, shardId, customDataPath); boolean found = false; for (Path p : env.nodeDataPaths()) { if (p.equals(shardPath.getRootStatePath())) { diff --git a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java index 1ed0538288d6b..bd35f0f1783ca 100644 --- a/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/indices/IndicesServiceTests.java @@ -248,7 +248,8 @@ public void testDeleteIndexStore() throws Exception { assertHitCount(client().prepareSearch("test").get(), 1); IndexMetaData secondMetaData = clusterService.state().metaData().index("test"); assertAcked(client().admin().indices().prepareClose("test")); - ShardPath path = ShardPath.loadShardPath(logger, getNodeEnvironment(), new ShardId(test.index(), 0), test.getIndexSettings()); + ShardPath path = ShardPath.loadShardPath(logger, getNodeEnvironment(), new ShardId(test.index(), 0), + test.getIndexSettings().customDataPath()); assertTrue(path.exists()); try { @@ -281,7 +282,8 @@ public void testPendingTasks() throws Exception { assertTrue(indexShard.routingEntry().started()); final ShardPath shardPath = indexShard.shardPath(); - assertEquals(ShardPath.loadShardPath(logger, getNodeEnvironment(), indexShard.shardId(), indexSettings), shardPath); + assertEquals(ShardPath.loadShardPath(logger, getNodeEnvironment(), indexShard.shardId(), indexSettings.customDataPath()), + shardPath); final IndicesService indicesService = getIndicesService(); expectThrows(ShardLockObtainFailedException.class, () ->