From f666fa70242dda37a15ab158dfba27beddf5abae Mon Sep 17 00:00:00 2001 From: jimczi Date: Tue, 1 Dec 2020 22:26:53 +0100 Subject: [PATCH 1/4] Adds a consistent shard index to ShardSearchRequest This change ensures that the shard index that is used to tiebreak documents with identical sort remains consistent between two requests that target the same shards. The index is now always computed from the natural order of the shards in the search request. This change also adds the consistent shard index to the ShardSearchRequest. That allows the slice builder to use this information to build more balanced slice query. Relates #56828 --- .../search/AbstractSearchAsyncAction.java | 37 +++++--- .../search/CanMatchPreFilterSearchPhase.java | 8 +- .../action/search/FetchSearchPhase.java | 2 +- .../SearchDfsQueryThenFetchAsyncAction.java | 7 +- .../action/search/SearchPhaseContext.java | 6 +- .../action/search/SearchPhaseController.java | 12 ++- .../SearchQueryThenFetchAsyncAction.java | 7 +- .../action/search/TransportSearchAction.java | 22 ++--- .../search/DefaultSearchContext.java | 2 +- .../search/internal/ShardSearchRequest.java | 62 +++++++------- .../search/slice/SliceBuilder.java | 59 ++----------- .../AbstractSearchAsyncActionTests.java | 9 +- .../CanMatchPreFilterSearchPhaseTests.java | 10 +-- .../action/search/MockSearchPhaseContext.java | 2 +- .../action/search/SearchAsyncActionTests.java | 5 -- .../SearchQueryThenFetchAsyncActionTests.java | 2 +- .../search/DefaultSearchContextTests.java | 1 - .../search/SearchServiceTests.java | 72 ++++++++-------- .../internal/ShardSearchRequestTests.java | 8 +- .../search/query/QuerySearchResultTests.java | 2 +- .../search/slice/SliceBuilderTests.java | 85 +++++-------------- .../index/engine/FrozenIndexTests.java | 24 +++--- 22 files changed, 175 insertions(+), 269 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index b580ba0704b64..cba3f48a32fe8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -48,10 +49,9 @@ import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -83,7 +83,6 @@ abstract class AbstractSearchAsyncAction exten private final ClusterState clusterState; private final Map aliasFilter; private final Map concreteIndexBoosts; - private final Map> indexRoutings; private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); private final AtomicBoolean hasShardResponse = new AtomicBoolean(false); @@ -94,6 +93,7 @@ abstract class AbstractSearchAsyncAction exten protected final GroupShardsIterator toSkipShardsIts; protected final GroupShardsIterator shardsIts; + private final Map shardItIndexMap; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final int maxConcurrentRequestsPerNode; @@ -106,7 +106,6 @@ abstract class AbstractSearchAsyncAction exten AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, SearchTimeProvider timeProvider, ClusterState clusterState, @@ -124,6 +123,17 @@ abstract class AbstractSearchAsyncAction exten } this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators); this.shardsIts = new GroupShardsIterator<>(iterators); + this.shardItIndexMap = new HashMap<>(); + + // we compute the shard index based on the natural order of the shards + // that participate in the search request. This means that this number is + // consistent between two requests that target the same shards. + List naturalOrder = new ArrayList<>(iterators); + CollectionUtil.timSort(naturalOrder); + for (int i = 0; i < naturalOrder.size(); i++) { + shardItIndexMap.put(naturalOrder.get(i), i); + } + // we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up // it's number of active shards but use 1 as the default if no replica of a shard is active at this point. // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result @@ -143,7 +153,6 @@ abstract class AbstractSearchAsyncAction exten this.clusterState = clusterState; this.concreteIndexBoosts = concreteIndexBoosts; this.aliasFilter = aliasFilter; - this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; } @@ -210,10 +219,13 @@ public final void run() { throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } - for (int index = 0; index < shardsIts.size(); index++) { - final SearchShardIterator shardRoutings = shardsIts.get(index); + + for (int i = 0; i < shardsIts.size(); i++) { + final SearchShardIterator shardRoutings = shardsIts.get(i); assert shardRoutings.skip() == false; - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); + assert shardItIndexMap.containsKey(shardRoutings); + int shardIndex = shardItIndexMap.get(shardRoutings); + performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull()); } } } @@ -651,15 +663,12 @@ public final void onFailure(Exception e) { } @Override - public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt) { + public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex) { AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID()); assert filter != null; float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST); - String indexName = shardIt.shardId().getIndex().getName(); - final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) - .toArray(new String[0]); - ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), - filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings, + ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), shardIndex, + getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive()); // if we already received a search result we can inform the shard that it // can return a null response if the request rewrites to match none rather diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 663cb861cc047..a1dfc602fb916 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -36,7 +36,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Function; @@ -63,14 +62,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState, SearchTask task, Function, SearchPhase> phaseFactory, SearchResponse.Clusters clusters) { //We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests - super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterState, task, new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters); this.phaseFactory = phaseFactory; @@ -86,7 +84,7 @@ public void addReleasable(Releasable releasable) { protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, SearchActionListener listener) { getSearchTransport().sendCanMatch(getConnection(shard.getClusterAlias(), shard.getNodeId()), - buildShardSearchRequest(shardIt), getTask(), listener); + buildShardSearchRequest(shardIt, listener.requestIndex), getTask(), listener); } @Override @@ -149,7 +147,7 @@ private static Comparator shardComparator(GroupShardsIterator[] minAndMaxes, SortOrder order) { final Comparator comparator = Comparator.comparing(index -> minAndMaxes[index], MinAndMax.getComparator(order)); - return comparator.thenComparing(index -> shardsIts.get(index).shardId()); + return comparator.thenComparing(index -> shardsIts.get(index)); } private static final class CanMatchSearchPhaseResults extends SearchPhaseResults { diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 55d40a023d592..94baf2a379e83 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -153,7 +153,7 @@ private void innerRun() throws Exception { ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry, lastEmittedDocPerShard, searchShardTarget.getOriginalIndices(), queryResult.getShardSearchRequest(), queryResult.getRescoreDocIds()); - executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), + executeFetch(queryResult.getShardIndex(), searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index b53e635d9866a..12d81578e2b56 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -45,14 +44,14 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, final BiFunction nodeIdToConnection, final Map aliasFilter, - final Map concreteIndexBoosts, final Map> indexRoutings, + final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, final QueryPhaseResultConsumer queryPhaseResultConsumer, final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, final ClusterState clusterState, final SearchTask task, SearchResponse.Clusters clusters) { - super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters); @@ -68,7 +67,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard, final SearchActionListener listener) { getSearchTransport().sendExecuteDfs(getConnection(shard.getClusterAlias(), shard.getNodeId()), - buildShardSearchRequest(shardIt) , getTask(), listener); + buildShardSearchRequest(shardIt, listener.requestIndex) , getTask(), listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java index e56100dc5287f..93463921d7e99 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java @@ -115,8 +115,12 @@ default void sendReleaseSearchContext(ShardSearchContextId contextId, /** * Builds an request for the initial search phase. + * + * @param shardIt the target {@link SearchShardIterator} + * @param shardIndex the index of the shard that is used in the coordinator node to + * tiebreak results with identical sort values */ - ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt); + ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex); /** * Processes the phase transition from on phase to another. This method handles all errors that happen during the initial run execution diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index a6c5964c9470c..8f962b0210edc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -428,9 +428,8 @@ ReducedQueryPhase reducedQueryPhase(Collection quer throw new IllegalStateException(errorMsg); } validateMergeSortValueFormats(queryResults); - final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); - final boolean hasSuggest = firstResult.suggest() != null; - final boolean hasProfileResults = firstResult.hasProfileResults(); + final boolean hasSuggest = queryResults.stream().anyMatch(res -> res.queryResult().suggest() != null); + final boolean hasProfileResults = queryResults.stream().anyMatch(res -> res.queryResult().hasProfileResults()); // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them)) final Map> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap(); @@ -438,11 +437,16 @@ ReducedQueryPhase reducedQueryPhase(Collection quer : Collections.emptyMap(); int from = 0; int size = 0; + DocValueFormat[] sortValueFormats = null; for (SearchPhaseResult entry : queryResults) { QuerySearchResult result = entry.queryResult(); from = result.from(); // sorted queries can set the size to 0 if they have enough competitive hits. size = Math.max(result.size(), size); + if (result.sortValueFormats() != null) { + sortValueFormats = result.sortValueFormats(); + } + if (hasSuggest) { assert result.suggest() != null; for (Suggestion> suggestion : result.suggest()) { @@ -477,7 +481,7 @@ ReducedQueryPhase reducedQueryPhase(Collection quer final TotalHits totalHits = topDocsStats.getTotalHits(); return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), topDocsStats.timedOut, topDocsStats.terminatedEarly, reducedSuggest, aggregations, shardResults, sortedTopDocs, - firstResult.sortValueFormats(), numReducePhases, size, from, false); + sortValueFormats, numReducePhases, size, from, false); } private static InternalAggregations reduceAggs(InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 79f5e5ca9571e..2ed9d4762800a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.transport.Transport; import java.util.Map; -import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,14 +51,14 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToConnection, final Map aliasFilter, - final Map concreteIndexBoosts, final Map> indexRoutings, + final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, final QueryPhaseResultConsumer resultConsumer, final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters) { - super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterState, task, resultConsumer, request.getMaxConcurrentShardRequests(), clusters); this.topDocsSize = getTopDocsSize(request); @@ -79,7 +78,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { - ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt)); + ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex)); getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index ead3a7d8ec1cc..176879ecede5c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -235,11 +235,11 @@ public AbstractSearchAsyncAction asyncSearchAction( SearchTask task, SearchRequest searchRequest, Executor executor, GroupShardsIterator shardsIts, SearchTimeProvider timeProvider, BiFunction connectionLookup, ClusterState clusterState, Map aliasFilter, - Map concreteIndexBoosts, Map> indexRoutings, - ActionListener listener, boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters) { + Map concreteIndexBoosts, ActionListener listener, + boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters) { return new AbstractSearchAsyncAction<>( actionName, logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, - indexRoutings, executor, searchRequest, listener, shardsIts, timeProvider, clusterState, task, + executor, searchRequest, listener, shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()), 1, clusters) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, @@ -609,13 +609,11 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea // of just for the _search api final List localShardIterators; final Map aliasFilter; - final Map> indexRoutings; final String[] concreteLocalIndices; if (searchContext != null) { assert searchRequest.pointInTimeBuilder() != null; aliasFilter = searchContext.aliasFilter(); - indexRoutings = Map.of(); concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices(); localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices, searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive()); @@ -637,7 +635,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) .collect(Collectors.toList()); aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); - indexRoutings = routingMap; } final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators); @@ -672,7 +669,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea localShardIterators.size() + remoteShardIterators.size()); searchAsyncActionProvider.asyncSearchAction( task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState, - Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, indexRoutings, listener, + Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, threadPool, clusters).start(); } @@ -746,8 +743,7 @@ AbstractSearchAsyncAction asyncSearchAction( SearchTask task, SearchRequest searchRequest, Executor executor, GroupShardsIterator shardIterators, SearchTimeProvider timeProvider, BiFunction connectionLookup, ClusterState clusterState, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, ActionListener listener, boolean preFilter, - ThreadPool threadPool, SearchResponse.Clusters clusters); + ActionListener listener, boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters); } private AbstractSearchAsyncAction searchAsyncAction( @@ -760,14 +756,13 @@ private AbstractSearchAsyncAction searchAsyncAction ClusterState clusterState, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, ActionListener listener, boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters) { if (preFilter) { return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators, + aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators, timeProvider, clusterState, task, (iter) -> { AbstractSearchAsyncAction action = searchAsyncAction( task, @@ -779,7 +774,6 @@ private AbstractSearchAsyncAction searchAsyncAction clusterState, aliasFilter, concreteIndexBoosts, - indexRoutings, listener, false, threadPool, @@ -799,12 +793,12 @@ public void run() { switch (searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, + aliasFilter, concreteIndexBoosts, searchPhaseController, executor, queryResultConsumer, searchRequest, listener, shardIterators, timeProvider, clusterState, task, clusters); break; case QUERY_THEN_FETCH: searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, queryResultConsumer, + aliasFilter, concreteIndexBoosts, searchPhaseController, executor, queryResultConsumer, searchRequest, listener, shardIterators, timeProvider, clusterState, task, clusters); break; default: diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 56cfd7c6ec1c3..ebb964bb648ca 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -274,7 +274,7 @@ public Query buildFilteredQuery(Query query) { } if (sliceBuilder != null) { - Query slicedQuery = sliceBuilder.toFilter(clusterService, request, this.queryShardContext); + Query slicedQuery = sliceBuilder.toFilter(request, queryShardContext); if (slicedQuery instanceof MatchNoDocsQuery) { return slicedQuery; } else { diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index af2e46a65dffb..aece612ebc750 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -72,6 +72,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesRequest { private final String clusterAlias; private final ShardId shardId; + private final int shardIndex; private final int numberOfShards; private final SearchType searchType; private final Scroll scroll; @@ -79,8 +80,6 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final Boolean requestCache; private final long nowInMillis; private final boolean allowPartialSearchResults; - private final String[] indexRoutings; - private final String preference; private final OriginalIndices originalIndices; private boolean canReturnNullResponseIfMatchNoDocs; @@ -95,29 +94,30 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque public ShardSearchRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, + int shardIndex, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis, - @Nullable String clusterAlias, - String[] indexRoutings) { - this(originalIndices, searchRequest, shardId, numberOfShards, aliasFilter, - indexBoost, nowInMillis, clusterAlias, indexRoutings, null, null); + @Nullable String clusterAlias) { + this(originalIndices, searchRequest, shardId, shardIndex, numberOfShards, aliasFilter, + indexBoost, nowInMillis, clusterAlias, null, null); } public ShardSearchRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, + int shardIndex, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis, @Nullable String clusterAlias, - String[] indexRoutings, ShardSearchContextId readerId, TimeValue keepAlive) { this(originalIndices, shardId, + shardIndex, numberOfShards, searchRequest.searchType(), searchRequest.source(), @@ -125,8 +125,6 @@ public ShardSearchRequest(OriginalIndices originalIndices, aliasFilter, indexBoost, searchRequest.allowPartialSearchResults(), - indexRoutings, - searchRequest.preference(), searchRequest.scroll(), nowInMillis, clusterAlias, @@ -140,12 +138,13 @@ public ShardSearchRequest(OriginalIndices originalIndices, public ShardSearchRequest(ShardId shardId, long nowInMillis, AliasFilter aliasFilter) { - this(OriginalIndices.NONE, shardId, -1, SearchType.QUERY_THEN_FETCH, null, null, - aliasFilter, 1.0f, false, Strings.EMPTY_ARRAY, null, null, nowInMillis, null, null, null); + this(OriginalIndices.NONE, shardId, -1, -1, SearchType.QUERY_THEN_FETCH, null, null, + aliasFilter, 1.0f, false, null, nowInMillis, null, null, null); } private ShardSearchRequest(OriginalIndices originalIndices, ShardId shardId, + int shardIndex, int numberOfShards, SearchType searchType, SearchSourceBuilder source, @@ -153,14 +152,13 @@ private ShardSearchRequest(OriginalIndices originalIndices, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults, - String[] indexRoutings, - String preference, Scroll scroll, long nowInMillis, @Nullable String clusterAlias, ShardSearchContextId readerId, TimeValue keepAlive) { this.shardId = shardId; + this.shardIndex = shardIndex; this.numberOfShards = numberOfShards; this.searchType = searchType; this.source = source; @@ -168,8 +166,6 @@ private ShardSearchRequest(OriginalIndices originalIndices, this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; this.allowPartialSearchResults = allowPartialSearchResults; - this.indexRoutings = indexRoutings; - this.preference = preference; this.scroll = scroll; this.nowInMillis = nowInMillis; this.clusterAlias = clusterAlias; @@ -183,6 +179,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); searchType = SearchType.fromId(in.readByte()); + shardIndex = in.getVersion().onOrAfter(Version.V_8_0_0) ? in.readVInt() : -1; numberOfShards = in.readVInt(); scroll = in.readOptionalWriteable(Scroll::new); source = in.readOptionalWriteable(SearchSourceBuilder::new); @@ -200,8 +197,10 @@ public ShardSearchRequest(StreamInput in) throws IOException { requestCache = in.readOptionalBoolean(); clusterAlias = in.readOptionalString(); allowPartialSearchResults = in.readBoolean(); - indexRoutings = in.readStringArray(); - preference = in.readOptionalString(); + if (in.getVersion().before(Version.V_8_0_0)) { + in.readStringArray(); + in.readOptionalString(); + } if (in.getVersion().onOrAfter(Version.V_7_7_0)) { canReturnNullResponseIfMatchNoDocs = in.readBoolean(); bottomSortValues = in.readOptionalWriteable(SearchSortValuesAndFormats::new); @@ -219,6 +218,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { public ShardSearchRequest(ShardSearchRequest clone) { this.shardId = clone.shardId; + this.shardIndex = clone.shardIndex; this.searchType = clone.searchType; this.numberOfShards = clone.numberOfShards; this.scroll = clone.scroll; @@ -229,8 +229,6 @@ public ShardSearchRequest(ShardSearchRequest clone) { this.requestCache = clone.requestCache; this.clusterAlias = clone.clusterAlias; this.allowPartialSearchResults = clone.allowPartialSearchResults; - this.indexRoutings = clone.indexRoutings; - this.preference = clone.preference; this.canReturnNullResponseIfMatchNoDocs = clone.canReturnNullResponseIfMatchNoDocs; this.bottomSortValues = clone.bottomSortValues; this.originalIndices = clone.originalIndices; @@ -248,7 +246,10 @@ public void writeTo(StreamOutput out) throws IOException { protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOException { shardId.writeTo(out); out.writeByte(searchType.id()); - if (!asKey) { + if (asKey == false) { + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeVInt(shardIndex); + } out.writeVInt(numberOfShards); } out.writeOptionalWriteable(scroll); @@ -265,9 +266,9 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce out.writeOptionalBoolean(requestCache); out.writeOptionalString(clusterAlias); out.writeBoolean(allowPartialSearchResults); - if (asKey == false) { - out.writeStringArray(indexRoutings); - out.writeOptionalString(preference); + if (asKey == false && out.getVersion().before(Version.V_8_0_0)) { + out.writeStringArray(Strings.EMPTY_ARRAY); + out.writeOptionalString(null); } if (asKey == false && out.getVersion().onOrAfter(Version.V_7_7_0)) { out.writeBoolean(canReturnNullResponseIfMatchNoDocs); @@ -313,6 +314,13 @@ public void source(SearchSourceBuilder source) { this.source = source; } + /** + * Returns the index of the shard that is used to tiebreak documents with identical sort values. + */ + public int shardIndex() { + return shardIndex; + } + public int numberOfShards() { return numberOfShards; } @@ -341,14 +349,6 @@ public Scroll scroll() { return scroll; } - public String[] indexRoutings() { - return indexRoutings; - } - - public String preference() { - return preference; - } - /** * Sets the bottom sort values that can be used by the searcher to filter documents * that are after it. This value is computed by coordinating nodes that throttles the diff --git a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java index 11ff885a742b0..f5f5d48ef2506 100644 --- a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java @@ -22,16 +22,11 @@ import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -44,21 +39,18 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import java.io.IOException; -import java.util.Collections; -import java.util.Map; import java.util.Objects; -import java.util.Set; /** * A slice builder allowing to split a scroll in multiple partitions. - * If the provided field is the "_id" it uses a {@link org.elasticsearch.search.slice.TermsSliceQuery} - * to do the slicing. The slicing is done at the shard level first and then each shard is split into multiple slices. + * If the provided field is the "_id" it uses a {@link TermsSliceQuery} to do the slicing. + * The slicing is done at the shard level first and then each shard is split into multiple slices. * For instance if the number of shards is equal to 2 and the user requested 4 slices * then the slices 0 and 2 are assigned to the first shard and the slices 1 and 3 are assigned to the second shard. * This way the total number of bitsets that we need to build on each shard is bounded by the number of slices * (instead of {@code numShards*numSlices}). * Otherwise the provided field must be a numeric and doc_values must be enabled. In that case a - * {@link org.elasticsearch.search.slice.DocValuesSliceQuery} is used to filter the results. + * {@link DocValuesSliceQuery} is used to filter the results. */ public class SliceBuilder implements Writeable, ToXContentObject { @@ -205,40 +197,14 @@ public int hashCode() { * @param context Additional information needed to build the query */ @SuppressWarnings("rawtypes") - public Query toFilter(ClusterService clusterService, ShardSearchRequest request, QueryShardContext context) { + public Query toFilter(ShardSearchRequest request, QueryShardContext context) { final MappedFieldType type = context.getFieldType(field); if (type == null) { throw new IllegalArgumentException("field " + field + " not found"); } - int shardId = request.shardId().id(); + int shardIndex = request.shardIndex() != -1 ? request.shardIndex() : request.shardId().id(); int numShards = context.getIndexSettings().getNumberOfShards(); - if (request.preference() != null || request.indexRoutings().length > 0) { - GroupShardsIterator group = buildShardIterator(clusterService, request); - assert group.size() <= numShards : "index routing shards: " + group.size() + - " cannot be greater than total number of shards: " + numShards; - if (group.size() < numShards) { - /* - * The routing of this request targets a subset of the shards of this index so we need to we retrieve - * the original {@link GroupShardsIterator} and compute the request shard id and number of - * shards from it. - */ - numShards = group.size(); - int ord = 0; - shardId = -1; - // remap the original shard id with its index (position) in the sorted shard iterator. - for (ShardIterator it : group) { - assert it.shardId().getIndex().equals(request.shardId().getIndex()); - if (request.shardId().equals(it.shardId())) { - shardId = ord; - break; - } - ++ord; - } - assert shardId != -1 : "shard id: " + request.shardId().getId() + " not found in index shard routing"; - } - } - String field = this.field; boolean useTermQuery = false; if (IdFieldMapper.NAME.equals(field)) { @@ -262,7 +228,7 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, // first we check if the slice is responsible of this shard int targetShard = id % numShards; - if (targetShard != shardId) { + if (targetShard != shardIndex) { // the shard is not part of this slice, we can skip it. return new MatchNoDocsQuery("this shard is not part of the slice"); } @@ -287,7 +253,7 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, // the number of shards is greater than the number of slices // check if the shard is assigned to the slice - int targetSlice = shardId % max; + int targetSlice = shardIndex % max; if (id != targetSlice) { // the shard is not part of this slice, we can skip it. return new MatchNoDocsQuery("this shard is not part of the slice"); @@ -295,17 +261,6 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, return new MatchAllDocsQuery(); } - /** - * Returns the {@link GroupShardsIterator} for the provided request. - */ - private GroupShardsIterator buildShardIterator(ClusterService clusterService, ShardSearchRequest request) { - final ClusterState state = clusterService.state(); - String[] indices = new String[] { request.shardId().getIndex().getName() }; - Map> routingMap = request.indexRoutings().length > 0 ? - Collections.singletonMap(indices[0], Sets.newHashSet(request.indexRoutings())) : null; - return clusterService.operationRouting().searchShards(state, indices, routingMap, request.preference()); - } - @Override public String toString() { return Strings.toString(this, true, true); diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 46b1e057c41a2..93df828c2e697 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.shard.ShardId; @@ -87,7 +86,7 @@ private AbstractSearchAsyncAction createAction(SearchRequest return new AbstractSearchAsyncAction("test", logger, null, nodeIdToConnection, Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), - Collections.singletonMap("name", Sets.newHashSet("bar", "baz")), null, request, listener, + null, request, listener, new GroupShardsIterator<>( Collections.singletonList( new SearchShardIterator(null, null, Collections.emptyList(), null) @@ -142,21 +141,19 @@ private void runTestTook(final boolean controlled) { } public void testBuildShardSearchTransportRequest() { - SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()).preference("_shards:1,3"); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); final AtomicLong expected = new AtomicLong(); AbstractSearchAsyncAction action = createAction(searchRequest, new ArraySearchPhaseResults<>(10), null, false, expected); String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); SearchShardIterator iterator = new SearchShardIterator(clusterAlias, new ShardId(new Index("name", "foo"), 1), Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand())); - ShardSearchRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator); + ShardSearchRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator, 10); assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions()); assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.getAliasFilter().getQueryBuilder()); assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f); assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); - assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings()); - assertEquals("_shards:1,3", shardSearchTransportRequest.preference()); assertEquals(clusterAlias, shardSearchTransportRequest.getClusterAlias()); } diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index af5f71e8e4bed..ab5731fbcdeb2 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -93,7 +93,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override @@ -161,7 +161,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override @@ -223,7 +223,6 @@ public void sendCanMatch( (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), Collections.emptyMap(), - Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, @@ -241,7 +240,6 @@ public void sendCanMatch( }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), executor, searchRequest, responseListener, @@ -328,7 +326,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override @@ -405,7 +403,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index ae7ca32abe708..a5b785f9d6ee0 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -117,7 +117,7 @@ public SearchTransportService getSearchTransport() { } @Override - public ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt) { + public ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex) { Assert.fail("should not be called"); return null; } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 4ab2bf8d4e98c..06a3e3b8ee549 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -105,7 +105,6 @@ public void testSkipSearchShards() throws InterruptedException { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), null, request, responseListener, @@ -211,7 +210,6 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), null, request, responseListener, @@ -314,7 +312,6 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), executor, request, responseListener, @@ -424,7 +421,6 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), executor, request, responseListener, @@ -525,7 +521,6 @@ public void testAllowPartialResults() throws InterruptedException { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), null, request, responseListener, diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 47a1b3f618d42..525ebb494e08b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -157,7 +157,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), controller, executor, + Collections.emptyMap(), controller, executor, resultConsumer, searchRequest, null, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY) { @Override diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 01ab3c591b41c..7056078e2b227 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -222,7 +222,6 @@ protected Engine.Searcher acquireSearcherInternal(String source) { when(queryShardContext.getIndexSettings()).thenReturn(indexSettings); when(queryShardContext.getFieldType(anyString())).thenReturn(mock(MappedFieldType.class)); - when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]); readerContext.close(); readerContext = new ReaderContext(newContextId(), indexService, indexShard, diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 5a5df0e514052..44526aa755437 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -313,8 +313,8 @@ public void onFailure(Exception e) { final boolean useScroll = randomBoolean(); service.executeQueryPhase( new ShardSearchRequest(OriginalIndices.NONE, useScroll ? scrollSearchRequest : searchRequest, - indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null), true, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); @@ -381,8 +381,8 @@ public void testSearchWhileIndexDeletedDoesNotLeakSearchContext() throws Executi PlainActionFuture result = new PlainActionFuture<>(); service.executeQueryPhase( new ShardSearchRequest(OriginalIndices.NONE, useScroll ? scrollSearchRequest : searchRequest, - new ShardId(resolveIndex("index"), 0), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + new ShardId(resolveIndex("index"), 0), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null), randomBoolean(), new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result); try { @@ -412,9 +412,10 @@ public void testTimeout() throws IOException { OriginalIndices.NONE, searchRequest, indexShard.shardId(), + 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); + 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext contextWithDefaultTimeout = service.createContext(reader, requestWithDefaultTimeout, null, randomBoolean())) { @@ -428,9 +429,10 @@ public void testTimeout() throws IOException { OriginalIndices.NONE, searchRequest, indexShard.shardId(), + 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); + 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext context = service.createContext(reader, requestWithCustomTimeout, null, randomBoolean())) { // the search context should inherit the query timeout @@ -458,8 +460,8 @@ public void testMaxDocvalueFieldsSearch() throws IOException { searchRequest.source(searchSourceBuilder); searchSourceBuilder.docValueField("field1"); - final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertNotNull(context); @@ -502,7 +504,7 @@ public void testMaxScriptFieldsSearch() throws IOException { new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + indexShard.shardId(), 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); try(ReaderContext reader = createReaderContext(indexService, indexShard)) { try (SearchContext context = service.createContext(reader, request, null, randomBoolean())) { @@ -534,8 +536,8 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); searchSourceBuilder.size(0); final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, - searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); + searchRequest, indexShard.shardId(), 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertEquals(0, context.scriptFields().fields().size()); @@ -686,7 +688,7 @@ private static class ShardScrollRequestTest extends ShardSearchRequest { ShardScrollRequestTest(ShardId shardId) { super(OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), - shardId, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null); + shardId, 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null); this.scroll = new Scroll(TimeValue.timeValueMinutes(1)); } @@ -703,47 +705,47 @@ public void testCanMatch() throws Exception { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder()); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchNoneQueryBuilder()) .aggregation(new TermsAggregationBuilder("test").userValueTypeHint(ValueType.STRING).minDocCount(0))); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchNoneQueryBuilder()) .aggregation(new GlobalAggregationBuilder("test"))); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchNoneQueryBuilder())); - assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); assertEquals(0, numWrapInvocations.get()); - ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); /* * Checks that canMatch takes into account the alias filter */ // the source cannot be rewritten to a match_none searchRequest.indices("alias").source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); - assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null, null)).canMatch()); + assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null)).canMatch()); // the source can match and can be rewritten to a match_none, but not the alias filter final IndexResponse response = client().prepareIndex("index").setSource("id", "1").get(); assertEquals(RestStatus.CREATED, response.status()); searchRequest.indices("alias").source(new SearchSourceBuilder().query(new TermQueryBuilder("id", "1"))); - assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null, null)).canMatch()); + assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null)).canMatch()); CountDownLatch latch = new CountDownLatch(1); SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); @@ -815,8 +817,8 @@ public void testSetSearchThrottled() { iae.getMessage()); assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled()); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); - ShardSearchRequest req = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, new ShardId(index, 0), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null); + ShardSearchRequest req = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, new ShardId(index, 0), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null); Thread currentThread = Thread.currentThread(); // we still make sure can match is executed on the network thread service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected"))); @@ -872,7 +874,7 @@ public void testCreateSearchContext() throws IOException { SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(randomBoolean()); ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shardId, - indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias, Strings.EMPTY_ARRAY); + 0, indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias); try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) { SearchShardTarget searchShardTarget = searchContext.shardTarget(); QueryShardContext queryShardContext = searchContext.getQueryShardContext(); @@ -925,7 +927,7 @@ public void testMatchNoDocsEmptyResponse() throws InterruptedException { .source(new SearchSourceBuilder() .aggregation(AggregationBuilders.count("count").field("value"))); ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), - 5, AliasFilter.EMPTY, 1.0f, 0, null, null); + 0, 5, AliasFilter.EMPTY, 1.0f, 0, null); SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); { @@ -1062,7 +1064,7 @@ public void testLookUpSearchContext() throws Exception { for (int i = 0; i < numContexts; i++) { ShardSearchRequest request = new ShardSearchRequest( OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + indexShard.shardId(), 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); final ReaderContext context = searchService.createAndPutReaderContext(request, indexService, indexShard, indexShard.acquireSearcherSupplier(), randomBoolean()); assertThat(context.id().getId(), equalTo((long) (i + 1))); diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java index 812eca890c608..630be3c44050f 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java @@ -82,7 +82,6 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { } else { filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY); } - final String[] routings = generateRandomStringArray(5, 10, false, true); ShardSearchContextId shardSearchContextId = null; TimeValue keepAlive = null; if (randomBoolean()) { @@ -91,9 +90,10 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { keepAlive = TimeValue.timeValueSeconds(randomIntBetween(0, 120)); } } + int numberOfShards = randomIntBetween(1, 100); ShardSearchRequest req = new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, - randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), - Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings, shardSearchContextId, keepAlive); + randomIntBetween(1, numberOfShards), numberOfShards, filteringAliases, randomBoolean() ? 1.0f : randomFloat(), + Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), shardSearchContextId, keepAlive); req.canReturnNullResponseIfMatchNoDocs(randomBoolean()); if (randomBoolean()) { req.setBottomSortValues(SearchSortValuesAndFormatsTests.randomInstance()); @@ -157,8 +157,6 @@ private static void assertEquals(ShardSearchRequest orig, ShardSearchRequest cop assertEquals(orig.searchType(), copy.searchType()); assertEquals(orig.shardId(), copy.shardId()); assertEquals(orig.numberOfShards(), copy.numberOfShards()); - assertArrayEquals(orig.indexRoutings(), copy.indexRoutings()); - assertEquals(orig.preference(), copy.preference()); assertEquals(orig.cacheKey(), copy.cacheKey()); assertNotSame(orig, copy); assertEquals(orig.getAliasFilter(), copy.getAliasFilter()); diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index b58f50f612b7c..1be3e321fb725 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -58,7 +58,7 @@ private static QuerySearchResult createTestInstance() throws Exception { ShardId shardId = new ShardId("index", "uuid", randomInt()); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); ShardSearchRequest shardSearchRequest = new ShardSearchRequest(OriginalIndicesTests.randomOriginalIndices(), searchRequest, - shardId, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, randomNonNegativeLong(), null, new String[0]); + shardId, 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, randomNonNegativeLong(), null); QuerySearchResult result = new QuerySearchResult(new ShardSearchContextId(UUIDs.base64UUID(), randomLong()), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE), shardSearchRequest); if (randomBoolean()) { diff --git a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index f59d285587a66..c72348c39520c 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -32,16 +32,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.OperationRouting; -import org.elasticsearch.cluster.routing.PlainShardIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -57,7 +49,6 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.internal.ShardSearchRequest; import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.test.VersionUtils; import java.io.IOException; import java.util.ArrayList; @@ -72,7 +63,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -109,17 +99,13 @@ private IndexSettings createIndexSettings(Version indexVersionCreated, int numSh return new IndexSettings(indexState, Settings.EMPTY); } - private ShardSearchRequest createRequest(int shardId) { - return createRequest(shardId, Strings.EMPTY_ARRAY, null); - } - - private ShardSearchRequest createRequest(int shardId, String[] routings, String preference) { - return new ShardSearchRequest(OriginalIndices.NONE, new SearchRequest().preference(preference).allowPartialSearchResults(true), - new ShardId("index", "index", shardId), 1, null, 0f, System.currentTimeMillis(), null, routings); + private ShardSearchRequest createRequest(int shardIndex) { + return new ShardSearchRequest(OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), + new ShardId("index", "index", 0), shardIndex, 1, null, 0f, System.currentTimeMillis(), null); } private QueryShardContext createShardContext(Version indexVersionCreated, IndexReader reader, - String fieldName, DocValuesType dvType, int numShards, int shardId) { + String fieldName, DocValuesType dvType, int numShards) { MappedFieldType fieldType = new MappedFieldType(fieldName, true, false, dvType != null, TextSearchInfo.NONE, Collections.emptyMap()) { @@ -145,7 +131,6 @@ public Query existsQuery(QueryShardContext context) { QueryShardContext context = mock(QueryShardContext.class); when(context.getFieldType(fieldName)).thenReturn(fieldType); when(context.getIndexReader()).thenReturn(reader); - when(context.getShardId()).thenReturn(shardId); IndexSettings indexSettings = createIndexSettings(indexVersionCreated, numShards); when(context.getIndexSettings()).thenReturn(indexSettings); if (dvType != null) { @@ -211,15 +196,15 @@ public void testToFilterSimple() throws IOException { } try (IndexReader reader = DirectoryReader.open(dir)) { QueryShardContext context = - createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED_NUMERIC, 1,0); + createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED_NUMERIC, 1); SliceBuilder builder = new SliceBuilder(5, 10); - Query query = builder.toFilter(null, createRequest(0), context); + Query query = builder.toFilter(createRequest(0), context); assertThat(query, instanceOf(TermsSliceQuery.class)); - assertThat(builder.toFilter(null, createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(null, createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); } } } @@ -231,14 +216,14 @@ public void testToFilterRandom() throws IOException { } try (IndexReader reader = DirectoryReader.open(dir)) { QueryShardContext context = - createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC, 1,0); + createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC, 1); SliceBuilder builder = new SliceBuilder("field", 5, 10); - Query query = builder.toFilter(null, createRequest(0), context); + Query query = builder.toFilter(createRequest(0), context); assertThat(query, instanceOf(DocValuesSliceQuery.class)); - assertThat(builder.toFilter(null, createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(null, createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); } // numSlices > numShards @@ -248,8 +233,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); - Query q = slice.toFilter(null, createRequest(j), context); + context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards); + Query q = slice.toFilter(createRequest(j), context); if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) { AtomicInteger count = numSliceMap.get(j); if (count == null) { @@ -278,8 +263,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); - Query q = slice.toFilter(null, createRequest(j), context); + context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards); + Query q = slice.toFilter(createRequest(j), context); if (q instanceof MatchNoDocsQuery == false) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); targetShards.add(j); @@ -295,8 +280,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); - Query q = slice.toFilter(null, createRequest(j), context); + context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards); + Query q = slice.toFilter(createRequest(j), context); if (i == j) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); } else { @@ -313,41 +298,11 @@ public void testInvalidField() throws IOException { writer.commit(); } try (IndexReader reader = DirectoryReader.open(dir)) { - QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null, 1,0); + QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null, 1); SliceBuilder builder = new SliceBuilder("field", 5, 10); IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, - () -> builder.toFilter(null, createRequest(0), context)); + () -> builder.toFilter(createRequest(0), context)); assertThat(exc.getMessage(), containsString("cannot load numeric doc values")); } } - - public void testToFilterWithRouting() throws IOException { - Directory dir = new ByteBuffersDirectory(); - try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { - writer.commit(); - } - ClusterService clusterService = mock(ClusterService.class); - ClusterState state = mock(ClusterState.class); - when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); - when(clusterService.state()).thenReturn(state); - OperationRouting routing = mock(OperationRouting.class); - GroupShardsIterator it = new GroupShardsIterator<>( - Collections.singletonList( - new PlainShardIterator(new ShardId("index", "index", 1), Collections.emptyList()) - ) - ); - when(routing.searchShards(any(), any(), any(), any())).thenReturn(it); - when(clusterService.operationRouting()).thenReturn(routing); - when(clusterService.getSettings()).thenReturn(Settings.EMPTY); - try (IndexReader reader = DirectoryReader.open(dir)) { - Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); - QueryShardContext context = createShardContext(version, reader, "field", DocValuesType.SORTED, 5, 0); - SliceBuilder builder = new SliceBuilder("field", 6, 10); - String[] routings = new String[] { "foo" }; - Query query = builder.toFilter(clusterService, createRequest(1, routings, null), context); - assertEquals(new DocValuesSliceQuery("field", 6, 10), query); - query = builder.toFilter(clusterService, createRequest(1, Strings.EMPTY_ARRAY, "foo"), context); - assertEquals(new DocValuesSliceQuery("field", 6, 10), query); - } - } } diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java index c52747e4be3d1..a9326b65f666d 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -302,18 +302,18 @@ public void testCanMatch() throws IOException { assertFalse(indexService.getIndexSettings().isSearchThrottled()); SearchService searchService = getInstanceFromNode(SearchService.class); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); searchRequest.source(sourceBuilder); sourceBuilder.query(QueryBuilders.rangeQuery("field").gte("2010-01-03||+2d").lte("2010-01-04||+2d/d")); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00")); - assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); } assertAcked(client().execute(FreezeIndexAction.INSTANCE, new FreezeRequest("index")).actionGet()); @@ -326,18 +326,18 @@ public void testCanMatch() throws IOException { assertTrue(indexService.getIndexSettings().isSearchThrottled()); SearchService searchService = getInstanceFromNode(SearchService.class); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.rangeQuery("field").gte("2010-01-03||+2d").lte("2010-01-04||+2d/d")); searchRequest.source(sourceBuilder); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00")); - assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); IndicesStatsResponse response = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); assertEquals(0, response.getTotal().refresh.getTotal()); From 0ccc78de9b31b8e145f0083795478afd4746315c Mon Sep 17 00:00:00 2001 From: jimczi Date: Tue, 1 Dec 2020 23:02:29 +0100 Subject: [PATCH 2/4] fix number of shards in slice builder --- .../main/java/org/elasticsearch/search/slice/SliceBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java index f5f5d48ef2506..e5e53bd8ae362 100644 --- a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java @@ -204,7 +204,7 @@ public Query toFilter(ShardSearchRequest request, QueryShardContext context) { } int shardIndex = request.shardIndex() != -1 ? request.shardIndex() : request.shardId().id(); - int numShards = context.getIndexSettings().getNumberOfShards(); + int numShards = request.numberOfShards(); String field = this.field; boolean useTermQuery = false; if (IdFieldMapper.NAME.equals(field)) { From a99e113de0c4bf7962e81c9d1b0b0da16f214315 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 2 Dec 2020 00:57:26 +0100 Subject: [PATCH 3/4] fix tests --- .../search/slice/SliceBuilder.java | 2 +- .../search/DefaultSearchContextTests.java | 2 + .../search/slice/SliceBuilderTests.java | 48 +++++++++---------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java index e5e53bd8ae362..29e95e4b7d680 100644 --- a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java @@ -204,7 +204,7 @@ public Query toFilter(ShardSearchRequest request, QueryShardContext context) { } int shardIndex = request.shardIndex() != -1 ? request.shardIndex() : request.shardId().id(); - int numShards = request.numberOfShards(); + int numShards = request.shardIndex() != -1 ? request.numberOfShards() : context.getIndexSettings().getNumberOfShards(); String field = this.field; boolean useTermQuery = false; if (IdFieldMapper.NAME.equals(field)) { diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 7056078e2b227..2f406a50bab85 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -83,6 +83,8 @@ public void testPreProcess() throws Exception { when(shardSearchRequest.searchType()).thenReturn(SearchType.DEFAULT); ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1); when(shardSearchRequest.shardId()).thenReturn(shardId); + when(shardSearchRequest.shardIndex()).thenReturn(shardId.id()); + when(shardSearchRequest.numberOfShards()).thenReturn(2); ThreadPool threadPool = new TestThreadPool(this.getClass().getName()); IndexShard indexShard = mock(IndexShard.class); diff --git a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index c72348c39520c..b17cadc82c975 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -89,23 +89,23 @@ private static SliceBuilder mutate(SliceBuilder original) { } } - private IndexSettings createIndexSettings(Version indexVersionCreated, int numShards) { + private IndexSettings createIndexSettings(Version indexVersionCreated) { Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, indexVersionCreated) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build(); IndexMetadata indexState = IndexMetadata.builder("index").settings(settings).build(); return new IndexSettings(indexState, Settings.EMPTY); } - private ShardSearchRequest createRequest(int shardIndex) { + private ShardSearchRequest createRequest(int shardIndex, int numShards) { return new ShardSearchRequest(OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), - new ShardId("index", "index", 0), shardIndex, 1, null, 0f, System.currentTimeMillis(), null); + new ShardId("index", "index", 0), shardIndex, numShards, null, 0f, System.currentTimeMillis(), null); } private QueryShardContext createShardContext(Version indexVersionCreated, IndexReader reader, - String fieldName, DocValuesType dvType, int numShards) { + String fieldName, DocValuesType dvType) { MappedFieldType fieldType = new MappedFieldType(fieldName, true, false, dvType != null, TextSearchInfo.NONE, Collections.emptyMap()) { @@ -131,7 +131,7 @@ public Query existsQuery(QueryShardContext context) { QueryShardContext context = mock(QueryShardContext.class); when(context.getFieldType(fieldName)).thenReturn(fieldType); when(context.getIndexReader()).thenReturn(reader); - IndexSettings indexSettings = createIndexSettings(indexVersionCreated, numShards); + IndexSettings indexSettings = createIndexSettings(indexVersionCreated); when(context.getIndexSettings()).thenReturn(indexSettings); if (dvType != null) { IndexNumericFieldData fd = mock(IndexNumericFieldData.class); @@ -196,15 +196,15 @@ public void testToFilterSimple() throws IOException { } try (IndexReader reader = DirectoryReader.open(dir)) { QueryShardContext context = - createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED_NUMERIC, 1); + createShardContext(Version.CURRENT, reader, "_id", null); SliceBuilder builder = new SliceBuilder(5, 10); - Query query = builder.toFilter(createRequest(0), context); + Query query = builder.toFilter(createRequest(0, 1), context); assertThat(query, instanceOf(TermsSliceQuery.class)); - assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); } } } @@ -216,25 +216,25 @@ public void testToFilterRandom() throws IOException { } try (IndexReader reader = DirectoryReader.open(dir)) { QueryShardContext context = - createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC, 1); + createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC); SliceBuilder builder = new SliceBuilder("field", 5, 10); - Query query = builder.toFilter(createRequest(0), context); + Query query = builder.toFilter(createRequest(0, 1), context); assertThat(query, instanceOf(DocValuesSliceQuery.class)); - assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(createRequest(0), context), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); } // numSlices > numShards - int numSlices = randomIntBetween(10, 100); - int numShards = randomIntBetween(1, 9); + int numSlices = 100;//randomIntBetween(10, 100); + int numShards = 10;//randomIntBetween(1, 9); Map numSliceMap = new HashMap<>(); for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards); - Query q = slice.toFilter(createRequest(j), context); + context = createShardContext(Version.CURRENT, reader, "_id", null); + Query q = slice.toFilter(createRequest(j, numShards), context); if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) { AtomicInteger count = numSliceMap.get(j); if (count == null) { @@ -263,8 +263,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards); - Query q = slice.toFilter(createRequest(j), context); + context = createShardContext(Version.CURRENT, reader, "_id", null); + Query q = slice.toFilter(createRequest(j, numShards), context); if (q instanceof MatchNoDocsQuery == false) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); targetShards.add(j); @@ -280,8 +280,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards); - Query q = slice.toFilter(createRequest(j), context); + context = createShardContext(Version.CURRENT, reader, "_id", null); + Query q = slice.toFilter(createRequest(j, numShards), context); if (i == j) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); } else { @@ -298,10 +298,10 @@ public void testInvalidField() throws IOException { writer.commit(); } try (IndexReader reader = DirectoryReader.open(dir)) { - QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null, 1); + QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null); SliceBuilder builder = new SliceBuilder("field", 5, 10); IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, - () -> builder.toFilter(createRequest(0), context)); + () -> builder.toFilter(createRequest(0, 1), context)); assertThat(exc.getMessage(), containsString("cannot load numeric doc values")); } } From 02ae5402bf3bbbaf00492f248a846619d38a3c68 Mon Sep 17 00:00:00 2001 From: jimczi Date: Wed, 2 Dec 2020 01:05:47 +0100 Subject: [PATCH 4/4] restore randomization --- .../org/elasticsearch/search/slice/SliceBuilderTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index b17cadc82c975..9fc918d6de0cf 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -227,8 +227,8 @@ public void testToFilterRandom() throws IOException { } // numSlices > numShards - int numSlices = 100;//randomIntBetween(10, 100); - int numShards = 10;//randomIntBetween(1, 9); + int numSlices = randomIntBetween(10, 100); + int numShards = randomIntBetween(1, 9); Map numSliceMap = new HashMap<>(); for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) {