From c896b1191a054ad132177e1d29617070b71aa900 Mon Sep 17 00:00:00 2001 From: Jim Ferenczi Date: Mon, 7 Dec 2020 09:16:36 +0100 Subject: [PATCH 1/2] Adds a consistent shard index to ShardSearchRequest (#65706) * Adds a consistent shard index to ShardSearchRequest This change ensures that the shard index that is used to tiebreak documents with identical sort remains consistent between two requests that target the same shards. The index is now always computed from the natural order of the shards in the search request. This change also adds the consistent shard index to the ShardSearchRequest. That allows the slice builder to use this information to build more balanced slice query. Relates #56828 --- .../search/AbstractSearchAsyncAction.java | 37 +++--- .../search/CanMatchPreFilterSearchPhase.java | 8 +- .../action/search/FetchSearchPhase.java | 2 +- .../SearchDfsQueryThenFetchAsyncAction.java | 7 +- .../action/search/SearchPhaseContext.java | 6 +- .../action/search/SearchPhaseController.java | 12 +- .../SearchQueryThenFetchAsyncAction.java | 7 +- .../action/search/TransportSearchAction.java | 22 ++-- .../search/DefaultSearchContext.java | 2 +- .../search/internal/ShardSearchRequest.java | 67 +++++----- .../search/slice/SliceBuilder.java | 66 ++-------- .../AbstractSearchAsyncActionTests.java | 9 +- .../CanMatchPreFilterSearchPhaseTests.java | 10 +- .../action/search/MockSearchPhaseContext.java | 2 +- .../action/search/SearchAsyncActionTests.java | 5 - .../SearchQueryThenFetchAsyncActionTests.java | 2 +- .../search/DefaultSearchContextTests.java | 3 +- .../search/SearchServiceTests.java | 72 +++++------ .../internal/ShardSearchRequestTests.java | 8 +- .../search/query/QuerySearchResultTests.java | 2 +- .../search/slice/SliceBuilderTests.java | 114 ++++++------------ .../index/engine/FrozenIndexTests.java | 24 ++-- 22 files changed, 196 insertions(+), 291 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index a504e209dc96d..18fa9968eeec4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; @@ -48,10 +49,9 @@ import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -83,7 +83,6 @@ abstract class AbstractSearchAsyncAction exten private final ClusterState clusterState; private final Map aliasFilter; private final Map concreteIndexBoosts; - private final Map> indexRoutings; private final SetOnce> shardFailures = new SetOnce<>(); private final Object shardFailuresMutex = new Object(); private final AtomicBoolean hasShardResponse = new AtomicBoolean(false); @@ -94,6 +93,7 @@ abstract class AbstractSearchAsyncAction exten protected final GroupShardsIterator toSkipShardsIts; protected final GroupShardsIterator shardsIts; + private final Map shardItIndexMap; private final int expectedTotalOps; private final AtomicInteger totalOps = new AtomicInteger(); private final int maxConcurrentRequestsPerNode; @@ -106,7 +106,6 @@ abstract class AbstractSearchAsyncAction exten AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService, BiFunction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, SearchTimeProvider timeProvider, ClusterState clusterState, @@ -124,6 +123,17 @@ abstract class AbstractSearchAsyncAction exten } this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators); this.shardsIts = new GroupShardsIterator<>(iterators); + this.shardItIndexMap = new HashMap<>(); + + // we compute the shard index based on the natural order of the shards + // that participate in the search request. This means that this number is + // consistent between two requests that target the same shards. + List naturalOrder = new ArrayList<>(iterators); + CollectionUtil.timSort(naturalOrder); + for (int i = 0; i < naturalOrder.size(); i++) { + shardItIndexMap.put(naturalOrder.get(i), i); + } + // we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up // it's number of active shards but use 1 as the default if no replica of a shard is active at this point. // on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result @@ -143,7 +153,6 @@ abstract class AbstractSearchAsyncAction exten this.clusterState = clusterState; this.concreteIndexBoosts = concreteIndexBoosts; this.aliasFilter = aliasFilter; - this.indexRoutings = indexRoutings; this.results = resultConsumer; this.clusters = clusters; } @@ -210,10 +219,13 @@ public final void run() { throw new SearchPhaseExecutionException(getName(), msg, null, ShardSearchFailure.EMPTY_ARRAY); } } - for (int index = 0; index < shardsIts.size(); index++) { - final SearchShardIterator shardRoutings = shardsIts.get(index); + + for (int i = 0; i < shardsIts.size(); i++) { + final SearchShardIterator shardRoutings = shardsIts.get(i); assert shardRoutings.skip() == false; - performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull()); + assert shardItIndexMap.containsKey(shardRoutings); + int shardIndex = shardItIndexMap.get(shardRoutings); + performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull()); } } } @@ -651,15 +663,12 @@ public final void onFailure(Exception e) { } @Override - public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt) { + public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex) { AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID()); assert filter != null; float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST); - String indexName = shardIt.shardId().getIndex().getName(); - final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet()) - .toArray(new String[0]); - ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(), - filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings, + ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), shardIndex, + getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive()); // if we already received a search result we can inform the shard that it // can return a null response if the request rewrites to match none rather diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index 663cb861cc047..a1dfc602fb916 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -36,7 +36,6 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; import java.util.function.Function; @@ -63,14 +62,13 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction nodeIdToConnection, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, Executor executor, SearchRequest request, ActionListener listener, GroupShardsIterator shardsIts, TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState, SearchTask task, Function, SearchPhase> phaseFactory, SearchResponse.Clusters clusters) { //We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests - super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterState, task, new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters); this.phaseFactory = phaseFactory; @@ -86,7 +84,7 @@ public void addReleasable(Releasable releasable) { protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, SearchActionListener listener) { getSearchTransport().sendCanMatch(getConnection(shard.getClusterAlias(), shard.getNodeId()), - buildShardSearchRequest(shardIt), getTask(), listener); + buildShardSearchRequest(shardIt, listener.requestIndex), getTask(), listener); } @Override @@ -149,7 +147,7 @@ private static Comparator shardComparator(GroupShardsIterator[] minAndMaxes, SortOrder order) { final Comparator comparator = Comparator.comparing(index -> minAndMaxes[index], MinAndMax.getComparator(order)); - return comparator.thenComparing(index -> shardsIts.get(index).shardId()); + return comparator.thenComparing(index -> shardsIts.get(index)); } private static final class CanMatchSearchPhaseResults extends SearchPhaseResults { diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index 55d40a023d592..94baf2a379e83 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -153,7 +153,7 @@ private void innerRun() throws Exception { ShardFetchSearchRequest fetchSearchRequest = createFetchRequest(queryResult.queryResult().getContextId(), i, entry, lastEmittedDocPerShard, searchShardTarget.getOriginalIndices(), queryResult.getShardSearchRequest(), queryResult.getRescoreDocIds()); - executeFetch(i, searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), + executeFetch(queryResult.getShardIndex(), searchShardTarget, counter, fetchSearchRequest, queryResult.queryResult(), connection); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java index b53e635d9866a..12d81578e2b56 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -45,14 +44,14 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction SearchDfsQueryThenFetchAsyncAction(final Logger logger, final SearchTransportService searchTransportService, final BiFunction nodeIdToConnection, final Map aliasFilter, - final Map concreteIndexBoosts, final Map> indexRoutings, + final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, final QueryPhaseResultConsumer queryPhaseResultConsumer, final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, final ClusterState clusterState, final SearchTask task, SearchResponse.Clusters clusters) { - super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()), request.getMaxConcurrentShardRequests(), clusters); @@ -68,7 +67,7 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction protected void executePhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard, final SearchActionListener listener) { getSearchTransport().sendExecuteDfs(getConnection(shard.getClusterAlias(), shard.getNodeId()), - buildShardSearchRequest(shardIt) , getTask(), listener); + buildShardSearchRequest(shardIt, listener.requestIndex) , getTask(), listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java index e56100dc5287f..93463921d7e99 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseContext.java @@ -115,8 +115,12 @@ default void sendReleaseSearchContext(ShardSearchContextId contextId, /** * Builds an request for the initial search phase. + * + * @param shardIt the target {@link SearchShardIterator} + * @param shardIndex the index of the shard that is used in the coordinator node to + * tiebreak results with identical sort values */ - ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt); + ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex); /** * Processes the phase transition from on phase to another. This method handles all errors that happen during the initial run execution diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index a6c5964c9470c..8f962b0210edc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -428,9 +428,8 @@ ReducedQueryPhase reducedQueryPhase(Collection quer throw new IllegalStateException(errorMsg); } validateMergeSortValueFormats(queryResults); - final QuerySearchResult firstResult = queryResults.stream().findFirst().get().queryResult(); - final boolean hasSuggest = firstResult.suggest() != null; - final boolean hasProfileResults = firstResult.hasProfileResults(); + final boolean hasSuggest = queryResults.stream().anyMatch(res -> res.queryResult().suggest() != null); + final boolean hasProfileResults = queryResults.stream().anyMatch(res -> res.queryResult().hasProfileResults()); // count the total (we use the query result provider here, since we might not get any hits (we scrolled past them)) final Map> groupedSuggestions = hasSuggest ? new HashMap<>() : Collections.emptyMap(); @@ -438,11 +437,16 @@ ReducedQueryPhase reducedQueryPhase(Collection quer : Collections.emptyMap(); int from = 0; int size = 0; + DocValueFormat[] sortValueFormats = null; for (SearchPhaseResult entry : queryResults) { QuerySearchResult result = entry.queryResult(); from = result.from(); // sorted queries can set the size to 0 if they have enough competitive hits. size = Math.max(result.size(), size); + if (result.sortValueFormats() != null) { + sortValueFormats = result.sortValueFormats(); + } + if (hasSuggest) { assert result.suggest() != null; for (Suggestion> suggestion : result.suggest()) { @@ -477,7 +481,7 @@ ReducedQueryPhase reducedQueryPhase(Collection quer final TotalHits totalHits = topDocsStats.getTotalHits(); return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(), topDocsStats.timedOut, topDocsStats.terminatedEarly, reducedSuggest, aggregations, shardResults, sortedTopDocs, - firstResult.sortValueFormats(), numReducePhases, size, from, false); + sortValueFormats, numReducePhases, size, from, false); } private static InternalAggregations reduceAggs(InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 79f5e5ca9571e..2ed9d4762800a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -33,7 +33,6 @@ import org.elasticsearch.transport.Transport; import java.util.Map; -import java.util.Set; import java.util.concurrent.Executor; import java.util.function.BiFunction; @@ -52,14 +51,14 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction nodeIdToConnection, final Map aliasFilter, - final Map concreteIndexBoosts, final Map> indexRoutings, + final Map concreteIndexBoosts, final SearchPhaseController searchPhaseController, final Executor executor, final QueryPhaseResultConsumer resultConsumer, final SearchRequest request, final ActionListener listener, final GroupShardsIterator shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider, ClusterState clusterState, SearchTask task, SearchResponse.Clusters clusters) { - super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings, + super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener, shardsIts, timeProvider, clusterState, task, resultConsumer, request.getMaxConcurrentShardRequests(), clusters); this.topDocsSize = getTopDocsSize(request); @@ -79,7 +78,7 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction listener) { - ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt)); + ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex)); getSearchTransport().sendExecuteQuery(getConnection(shard.getClusterAlias(), shard.getNodeId()), request, getTask(), listener); } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 4097ddc74d1f4..d818bd5096c53 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -235,11 +235,11 @@ public AbstractSearchAsyncAction asyncSearchAction( SearchTask task, SearchRequest searchRequest, Executor executor, GroupShardsIterator shardsIts, SearchTimeProvider timeProvider, BiFunction connectionLookup, ClusterState clusterState, Map aliasFilter, - Map concreteIndexBoosts, Map> indexRoutings, - ActionListener listener, boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters) { + Map concreteIndexBoosts, ActionListener listener, + boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters) { return new AbstractSearchAsyncAction( actionName, logger, searchTransportService, connectionLookup, aliasFilter, concreteIndexBoosts, - indexRoutings, executor, searchRequest, listener, shardsIts, timeProvider, clusterState, task, + executor, searchRequest, listener, shardsIts, timeProvider, clusterState, task, new ArraySearchPhaseResults<>(shardsIts.size()), 1, clusters) { @Override protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard, @@ -610,13 +610,11 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea // of just for the _search api final List localShardIterators; final Map aliasFilter; - final Map> indexRoutings; final String[] concreteLocalIndices; if (searchContext != null) { assert searchRequest.pointInTimeBuilder() != null; aliasFilter = searchContext.aliasFilter(); - indexRoutings = Collections.emptyMap(); concreteLocalIndices = localIndices == null ? new String[0] : localIndices.indices(); localShardIterators = getLocalLocalShardsIteratorFromPointInTime(clusterState, localIndices, searchRequest.getLocalClusterAlias(), searchContext, searchRequest.pointInTimeBuilder().getKeepAlive()); @@ -638,7 +636,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) .collect(Collectors.toList()); aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap); - indexRoutings = routingMap; } final GroupShardsIterator shardIterators = mergeShardsIterators(localShardIterators, remoteShardIterators); @@ -673,7 +670,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea localShardIterators.size() + remoteShardIterators.size()); searchAsyncActionProvider.asyncSearchAction( task, searchRequest, asyncSearchExecutor, shardIterators, timeProvider, connectionLookup, clusterState, - Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, indexRoutings, listener, + Collections.unmodifiableMap(aliasFilter), concreteIndexBoosts, listener, preFilterSearchShards, threadPool, clusters).start(); } @@ -747,8 +744,7 @@ AbstractSearchAsyncAction asyncSearchAction( SearchTask task, SearchRequest searchRequest, Executor executor, GroupShardsIterator shardIterators, SearchTimeProvider timeProvider, BiFunction connectionLookup, ClusterState clusterState, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, ActionListener listener, boolean preFilter, - ThreadPool threadPool, SearchResponse.Clusters clusters); + ActionListener listener, boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters); } private AbstractSearchAsyncAction searchAsyncAction( @@ -761,14 +757,13 @@ private AbstractSearchAsyncAction searchAsyncAction ClusterState clusterState, Map aliasFilter, Map concreteIndexBoosts, - Map> indexRoutings, ActionListener listener, boolean preFilter, ThreadPool threadPool, SearchResponse.Clusters clusters) { if (preFilter) { return new CanMatchPreFilterSearchPhase(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, indexRoutings, executor, searchRequest, listener, shardIterators, + aliasFilter, concreteIndexBoosts, executor, searchRequest, listener, shardIterators, timeProvider, clusterState, task, (iter) -> { AbstractSearchAsyncAction action = searchAsyncAction( task, @@ -780,7 +775,6 @@ private AbstractSearchAsyncAction searchAsyncAction clusterState, aliasFilter, concreteIndexBoosts, - indexRoutings, listener, false, threadPool, @@ -800,12 +794,12 @@ public void run() { switch (searchRequest.searchType()) { case DFS_QUERY_THEN_FETCH: searchAsyncAction = new SearchDfsQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, + aliasFilter, concreteIndexBoosts, searchPhaseController, executor, queryResultConsumer, searchRequest, listener, shardIterators, timeProvider, clusterState, task, clusters); break; case QUERY_THEN_FETCH: searchAsyncAction = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, connectionLookup, - aliasFilter, concreteIndexBoosts, indexRoutings, searchPhaseController, executor, queryResultConsumer, + aliasFilter, concreteIndexBoosts, searchPhaseController, executor, queryResultConsumer, searchRequest, listener, shardIterators, timeProvider, clusterState, task, clusters); break; default: diff --git a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java index 86e2276aa94d8..c542439ebe509 100644 --- a/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java @@ -286,7 +286,7 @@ public Query buildFilteredQuery(Query query) { } if (sliceBuilder != null) { - Query slicedQuery = sliceBuilder.toFilter(clusterService, request, queryShardContext, minNodeVersion); + Query slicedQuery = sliceBuilder.toFilter(request, queryShardContext); if (slicedQuery instanceof MatchNoDocsQuery) { return slicedQuery; } else { diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java index d7e2a4e01aee7..4d2269ac97093 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchRequest.java @@ -71,6 +71,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesRequest { private final String clusterAlias; private final ShardId shardId; + private final int shardIndex; private final int numberOfShards; private final SearchType searchType; private final Scroll scroll; @@ -79,8 +80,6 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque private final Boolean requestCache; private final long nowInMillis; private final boolean allowPartialSearchResults; - private final String[] indexRoutings; - private final String preference; private final OriginalIndices originalIndices; private boolean canReturnNullResponseIfMatchNoDocs; @@ -95,29 +94,30 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque public ShardSearchRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, + int shardIndex, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis, - @Nullable String clusterAlias, - String[] indexRoutings) { - this(originalIndices, searchRequest, shardId, numberOfShards, aliasFilter, - indexBoost, nowInMillis, clusterAlias, indexRoutings, null, null); + @Nullable String clusterAlias) { + this(originalIndices, searchRequest, shardId, shardIndex, numberOfShards, aliasFilter, + indexBoost, nowInMillis, clusterAlias, null, null); } public ShardSearchRequest(OriginalIndices originalIndices, SearchRequest searchRequest, ShardId shardId, + int shardIndex, int numberOfShards, AliasFilter aliasFilter, float indexBoost, long nowInMillis, @Nullable String clusterAlias, - String[] indexRoutings, ShardSearchContextId readerId, TimeValue keepAlive) { this(originalIndices, shardId, + shardIndex, numberOfShards, searchRequest.searchType(), searchRequest.source(), @@ -126,8 +126,6 @@ public ShardSearchRequest(OriginalIndices originalIndices, aliasFilter, indexBoost, searchRequest.allowPartialSearchResults(), - indexRoutings, - searchRequest.preference(), searchRequest.scroll(), nowInMillis, clusterAlias, @@ -142,12 +140,13 @@ public ShardSearchRequest(ShardId shardId, String[] types, long nowInMillis, AliasFilter aliasFilter) { - this(OriginalIndices.NONE, shardId, -1, SearchType.QUERY_THEN_FETCH, null, types, - null, aliasFilter, 1.0f, false, Strings.EMPTY_ARRAY, null, null, nowInMillis, null, null, null); + this(OriginalIndices.NONE, shardId, -1, -1, SearchType.QUERY_THEN_FETCH, null, Strings.EMPTY_ARRAY, null, + aliasFilter, 1.0f, false, null, nowInMillis, null, null, null); } private ShardSearchRequest(OriginalIndices originalIndices, ShardId shardId, + int shardIndex, int numberOfShards, SearchType searchType, SearchSourceBuilder source, @@ -156,14 +155,13 @@ private ShardSearchRequest(OriginalIndices originalIndices, AliasFilter aliasFilter, float indexBoost, boolean allowPartialSearchResults, - String[] indexRoutings, - String preference, Scroll scroll, long nowInMillis, @Nullable String clusterAlias, ShardSearchContextId readerId, TimeValue keepAlive) { this.shardId = shardId; + this.shardIndex = shardIndex; this.numberOfShards = numberOfShards; this.searchType = searchType; this.source = source; @@ -172,8 +170,6 @@ private ShardSearchRequest(OriginalIndices originalIndices, this.aliasFilter = aliasFilter; this.indexBoost = indexBoost; this.allowPartialSearchResults = allowPartialSearchResults; - this.indexRoutings = indexRoutings; - this.preference = preference; this.scroll = scroll; this.nowInMillis = nowInMillis; this.clusterAlias = clusterAlias; @@ -187,6 +183,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { super(in); shardId = new ShardId(in); searchType = SearchType.fromId(in.readByte()); + shardIndex = in.getVersion().onOrAfter(Version.V_7_11_0) ? in.readVInt() : -1; numberOfShards = in.readVInt(); scroll = in.readOptionalWriteable(Scroll::new); source = in.readOptionalWriteable(SearchSourceBuilder::new); @@ -203,12 +200,9 @@ public ShardSearchRequest(StreamInput in) throws IOException { } else { allowPartialSearchResults = false; } - if (in.getVersion().onOrAfter(Version.V_6_4_0)) { - indexRoutings = in.readStringArray(); - preference = in.readOptionalString(); - } else { - indexRoutings = Strings.EMPTY_ARRAY; - preference = null; + if (in.getVersion().before(Version.V_7_11_0)) { + in.readStringArray(); + in.readOptionalString(); } if (in.getVersion().onOrAfter(Version.V_7_7_0)) { canReturnNullResponseIfMatchNoDocs = in.readBoolean(); @@ -230,6 +224,7 @@ public ShardSearchRequest(StreamInput in) throws IOException { public ShardSearchRequest(ShardSearchRequest clone) { this.shardId = clone.shardId; + this.shardIndex = clone.shardIndex; this.searchType = clone.searchType; this.numberOfShards = clone.numberOfShards; this.scroll = clone.scroll; @@ -241,8 +236,6 @@ public ShardSearchRequest(ShardSearchRequest clone) { this.requestCache = clone.requestCache; this.clusterAlias = clone.clusterAlias; this.allowPartialSearchResults = clone.allowPartialSearchResults; - this.indexRoutings = clone.indexRoutings; - this.preference = clone.preference; this.canReturnNullResponseIfMatchNoDocs = clone.canReturnNullResponseIfMatchNoDocs; this.bottomSortValues = clone.bottomSortValues; this.originalIndices = clone.originalIndices; @@ -260,7 +253,10 @@ public void writeTo(StreamOutput out) throws IOException { protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOException { shardId.writeTo(out); out.writeByte(searchType.id()); - if (!asKey) { + if (asKey == false) { + if (out.getVersion().onOrAfter(Version.V_7_11_0)) { + out.writeVInt(shardIndex); + } out.writeVInt(numberOfShards); } out.writeOptionalWriteable(scroll); @@ -278,11 +274,9 @@ protected final void innerWriteTo(StreamOutput out, boolean asKey) throws IOExce } else if (out.getVersion().onOrAfter(Version.V_6_3_0)) { out.writeOptionalBoolean(allowPartialSearchResults); } - if (asKey == false) { - if (out.getVersion().onOrAfter(Version.V_6_4_0)) { - out.writeStringArray(indexRoutings); - out.writeOptionalString(preference); - } + if (asKey == false && out.getVersion().before(Version.V_7_11_0)) { + out.writeStringArray(Strings.EMPTY_ARRAY); + out.writeOptionalString(null); } if (out.getVersion().onOrAfter(Version.V_7_7_0) && asKey == false) { out.writeBoolean(canReturnNullResponseIfMatchNoDocs); @@ -334,6 +328,13 @@ public void source(SearchSourceBuilder source) { this.source = source; } + /** + * Returns the index of the shard that is used to tiebreak documents with identical sort values. + */ + public int shardIndex() { + return shardIndex; + } + public int numberOfShards() { return numberOfShards; } @@ -362,14 +363,6 @@ public Scroll scroll() { return scroll; } - public String[] indexRoutings() { - return indexRoutings; - } - - public String preference() { - return preference; - } - /** * Sets the bottom sort values that can be used by the searcher to filter documents * that are after it. This value is computed by coordinating nodes that throttles the diff --git a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java index c2325d400d499..bf1da6936d217 100644 --- a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java @@ -23,17 +23,12 @@ import org.apache.lucene.search.MatchNoDocsQuery; import org.apache.lucene.search.Query; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.logging.DeprecationLogger; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -46,21 +41,23 @@ import org.elasticsearch.search.internal.ShardSearchRequest; import java.io.IOException; -import java.util.Collections; -import java.util.Map; import java.util.Objects; -import java.util.Set; /** * A slice builder allowing to split a scroll in multiple partitions. +<<<<<<< HEAD * If the provided field is the "_uid" it uses a {@link org.elasticsearch.search.slice.TermsSliceQuery} * to do the slicing. The slicing is done at the shard level first and then each shard is split into multiple slices. +======= + * If the provided field is the "_id" it uses a {@link TermsSliceQuery} to do the slicing. + * The slicing is done at the shard level first and then each shard is split into multiple slices. +>>>>>>> 9eb9f92b36d... Adds a consistent shard index to ShardSearchRequest (#65706) * For instance if the number of shards is equal to 2 and the user requested 4 slices * then the slices 0 and 2 are assigned to the first shard and the slices 1 and 3 are assigned to the second shard. * This way the total number of bitsets that we need to build on each shard is bounded by the number of slices * (instead of {@code numShards*numSlices}). * Otherwise the provided field must be a numeric and doc_values must be enabled. In that case a - * {@link org.elasticsearch.search.slice.DocValuesSliceQuery} is used to filter the results. + * {@link DocValuesSliceQuery} is used to filter the results. */ public class SliceBuilder implements Writeable, ToXContentObject { @@ -217,44 +214,14 @@ public int hashCode() { * @param context Additional information needed to build the query */ @SuppressWarnings("rawtypes") - public Query toFilter(ClusterService clusterService, ShardSearchRequest request, QueryShardContext context, Version minNodeVersion) { + public Query toFilter(ShardSearchRequest request, QueryShardContext context) { final MappedFieldType type = context.getFieldType(field); if (type == null) { throw new IllegalArgumentException("field " + field + " not found"); } - int shardId = request.shardId().id(); - int numShards = context.getIndexSettings().getNumberOfShards(); - if (minNodeVersion.onOrAfter(Version.V_6_4_0) && - (request.preference() != null || request.indexRoutings().length > 0)) { - GroupShardsIterator group = buildShardIterator(clusterService, request); - assert group.size() <= numShards : "index routing shards: " + group.size() + - " cannot be greater than total number of shards: " + numShards; - if (group.size() < numShards) { - /** - * The routing of this request targets a subset of the shards of this index so we need to we retrieve - * the original {@link GroupShardsIterator} and compute the request shard id and number of - * shards from it. - * This behavior has been added in {@link Version#V_6_4_0} so if there is another node in the cluster - * with an older version we use the original shard id and number of shards in order to ensure that all - * slices use the same numbers. - */ - numShards = group.size(); - int ord = 0; - shardId = -1; - // remap the original shard id with its index (position) in the sorted shard iterator. - for (ShardIterator it : group) { - assert it.shardId().getIndex().equals(request.shardId().getIndex()); - if (request.shardId().equals(it.shardId())) { - shardId = ord; - break; - } - ++ord; - } - assert shardId != -1 : "shard id: " + request.shardId().getId() + " not found in index shard routing"; - } - } - + int shardIndex = request.shardIndex() != -1 ? request.shardIndex() : request.shardId().id(); + int numShards = request.shardIndex() != -1 ? request.numberOfShards() : context.getIndexSettings().getNumberOfShards(); String field = this.field; boolean useTermQuery = false; if ("_uid".equals(field)) { @@ -287,7 +254,7 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, // first we check if the slice is responsible of this shard int targetShard = id % numShards; - if (targetShard != shardId) { + if (targetShard != shardIndex) { // the shard is not part of this slice, we can skip it. return new MatchNoDocsQuery("this shard is not part of the slice"); } @@ -312,7 +279,7 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, // the number of shards is greater than the number of slices // check if the shard is assigned to the slice - int targetSlice = shardId % max; + int targetSlice = shardIndex % max; if (id != targetSlice) { // the shard is not part of this slice, we can skip it. return new MatchNoDocsQuery("this shard is not part of the slice"); @@ -320,17 +287,6 @@ public Query toFilter(ClusterService clusterService, ShardSearchRequest request, return new MatchAllDocsQuery(); } - /** - * Returns the {@link GroupShardsIterator} for the provided request. - */ - private GroupShardsIterator buildShardIterator(ClusterService clusterService, ShardSearchRequest request) { - final ClusterState state = clusterService.state(); - String[] indices = new String[] { request.shardId().getIndex().getName() }; - Map> routingMap = request.indexRoutings().length > 0 ? - Collections.singletonMap(indices[0], Sets.newHashSet(request.indexRoutings())) : null; - return clusterService.operationRouting().searchShards(state, indices, routingMap, request.preference()); - } - @Override public String toString() { return Strings.toString(this, true, true); diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 46b1e057c41a2..93df828c2e697 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -26,7 +26,6 @@ import org.elasticsearch.cluster.routing.GroupShardsIterator; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.Tuple; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.index.Index; import org.elasticsearch.index.query.MatchAllQueryBuilder; import org.elasticsearch.index.shard.ShardId; @@ -87,7 +86,7 @@ private AbstractSearchAsyncAction createAction(SearchRequest return new AbstractSearchAsyncAction("test", logger, null, nodeIdToConnection, Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), - Collections.singletonMap("name", Sets.newHashSet("bar", "baz")), null, request, listener, + null, request, listener, new GroupShardsIterator<>( Collections.singletonList( new SearchShardIterator(null, null, Collections.emptyList(), null) @@ -142,21 +141,19 @@ private void runTestTook(final boolean controlled) { } public void testBuildShardSearchTransportRequest() { - SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()).preference("_shards:1,3"); + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); final AtomicLong expected = new AtomicLong(); AbstractSearchAsyncAction action = createAction(searchRequest, new ArraySearchPhaseResults<>(10), null, false, expected); String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); SearchShardIterator iterator = new SearchShardIterator(clusterAlias, new ShardId(new Index("name", "foo"), 1), Collections.emptyList(), new OriginalIndices(new String[] {"name", "name1"}, IndicesOptions.strictExpand())); - ShardSearchRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator); + ShardSearchRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator, 10); assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions()); assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); assertEquals(new MatchAllQueryBuilder(), shardSearchTransportRequest.getAliasFilter().getQueryBuilder()); assertEquals(2.0f, shardSearchTransportRequest.indexBoost(), 0.0f); assertArrayEquals(new String[] {"name", "name1"}, shardSearchTransportRequest.indices()); - assertArrayEquals(new String[] {"bar", "baz"}, shardSearchTransportRequest.indexRoutings()); - assertEquals("_shards:1,3", shardSearchTransportRequest.preference()); assertEquals(clusterAlias, shardSearchTransportRequest.getClusterAlias()); } diff --git a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java index b7c5bb33740dc..13796ed770b74 100644 --- a/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java @@ -93,7 +93,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override @@ -161,7 +161,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override @@ -223,7 +223,6 @@ public void sendCanMatch( (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), Collections.emptyMap(), - Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, @@ -241,7 +240,6 @@ public void sendCanMatch( }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), executor, searchRequest, responseListener, @@ -328,7 +326,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override @@ -405,7 +403,7 @@ public void sendCanMatch(Transport.Connection connection, ShardSearchRequest req searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), EsExecutors.newDirectExecutorService(), + Collections.emptyMap(), EsExecutors.newDirectExecutorService(), searchRequest, null, shardsIter, timeProvider, ClusterState.EMPTY_STATE, null, (iter) -> new SearchPhase("test") { @Override diff --git a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java index 96e9fe7a61c1f..d37dcf56ba06b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java +++ b/server/src/test/java/org/elasticsearch/action/search/MockSearchPhaseContext.java @@ -118,7 +118,7 @@ public SearchTransportService getSearchTransport() { } @Override - public ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt) { + public ShardSearchRequest buildShardSearchRequest(SearchShardIterator shardIt, int shardIndex) { Assert.fail("should not be called"); return null; } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 6bbd94b3427ab..02ba742ef2e94 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -105,7 +105,6 @@ public void testSkipSearchShards() throws InterruptedException { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), null, request, responseListener, @@ -211,7 +210,6 @@ public void testLimitConcurrentShardRequests() throws InterruptedException { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), null, request, responseListener, @@ -314,7 +312,6 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), executor, request, responseListener, @@ -424,7 +421,6 @@ public void sendFreeContext(Transport.Connection connection, ShardSearchContextI return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), executor, request, responseListener, @@ -525,7 +521,6 @@ public void testAllowPartialResults() throws InterruptedException { return lookup.get(node); }, aliasFilters, Collections.emptyMap(), - Collections.emptyMap(), null, request, responseListener, diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java index 47a1b3f618d42..525ebb494e08b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncActionTests.java @@ -157,7 +157,7 @@ public void sendExecuteQuery(Transport.Connection connection, ShardSearchRequest SearchQueryThenFetchAsyncAction action = new SearchQueryThenFetchAsyncAction(logger, searchTransportService, (clusterAlias, node) -> lookup.get(node), Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)), - Collections.emptyMap(), Collections.emptyMap(), controller, executor, + Collections.emptyMap(), controller, executor, resultConsumer, searchRequest, null, shardsIter, timeProvider, null, task, SearchResponse.Clusters.EMPTY) { @Override diff --git a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java index 3992b04f40e73..80fbe0398bd2a 100644 --- a/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java +++ b/server/src/test/java/org/elasticsearch/search/DefaultSearchContextTests.java @@ -84,6 +84,8 @@ public void testPreProcess() throws Exception { ShardId shardId = new ShardId("index", UUID.randomUUID().toString(), 1); when(shardSearchRequest.shardId()).thenReturn(shardId); when(shardSearchRequest.types()).thenReturn(new String[]{}); + when(shardSearchRequest.shardIndex()).thenReturn(shardId.id()); + when(shardSearchRequest.numberOfShards()).thenReturn(2); ThreadPool threadPool = new TestThreadPool(this.getClass().getName()); IndexShard indexShard = mock(IndexShard.class); @@ -223,7 +225,6 @@ protected Engine.Searcher acquireSearcherInternal(String source) { when(queryShardContext.getIndexSettings()).thenReturn(indexSettings); when(queryShardContext.getFieldType(anyString())).thenReturn(mock(MappedFieldType.class)); - when(shardSearchRequest.indexRoutings()).thenReturn(new String[0]); readerContext.close(); readerContext = new ReaderContext(newContextId(), indexService, indexShard, diff --git a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java index 6e1db770e239e..702ea562542c8 100644 --- a/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java +++ b/server/src/test/java/org/elasticsearch/search/SearchServiceTests.java @@ -313,8 +313,8 @@ public void onFailure(Exception e) { final boolean useScroll = randomBoolean(); service.executeQueryPhase( new ShardSearchRequest(OriginalIndices.NONE, useScroll ? scrollSearchRequest : searchRequest, - indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null), true, new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result); SearchPhaseResult searchPhaseResult = result.get(); @@ -381,8 +381,8 @@ public void testSearchWhileIndexDeletedDoesNotLeakSearchContext() throws Executi PlainActionFuture result = new PlainActionFuture<>(); service.executeQueryPhase( new ShardSearchRequest(OriginalIndices.NONE, useScroll ? scrollSearchRequest : searchRequest, - new ShardId(resolveIndex("index"), 0), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null), + new ShardId(resolveIndex("index"), 0), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null), randomBoolean(), new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()), result); try { @@ -412,9 +412,10 @@ public void testTimeout() throws IOException { OriginalIndices.NONE, searchRequest, indexShard.shardId(), + 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); + 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext contextWithDefaultTimeout = service.createContext(reader, requestWithDefaultTimeout, null, randomBoolean())) { @@ -428,9 +429,10 @@ public void testTimeout() throws IOException { OriginalIndices.NONE, searchRequest, indexShard.shardId(), + 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); + 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext context = service.createContext(reader, requestWithCustomTimeout, null, randomBoolean())) { // the search context should inherit the query timeout @@ -458,8 +460,8 @@ public void testMaxDocvalueFieldsSearch() throws IOException { searchRequest.source(searchSourceBuilder); searchSourceBuilder.docValueField("field1"); - final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertNotNull(context); @@ -502,7 +504,7 @@ public void testMaxScriptFieldsSearch() throws IOException { new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); } final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + indexShard.shardId(), 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); try(ReaderContext reader = createReaderContext(indexService, indexShard)) { try (SearchContext context = service.createContext(reader, request, null, randomBoolean())) { @@ -534,8 +536,8 @@ public void testIgnoreScriptfieldIfSizeZero() throws IOException { new Script(ScriptType.INLINE, MockScriptEngine.NAME, CustomScriptPlugin.DUMMY_SCRIPT, Collections.emptyMap())); searchSourceBuilder.size(0); final ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, - searchRequest, indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), - 1.0f, -1, null, null); + searchRequest, indexShard.shardId(), 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), + 1.0f, -1, null); try (ReaderContext reader = createReaderContext(indexService, indexShard); SearchContext context = service.createContext(reader, request, null, randomBoolean())) { assertEquals(0, context.scriptFields().fields().size()); @@ -686,7 +688,7 @@ private static class ShardScrollRequestTest extends ShardSearchRequest { ShardScrollRequestTest(ShardId shardId) { super(OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), - shardId, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null); + shardId, 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null); this.scroll = new Scroll(TimeValue.timeValueMinutes(1)); } @@ -703,47 +705,47 @@ public void testCanMatch() throws Exception { final IndexService indexService = indicesService.indexServiceSafe(resolveIndex("index")); final IndexShard indexShard = indexService.getShard(0); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder()); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchNoneQueryBuilder()) .aggregation(new TermsAggregationBuilder("test").userValueTypeHint(ValueType.STRING).minDocCount(0))); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchNoneQueryBuilder()) .aggregation(new GlobalAggregationBuilder("test"))); - assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); searchRequest.source(new SearchSourceBuilder().query(new MatchNoneQueryBuilder())); - assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); assertEquals(0, numWrapInvocations.get()); - ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); /* * Checks that canMatch takes into account the alias filter */ // the source cannot be rewritten to a match_none searchRequest.indices("alias").source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); - assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null, null)).canMatch()); + assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null)).canMatch()); // the source can match and can be rewritten to a match_none, but not the alias filter final IndexResponse response = client().prepareIndex("index", "_doc", "1").setSource("id", "1").get(); assertEquals(RestStatus.CREATED, response.status()); searchRequest.indices("alias").source(new SearchSourceBuilder().query(new TermQueryBuilder("id", "1"))); - assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 1, - new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null, null)).canMatch()); + assertFalse(service.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), 0, 1, + new AliasFilter(new TermQueryBuilder("foo", "bar"), "alias"), 1f, -1, null)).canMatch()); CountDownLatch latch = new CountDownLatch(1); SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); @@ -815,8 +817,8 @@ public void testSetSearchThrottled() { iae.getMessage()); assertFalse(service.getIndicesService().indexServiceSafe(index).getIndexSettings().isSearchThrottled()); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); - ShardSearchRequest req = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, new ShardId(index, 0), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null); + ShardSearchRequest req = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, new ShardId(index, 0), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null); Thread currentThread = Thread.currentThread(); // we still make sure can match is executed on the network thread service.canMatch(req, ActionListener.wrap(r -> assertSame(Thread.currentThread(), currentThread), e -> fail("unexpected"))); @@ -872,7 +874,7 @@ public void testCreateSearchContext() throws IOException { SearchRequest searchRequest = new SearchRequest(); searchRequest.allowPartialSearchResults(randomBoolean()); ShardSearchRequest request = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shardId, - indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias, Strings.EMPTY_ARRAY); + 0, indexService.numberOfShards(), AliasFilter.EMPTY, 1f, nowInMillis, clusterAlias); try (DefaultSearchContext searchContext = service.createSearchContext(request, new TimeValue(System.currentTimeMillis()))) { SearchShardTarget searchShardTarget = searchContext.shardTarget(); QueryShardContext queryShardContext = searchContext.getQueryShardContext(); @@ -925,7 +927,7 @@ public void testMatchNoDocsEmptyResponse() throws InterruptedException { .source(new SearchSourceBuilder() .aggregation(AggregationBuilders.count("count").field("value"))); ShardSearchRequest shardRequest = new ShardSearchRequest(OriginalIndices.NONE, searchRequest, indexShard.shardId(), - 5, AliasFilter.EMPTY, 1.0f, 0, null, null); + 0, 5, AliasFilter.EMPTY, 1.0f, 0, null); SearchShardTask task = new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()); { @@ -1062,7 +1064,7 @@ public void testLookUpSearchContext() throws Exception { for (int i = 0; i < numContexts; i++) { ShardSearchRequest request = new ShardSearchRequest( OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), - indexShard.shardId(), 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null); + indexShard.shardId(), 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null); final ReaderContext context = searchService.createAndPutReaderContext(request, indexService, indexShard, indexShard.acquireSearcherSupplier(), randomBoolean()); assertThat(context.id().getId(), equalTo((long) (i + 1))); diff --git a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java index 5e105b9a02f32..e8cf74e89fbb0 100644 --- a/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java +++ b/server/src/test/java/org/elasticsearch/search/internal/ShardSearchRequestTests.java @@ -99,7 +99,6 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { } else { filteringAliases = new AliasFilter(null, Strings.EMPTY_ARRAY); } - final String[] routings = generateRandomStringArray(5, 10, false, true); ShardSearchContextId shardSearchContextId = null; TimeValue keepAlive = null; if (randomBoolean()) { @@ -108,9 +107,10 @@ private ShardSearchRequest createShardSearchRequest() throws IOException { keepAlive = TimeValue.timeValueSeconds(randomIntBetween(0, 120)); } } + int numberOfShards = randomIntBetween(1, 100); ShardSearchRequest req = new ShardSearchRequest(new OriginalIndices(searchRequest), searchRequest, shardId, - randomIntBetween(1, 100), filteringAliases, randomBoolean() ? 1.0f : randomFloat(), - Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), routings, shardSearchContextId, keepAlive); + randomIntBetween(1, numberOfShards), numberOfShards, filteringAliases, randomBoolean() ? 1.0f : randomFloat(), + Math.abs(randomLong()), randomAlphaOfLengthBetween(3, 10), shardSearchContextId, keepAlive); req.canReturnNullResponseIfMatchNoDocs(randomBoolean()); if (randomBoolean()) { req.setBottomSortValues(SearchSortValuesAndFormatsTests.randomInstance()); @@ -174,8 +174,6 @@ private static void assertEquals(ShardSearchRequest orig, ShardSearchRequest cop assertEquals(orig.searchType(), copy.searchType()); assertEquals(orig.shardId(), copy.shardId()); assertEquals(orig.numberOfShards(), copy.numberOfShards()); - assertArrayEquals(orig.indexRoutings(), copy.indexRoutings()); - assertEquals(orig.preference(), copy.preference()); assertEquals(orig.cacheKey(), copy.cacheKey()); assertNotSame(orig, copy); assertEquals(orig.getAliasFilter(), copy.getAliasFilter()); diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index 2c25ba34a7701..ec07f74b7e70c 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -64,7 +64,7 @@ private static QuerySearchResult createTestInstance() throws Exception { ShardId shardId = new ShardId("index", "uuid", randomInt()); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); ShardSearchRequest shardSearchRequest = new ShardSearchRequest(OriginalIndicesTests.randomOriginalIndices(), searchRequest, - shardId, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, randomNonNegativeLong(), null, new String[0]); + shardId, 0, 1, new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, randomNonNegativeLong(), null); QuerySearchResult result = new QuerySearchResult(new ShardSearchContextId(UUIDs.base64UUID(), randomLong()), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE), shardSearchRequest); if (randomBoolean()) { diff --git a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java index 9424804920c8b..12dc7bbc29e0c 100644 --- a/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java +++ b/server/src/test/java/org/elasticsearch/search/slice/SliceBuilderTests.java @@ -32,16 +32,8 @@ import org.elasticsearch.Version; import org.elasticsearch.action.OriginalIndices; import org.elasticsearch.action.search.SearchRequest; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; -import org.elasticsearch.cluster.metadata.Metadata; -import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.cluster.routing.OperationRouting; -import org.elasticsearch.cluster.routing.PlainShardIterator; -import org.elasticsearch.cluster.routing.ShardIterator; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -72,7 +64,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -92,34 +83,33 @@ private static SliceBuilder serializedCopy(SliceBuilder original) throws IOExcep private static SliceBuilder mutate(SliceBuilder original) { switch (randomIntBetween(0, 2)) { - case 0: return new SliceBuilder(original.getField() + "_xyz", original.getId(), original.getMax()); - case 1: return new SliceBuilder(original.getField(), original.getId() - 1, original.getMax()); + case 0: + return new SliceBuilder(original.getField() + "_xyz", original.getId(), original.getMax()); + case 1: + return new SliceBuilder(original.getField(), original.getId() - 1, original.getMax()); case 2: - default: return new SliceBuilder(original.getField(), original.getId(), original.getMax() + 1); + default: + return new SliceBuilder(original.getField(), original.getId(), original.getMax() + 1); } } - private IndexSettings createIndexSettings(Version indexVersionCreated, int numShards) { + private IndexSettings createIndexSettings(Version indexVersionCreated) { Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, indexVersionCreated) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, numShards) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .build(); IndexMetadata indexState = IndexMetadata.builder("index").settings(settings).build(); return new IndexSettings(indexState, Settings.EMPTY); } - private ShardSearchRequest createRequest(int shardId) { - return createRequest(shardId, Strings.EMPTY_ARRAY, null); - } - - private ShardSearchRequest createRequest(int shardId, String[] routings, String preference) { - return new ShardSearchRequest(OriginalIndices.NONE, new SearchRequest().preference(preference).allowPartialSearchResults(true), - new ShardId("index", "index", shardId), 1, null, 0f, System.currentTimeMillis(), null, routings); + private ShardSearchRequest createRequest(int shardIndex, int numShards) { + return new ShardSearchRequest(OriginalIndices.NONE, new SearchRequest().allowPartialSearchResults(true), + new ShardId("index", "index", 0), shardIndex, numShards, null, 0f, System.currentTimeMillis(), null); } private QueryShardContext createShardContext(Version indexVersionCreated, IndexReader reader, - String fieldName, DocValuesType dvType, int numShards, int shardId) { + String fieldName, DocValuesType dvType) { MappedFieldType fieldType = new MappedFieldType(fieldName, true, false, dvType != null, TextSearchInfo.NONE, Collections.emptyMap()) { @@ -145,8 +135,7 @@ public Query existsQuery(QueryShardContext context) { QueryShardContext context = mock(QueryShardContext.class); when(context.getFieldType(fieldName)).thenReturn(fieldType); when(context.getIndexReader()).thenReturn(reader); - when(context.getShardId()).thenReturn(shardId); - IndexSettings indexSettings = createIndexSettings(indexVersionCreated, numShards); + IndexSettings indexSettings = createIndexSettings(indexVersionCreated); when(context.getIndexSettings()).thenReturn(indexSettings); if (dvType != null) { IndexNumericFieldData fd = mock(IndexNumericFieldData.class); @@ -211,15 +200,15 @@ public void testToFilterSimple() throws IOException { } try (IndexReader reader = DirectoryReader.open(dir)) { QueryShardContext context = - createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED_NUMERIC, 1,0); + createShardContext(Version.CURRENT, reader, "_id", null); SliceBuilder builder = new SliceBuilder(5, 10); - Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT); + Query query = builder.toFilter(createRequest(0, 1), context); assertThat(query, instanceOf(TermsSliceQuery.class)); - assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); } } } @@ -231,14 +220,14 @@ public void testToFilterRandom() throws IOException { } try (IndexReader reader = DirectoryReader.open(dir)) { QueryShardContext context = - createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC, 1,0); + createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED_NUMERIC); SliceBuilder builder = new SliceBuilder("field", 5, 10); - Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT); + Query query = builder.toFilter(createRequest(0, 1), context); assertThat(query, instanceOf(DocValuesSliceQuery.class)); - assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); try (IndexReader newReader = DirectoryReader.open(dir)) { when(context.getIndexReader()).thenReturn(newReader); - assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); } // numSlices > numShards @@ -248,8 +237,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); - Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT); + context = createShardContext(Version.CURRENT, reader, "_id", null); + Query q = slice.toFilter(createRequest(j, numShards), context); if (q instanceof TermsSliceQuery || q instanceof MatchAllDocsQuery) { AtomicInteger count = numSliceMap.get(j); if (count == null) { @@ -278,8 +267,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); - Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT); + context = createShardContext(Version.CURRENT, reader, "_id", null); + Query q = slice.toFilter(createRequest(j, numShards), context); if (q instanceof MatchNoDocsQuery == false) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); targetShards.add(j); @@ -295,8 +284,8 @@ public void testToFilterRandom() throws IOException { for (int i = 0; i < numSlices; i++) { for (int j = 0; j < numShards; j++) { SliceBuilder slice = new SliceBuilder("_id", i, numSlices); - context = createShardContext(Version.CURRENT, reader, "_id", DocValuesType.SORTED, numShards, j); - Query q = slice.toFilter(null, createRequest(j), context, Version.CURRENT); + context = createShardContext(Version.CURRENT, reader, "_id", null); + Query q = slice.toFilter(createRequest(j, numShards), context); if (i == j) { assertThat(q, instanceOf(MatchAllDocsQuery.class)); } else { @@ -313,10 +302,10 @@ public void testInvalidField() throws IOException { writer.commit(); } try (IndexReader reader = DirectoryReader.open(dir)) { - QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null, 1,0); + QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", null); SliceBuilder builder = new SliceBuilder("field", 5, 10); IllegalArgumentException exc = expectThrows(IllegalArgumentException.class, - () -> builder.toFilter(null, createRequest(0), context, Version.CURRENT)); + () -> builder.toFilter(createRequest(0, 1), context)); assertThat(exc.getMessage(), containsString("cannot load numeric doc values")); } } @@ -327,11 +316,11 @@ public void testToFilterDeprecationMessage() throws IOException { writer.commit(); } try (IndexReader reader = DirectoryReader.open(dir)) { - QueryShardContext context = createShardContext(Version.V_6_3_0, reader, "_uid", null, 1,0); + QueryShardContext context = createShardContext(Version.V_6_3_0, reader, "_uid", null); SliceBuilder builder = new SliceBuilder("_uid", 5, 10); - Query query = builder.toFilter(null, createRequest(0), context, Version.CURRENT); + Query query = builder.toFilter(createRequest(0, 1), context); assertThat(query, instanceOf(TermsSliceQuery.class)); - assertThat(builder.toFilter(null, createRequest(0), context, Version.CURRENT), equalTo(query)); + assertThat(builder.toFilter(createRequest(0, 1), context), equalTo(query)); assertWarnings("Computing slices on the [_uid] field is deprecated for 6.x indices, use [_id] instead"); } } @@ -341,44 +330,13 @@ public void testSerializationBackcompat() throws IOException { assertEquals(IdFieldMapper.NAME, sliceBuilder.getField()); SliceBuilder copy62 = copyWriteable(sliceBuilder, - new NamedWriteableRegistry(Collections.emptyList()), - SliceBuilder::new, Version.V_6_2_0); + new NamedWriteableRegistry(Collections.emptyList()), + SliceBuilder::new, Version.V_6_2_0); assertEquals(sliceBuilder, copy62); SliceBuilder copy63 = copyWriteable(copy62, - new NamedWriteableRegistry(Collections.emptyList()), - SliceBuilder::new, Version.V_6_3_0); + new NamedWriteableRegistry(Collections.emptyList()), + SliceBuilder::new, Version.V_6_3_0); assertEquals(sliceBuilder, copy63); } - - public void testToFilterWithRouting() throws IOException { - Directory dir = new ByteBuffersDirectory(); - try (IndexWriter writer = new IndexWriter(dir, newIndexWriterConfig(new MockAnalyzer(random())))) { - writer.commit(); - } - ClusterService clusterService = mock(ClusterService.class); - ClusterState state = mock(ClusterState.class); - when(state.metadata()).thenReturn(Metadata.EMPTY_METADATA); - when(clusterService.state()).thenReturn(state); - OperationRouting routing = mock(OperationRouting.class); - GroupShardsIterator it = new GroupShardsIterator<>( - Collections.singletonList( - new PlainShardIterator(new ShardId("index", "index", 1), Collections.emptyList()) - ) - ); - when(routing.searchShards(any(), any(), any(), any())).thenReturn(it); - when(clusterService.operationRouting()).thenReturn(routing); - when(clusterService.getSettings()).thenReturn(Settings.EMPTY); - try (IndexReader reader = DirectoryReader.open(dir)) { - QueryShardContext context = createShardContext(Version.CURRENT, reader, "field", DocValuesType.SORTED, 5, 0); - SliceBuilder builder = new SliceBuilder("field", 6, 10); - String[] routings = new String[] { "foo" }; - Query query = builder.toFilter(clusterService, createRequest(1, routings, null), context, Version.CURRENT); - assertEquals(new DocValuesSliceQuery("field", 6, 10), query); - query = builder.toFilter(clusterService, createRequest(1, Strings.EMPTY_ARRAY, "foo"), context, Version.CURRENT); - assertEquals(new DocValuesSliceQuery("field", 6, 10), query); - query = builder.toFilter(clusterService, createRequest(1, Strings.EMPTY_ARRAY, "foo"), context, Version.V_6_2_0); - assertEquals(new DocValuesSliceQuery("field", 1, 2), query); - } - } } diff --git a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java index 5a4e8dcf63a90..26f1f7669517a 100644 --- a/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java +++ b/x-pack/plugin/frozen-indices/src/internalClusterTest/java/org/elasticsearch/index/engine/FrozenIndexTests.java @@ -306,18 +306,18 @@ public void testCanMatch() throws IOException, ExecutionException, InterruptedEx assertFalse(indexService.getIndexSettings().isSearchThrottled()); SearchService searchService = getInstanceFromNode(SearchService.class); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); searchRequest.source(sourceBuilder); sourceBuilder.query(QueryBuilders.rangeQuery("field").gte("2010-01-03||+2d").lte("2010-01-04||+2d/d")); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00")); - assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); } XPackClient xPackClient = new XPackClient(client()); @@ -331,18 +331,18 @@ public void testCanMatch() throws IOException, ExecutionException, InterruptedEx assertTrue(indexService.getIndexSettings().isSearchThrottled()); SearchService searchService = getInstanceFromNode(SearchService.class); SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(QueryBuilders.rangeQuery("field").gte("2010-01-03||+2d").lte("2010-01-04||+2d/d")); searchRequest.source(sourceBuilder); - assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertTrue(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); sourceBuilder.query(QueryBuilders.rangeQuery("field").gt("2010-01-06T02:00").lt("2010-01-07T02:00")); - assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 1, - new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null, null)).canMatch()); + assertFalse(searchService.canMatch(new ShardSearchRequest(OriginalIndices.NONE, searchRequest, shard.shardId(), 0, 1, + new AliasFilter(null, Strings.EMPTY_ARRAY), 1f, -1, null)).canMatch()); IndicesStatsResponse response = client().admin().indices().prepareStats("index").clear().setRefresh(true).get(); assertEquals(0, response.getTotal().refresh.getTotal()); From 5d63e834e949cb17a3425434f5ffc2b1f76c062e Mon Sep 17 00:00:00 2001 From: jimczi Date: Mon, 7 Dec 2020 11:35:27 +0100 Subject: [PATCH 2/2] Fix javadocs conflict --- .../java/org/elasticsearch/search/slice/SliceBuilder.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java index bf1da6936d217..51d3e9085ef44 100644 --- a/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/slice/SliceBuilder.java @@ -45,13 +45,8 @@ /** * A slice builder allowing to split a scroll in multiple partitions. -<<<<<<< HEAD - * If the provided field is the "_uid" it uses a {@link org.elasticsearch.search.slice.TermsSliceQuery} + * If the provided field is the "_uid" it uses a {@link TermsSliceQuery} * to do the slicing. The slicing is done at the shard level first and then each shard is split into multiple slices. -======= - * If the provided field is the "_id" it uses a {@link TermsSliceQuery} to do the slicing. - * The slicing is done at the shard level first and then each shard is split into multiple slices. ->>>>>>> 9eb9f92b36d... Adds a consistent shard index to ShardSearchRequest (#65706) * For instance if the number of shards is equal to 2 and the user requested 4 slices * then the slices 0 and 2 are assigned to the first shard and the slices 1 and 3 are assigned to the second shard. * This way the total number of bitsets that we need to build on each shard is bounded by the number of slices