Skip to content

Omit loading IndexMetaData when inspecting shards #50214

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -43,6 +44,7 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.concurrent.CountDown;
Expand Down Expand Up @@ -100,7 +102,7 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu
final RoutingTable routingTables = state.routingTable();
final RoutingNodes routingNodes = state.getRoutingNodes();
final String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request);
final Set<ShardId> shardIdsToFetch = new HashSet<>();
final Set<Tuple<ShardId, String>> shardsToFetch = new HashSet<>();

logger.trace("using cluster state version [{}] to determine shards", state.version());
// collect relevant shard ids of the requested indices for fetching store infos
Expand All @@ -109,11 +111,12 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu
if (indexShardRoutingTables == null) {
continue;
}
final String customDataPath = IndexMetaData.INDEX_DATA_PATH_SETTING.get(state.metaData().index(index).getSettings());
for (IndexShardRoutingTable routing : indexShardRoutingTables) {
final int shardId = routing.shardId().id();
ClusterShardHealth shardHealth = new ClusterShardHealth(shardId, routing);
if (request.shardStatuses().contains(shardHealth.getStatus())) {
shardIdsToFetch.add(routing.shardId());
shardsToFetch.add(Tuple.tuple(routing.shardId(), customDataPath));
}
}
}
Expand All @@ -123,7 +126,7 @@ protected void masterOperation(Task task, IndicesShardStoresRequest request, Clu
// we could fetch all shard store info from every node once (nNodes requests)
// we have to implement a TransportNodesAction instead of using TransportNodesListGatewayStartedShards
// for fetching shard stores info, that operates on a list of shards instead of a single shard
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardIdsToFetch, listener).start();
new AsyncShardStoresInfoFetches(state.nodes(), routingNodes, shardsToFetch, listener).start();
}

@Override
Expand All @@ -135,46 +138,46 @@ protected ClusterBlockException checkBlock(IndicesShardStoresRequest request, Cl
private class AsyncShardStoresInfoFetches {
private final DiscoveryNodes nodes;
private final RoutingNodes routingNodes;
private final Set<ShardId> shardIds;
private final Set<Tuple<ShardId, String>> shards;
private final ActionListener<IndicesShardStoresResponse> listener;
private CountDown expectedOps;
private final Queue<InternalAsyncFetch.Response> fetchResponses;

AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set<ShardId> shardIds,
AsyncShardStoresInfoFetches(DiscoveryNodes nodes, RoutingNodes routingNodes, Set<Tuple<ShardId, String>> shards,
ActionListener<IndicesShardStoresResponse> listener) {
this.nodes = nodes;
this.routingNodes = routingNodes;
this.shardIds = shardIds;
this.shards = shards;
this.listener = listener;
this.fetchResponses = new ConcurrentLinkedQueue<>();
this.expectedOps = new CountDown(shardIds.size());
this.expectedOps = new CountDown(shards.size());
}

void start() {
if (shardIds.isEmpty()) {
if (shards.isEmpty()) {
listener.onResponse(new IndicesShardStoresResponse());
} else {
// explicitely type lister, some IDEs (Eclipse) are not able to correctly infer the function type
Lister<BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> lister = this::listStartedShards;
for (ShardId shardId : shardIds) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shardId, lister);
for (Tuple<ShardId, String> shard : shards) {
InternalAsyncFetch fetch = new InternalAsyncFetch(logger, "shard_stores", shard.v1(), shard.v2(), lister);
fetch.fetchData(nodes, Collections.<String>emptySet());
}
}
}

private void listStartedShards(ShardId shardId, DiscoveryNode[] nodes,
private void listStartedShards(ShardId shardId, String customDataPath, DiscoveryNode[] nodes,
ActionListener<BaseNodesResponse<NodeGatewayStartedShards>> listener) {
var request = new TransportNodesListGatewayStartedShards.Request(shardId, nodes);
var request = new TransportNodesListGatewayStartedShards.Request(shardId, customDataPath, nodes);
client.executeLocally(TransportNodesListGatewayStartedShards.TYPE, request,
ActionListener.wrap(listener::onResponse, listener::onFailure));
}

private class InternalAsyncFetch extends AsyncShardFetch<NodeGatewayStartedShards> {

InternalAsyncFetch(Logger logger, String type, ShardId shardId,
InternalAsyncFetch(Logger logger, String type, ShardId shardId, String customDataPath,
Lister<? extends BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> action) {
super(logger, type, shardId, action);
super(logger, type, shardId, customDataPath, action);
}

@Override
Expand Down
39 changes: 18 additions & 21 deletions server/src/main/java/org/elasticsearch/env/NodeEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Strings;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
Expand Down Expand Up @@ -610,7 +611,7 @@ public void deleteShardDirectoryUnderLock(ShardLock lock, IndexSettings indexSet
acquireFSLockForPaths(indexSettings, paths);
IOUtils.rm(paths);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveCustomLocation(indexSettings, shardId);
Path customLocation = resolveCustomLocation(indexSettings.customDataPath(), shardId);
logger.trace("acquiring lock for {}, custom path: [{}]", shardId, customLocation);
acquireFSLockForPaths(indexSettings, customLocation);
logger.trace("deleting custom shard {} directory [{}]", shardId, customLocation);
Expand Down Expand Up @@ -687,7 +688,7 @@ public void deleteIndexDirectoryUnderLock(Index index, IndexSettings indexSettin
logger.trace("deleting index {} directory, paths({}): [{}]", index, indexPaths.length, indexPaths);
IOUtils.rm(indexPaths);
if (indexSettings.hasCustomDataPath()) {
Path customLocation = resolveIndexCustomLocation(indexSettings);
Path customLocation = resolveIndexCustomLocation(indexSettings.customDataPath(), index.getUUID());
logger.trace("deleting custom index {} directory [{}]", index, customLocation);
IOUtils.rm(customLocation);
}
Expand Down Expand Up @@ -933,7 +934,7 @@ public Path[] indexPaths(Index index) {
* returned paths. The returned array may contain paths to non-existing directories.
*
* @see IndexSettings#hasCustomDataPath()
* @see #resolveCustomLocation(IndexSettings, ShardId)
* @see #resolveCustomLocation(String, ShardId)
*
*/
public Path[] availableShardPaths(ShardId shardId) {
Expand Down Expand Up @@ -1233,17 +1234,12 @@ private static boolean isIndexMetaDataPath(Path path) {

/**
* Resolve the custom path for a index's shard.
* Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
* the root path for the index.
*
* @param indexSettings settings for the index
*/
public static Path resolveBaseCustomLocation(IndexSettings indexSettings, Path sharedDataPath) {
String customDataDir = indexSettings.customDataPath();
if (customDataDir != null) {
public static Path resolveBaseCustomLocation(String customDataPath, Path sharedDataPath) {
if (Strings.isNotEmpty(customDataPath)) {
// This assert is because this should be caught by MetaDataCreateIndexService
assert sharedDataPath != null;
return sharedDataPath.resolve(customDataDir).resolve("0");
return sharedDataPath.resolve(customDataPath).resolve("0");
} else {
throw new IllegalArgumentException("no custom " + IndexMetaData.SETTING_DATA_PATH + " setting available");
}
Expand All @@ -1254,30 +1250,31 @@ public static Path resolveBaseCustomLocation(IndexSettings indexSettings, Path s
* Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
* the root path for the index.
*
* @param indexSettings settings for the index
* @param customDataPath the custom data path
*/
private Path resolveIndexCustomLocation(IndexSettings indexSettings) {
return resolveIndexCustomLocation(indexSettings, sharedDataPath);
private Path resolveIndexCustomLocation(String customDataPath, String indexUUID) {
return resolveIndexCustomLocation(customDataPath, indexUUID, sharedDataPath);
}

private static Path resolveIndexCustomLocation(IndexSettings indexSettings, Path sharedDataPath) {
return resolveBaseCustomLocation(indexSettings, sharedDataPath).resolve(indexSettings.getUUID());
private static Path resolveIndexCustomLocation(String customDataPath, String indexUUID, Path sharedDataPath) {
return resolveBaseCustomLocation(customDataPath, sharedDataPath).resolve(indexUUID);
}

/**
* Resolve the custom path for a index's shard.
* Uses the {@code IndexMetaData.SETTING_DATA_PATH} setting to determine
* the root path for the index.
*
* @param indexSettings settings for the index
* @param customDataPath the custom data path
* @param shardId shard to resolve the path to
*/
public Path resolveCustomLocation(IndexSettings indexSettings, final ShardId shardId) {
return resolveCustomLocation(indexSettings, shardId, sharedDataPath);
public Path resolveCustomLocation(String customDataPath, final ShardId shardId) {
return resolveCustomLocation(customDataPath, shardId, sharedDataPath);
}

public static Path resolveCustomLocation(IndexSettings indexSettings, final ShardId shardId, Path sharedDataPath) {
return resolveIndexCustomLocation(indexSettings, sharedDataPath).resolve(Integer.toString(shardId.id()));
public static Path resolveCustomLocation(String customDataPath, final ShardId shardId, Path sharedDataPath) {
return resolveIndexCustomLocation(customDataPath, shardId.getIndex().getUUID(),
sharedDataPath).resolve(Integer.toString(shardId.id()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -61,22 +62,25 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
* An action that lists the relevant shard data that needs to be fetched.
*/
public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
void list(ShardId shardId, DiscoveryNode[] nodes, ActionListener<NodesResponse> listener);
void list(ShardId shardId, @Nullable String customDataPath, DiscoveryNode[] nodes, ActionListener<NodesResponse> listener);
}

protected final Logger logger;
protected final String type;
protected final ShardId shardId;
protected final String customDataPath;
private final Lister<BaseNodesResponse<T>, T> action;
private final Map<String, NodeEntry<T>> cache = new HashMap<>();
private final Set<String> nodesToIgnore = new HashSet<>();
private final AtomicLong round = new AtomicLong();
private boolean closed;

protected AsyncShardFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
protected AsyncShardFetch(Logger logger, String type, ShardId shardId, String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action) {
this.logger = logger;
this.type = type;
this.shardId = shardId;
this.shardId = Objects.requireNonNull(shardId);
this.customDataPath = Objects.requireNonNull(customDataPath);
this.action = (Lister<BaseNodesResponse<T>, T>) action;
}

Expand Down Expand Up @@ -285,7 +289,7 @@ private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
// visible for testing
void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
logger.trace("{} fetching [{}] from {}", shardId, type, nodes);
action.list(shardId, nodes, new ActionListener<BaseNodesResponse<T>>() {
action.list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
@Override
public void onResponse(BaseNodesResponse<T> response) {
processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService;
Expand Down Expand Up @@ -189,8 +190,9 @@ private boolean hasNewNodes(DiscoveryNodes nodes) {

class InternalAsyncFetch<T extends BaseNodeResponse> extends AsyncShardFetch<T> {

InternalAsyncFetch(Logger logger, String type, ShardId shardId, Lister<? extends BaseNodesResponse<T>, T> action) {
super(logger, type, shardId, action);
InternalAsyncFetch(Logger logger, String type, ShardId shardId, String customDataPath,
Lister<? extends BaseNodesResponse<T>, T> action) {
super(logger, type, shardId, customDataPath, action);
}

@Override
Expand All @@ -217,7 +219,9 @@ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardR
Lister<BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> lister = this::listStartedShards;
AsyncShardFetch<NodeGatewayStartedShards> fetch =
asyncFetchStarted.computeIfAbsent(shard.shardId(),
shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId, lister));
shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
IndexMetaData.INDEX_DATA_PATH_SETTING.get(allocation.metaData().index(shard.index()).getSettings()),
lister));
AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState =
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));

Expand All @@ -227,9 +231,9 @@ protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardR
return shardState;
}

private void listStartedShards(ShardId shardId, DiscoveryNode[] nodes,
private void listStartedShards(ShardId shardId, String customDataPath, DiscoveryNode[] nodes,
ActionListener<BaseNodesResponse<NodeGatewayStartedShards>> listener) {
var request = new TransportNodesListGatewayStartedShards.Request(shardId, nodes);
var request = new TransportNodesListGatewayStartedShards.Request(shardId, customDataPath, nodes);
client.executeLocally(TransportNodesListGatewayStartedShards.TYPE, request,
ActionListener.wrap(listener::onResponse, listener::onFailure));
}
Expand All @@ -244,12 +248,12 @@ class InternalReplicaShardAllocator extends ReplicaShardAllocator {
}

@Override
protected AsyncShardFetch.FetchResult<NodeStoreFilesMetaData>
fetchData(ShardRouting shard, RoutingAllocation allocation) {
protected AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
// explicitely type lister, some IDEs (Eclipse) are not able to correctly infer the function type
Lister<BaseNodesResponse<NodeStoreFilesMetaData>, NodeStoreFilesMetaData> lister = this::listStoreFilesMetaData;
AsyncShardFetch<NodeStoreFilesMetaData> fetch = asyncFetchStore.computeIfAbsent(shard.shardId(),
shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(), lister));
shardId -> new InternalAsyncFetch<>(logger, "shard_store", shard.shardId(),
IndexMetaData.INDEX_DATA_PATH_SETTING.get(allocation.metaData().index(shard.index()).getSettings()), lister));
AsyncShardFetch.FetchResult<NodeStoreFilesMetaData> shardStores =
fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
if (shardStores.hasData()) {
Expand All @@ -258,9 +262,9 @@ class InternalReplicaShardAllocator extends ReplicaShardAllocator {
return shardStores;
}

private void listStoreFilesMetaData(ShardId shardId, DiscoveryNode[] nodes,
private void listStoreFilesMetaData(ShardId shardId, String customDataPath, DiscoveryNode[] nodes,
ActionListener<BaseNodesResponse<NodeStoreFilesMetaData>> listener) {
var request = new TransportNodesListShardStoreMetaData.Request(shardId, nodes);
var request = new TransportNodesListShardStoreMetaData.Request(shardId, customDataPath, nodes);
client.executeLocally(TransportNodesListShardStoreMetaData.TYPE, request,
ActionListener.wrap(listener::onResponse, listener::onFailure));
}
Expand Down
Loading