Commit e81804c

Add a shard filter search phase to pre-filter shards based on query rewriting (#25658)
Today, a search across a large number of shards hits every shard. Yet it is quite common to search across an index pattern for time-based indices where a filter excludes all results outside a certain time range, e.g. `now-3d`. While such a search can hit hundreds of shards, the majority of them might yield 0 results because no document falls within that date range. Kibana, for instance, does this regularly and used `_field_stats` to narrow down the indices it needs to query. With the deprecation of `_field_stats` and its upcoming removal, a single Kibana dashboard can turn into searches hitting hundreds or thousands of shards, which can easily cause search rejections even though most of those requests are very cheap and only need query rewriting to terminate early with 0 results.

This change adds a pre-filter phase for searches that, if the number of shards is higher than the `pre_filter_shard_size` threshold (default: 128 shards), fans out to the shards and checks whether the query can match any documents at all. While false positives are possible, a negative response means that no matches are possible. These requests are not subject to rejection and can greatly reduce the number of shards a search needs to hit. This approach is preferable to the Kibana field-stats approach because it correctly handles aliases and uses the correct thread pools to execute these requests. Furthermore, it is completely transparent to the user and improves the scalability of Elasticsearch on large clusters.
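As a rough usage sketch of the behavior described above, the snippet below issues a time-filtered search over a large index pattern and tunes the threshold. It assumes `setPreFilterShardSize` is the request-level setter and `getSkippedShards` the response-level counter exposed for this feature; treat both as illustrative rather than a definitive API reference.

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class PreFilterExample {
    static SearchResponse searchLastThreeDays(Client client) {
        SearchRequest request = new SearchRequest("logs-*") // time-based index pattern
            .source(new SearchSourceBuilder()
                .query(QueryBuilders.rangeQuery("@timestamp").gte("now-3d")));
        // Assumed setter: trigger the cheap can_match round once more than 32 shards
        // are targeted (the default threshold is 128 shards).
        request.setPreFilterShardSize(32);
        SearchResponse response = client.search(request).actionGet();
        // Assumed accessor: shards that were pre-filtered out are reported as "skipped"
        // and still count towards the successful shard total.
        System.out.println("skipped shards: " + response.getSkippedShards());
        return response;
    }
}
```

Shards skipped by the pre-filter are still reported as successful in the response header, with the skipped count broken out separately, as the `AbstractSearchAsyncAction` changes below show.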
1 parent 86e9438 commit e81804c

File tree

54 files changed: +1114 −122 lines


client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/TransportNoopSearchAction.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -53,6 +53,6 @@ protected void doExecute(SearchRequest request, ActionListener<SearchResponse> l
                 new SearchHit[0], 0L, 0.0f),
                 new InternalAggregations(Collections.emptyList()),
                 new Suggest(Collections.emptyList()),
-                new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, new ShardSearchFailure[0]));
+                new SearchProfileShardResults(Collections.emptyMap()), false, false, 1), "", 1, 1, 0, 0, new ShardSearchFailure[0]));
     }
 }
```

client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java

Lines changed: 1 addition & 1 deletion
```diff
@@ -153,7 +153,7 @@ public void testInfo() throws IOException {
     public void testSearchScroll() throws IOException {
         Header[] headers = randomHeaders(random(), "Header");
         SearchResponse mockSearchResponse = new SearchResponse(new SearchResponseSections(SearchHits.empty(), InternalAggregations.EMPTY,
-                null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 100, new ShardSearchFailure[0]);
+                null, false, false, null, 1), randomAlphaOfLengthBetween(5, 10), 5, 5, 0, 100, new ShardSearchFailure[0]);
         mockResponse(mockSearchResponse);
         SearchResponse searchResponse = restHighLevelClient.searchScroll(new SearchScrollRequest(randomAlphaOfLengthBetween(5, 10)),
                 headers);
```

core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 33 additions & 26 deletions
```diff
@@ -31,7 +31,6 @@
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchPhaseResult;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.AliasFilter;
@@ -67,6 +66,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
     private final Object shardFailuresMutex = new Object();
     private final AtomicInteger successfulOps = new AtomicInteger();
+    private final AtomicInteger skippedOps = new AtomicInteger();
     private final TransportSearchAction.SearchTimeProvider timeProvider;


@@ -107,7 +107,7 @@ public final void start() {
         if (getNumShards() == 0) {
             //no search shards to search on, bail with empty response
             //(it happens with search across _all with no indices around and consistent with broadcast operations)
-            listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, buildTookInMillis(),
+            listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, buildTookInMillis(),
                 ShardSearchFailure.EMPTY_ARRAY));
             return;
         }
@@ -169,35 +169,35 @@ private ShardSearchFailure[] buildShardFailures() {

     public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
         // we don't aggregate shard failures on non active shards (but do keep the header counts right)
-        if (TransportActions.isShardNotAvailableException(e)) {
-            return;
-        }
-        AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
-        // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
-        if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
-            synchronized (shardFailuresMutex) {
-                shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
-                if (shardFailures == null) { // still null so we are the first and create a new instance
-                    shardFailures = new AtomicArray<>(getNumShards());
-                    this.shardFailures.set(shardFailures);
+        if (TransportActions.isShardNotAvailableException(e) == false) {
+            AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
+            // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
+            if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
+                synchronized (shardFailuresMutex) {
+                    shardFailures = this.shardFailures.get(); // read again otherwise somebody else has created it?
+                    if (shardFailures == null) { // still null so we are the first and create a new instance
+                        shardFailures = new AtomicArray<>(getNumShards());
+                        this.shardFailures.set(shardFailures);
+                    }
                 }
             }
-        }
-        ShardSearchFailure failure = shardFailures.get(shardIndex);
-        if (failure == null) {
-            shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
-        } else {
-            // the failure is already present, try and not override it with an exception that is less meaningless
-            // for example, getting illegal shard state
-            if (TransportActions.isReadOverrideException(e)) {
+            ShardSearchFailure failure = shardFailures.get(shardIndex);
+            if (failure == null) {
                 shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
+            } else {
+                // the failure is already present, try and not override it with an exception that is less meaningless
+                // for example, getting illegal shard state
+                if (TransportActions.isReadOverrideException(e)) {
+                    shardFailures.set(shardIndex, new ShardSearchFailure(e, shardTarget));
+                }
             }
-        }

-        if (results.hasResult(shardIndex)) {
-            assert failure == null : "shard failed before but shouldn't: " + failure;
-            successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
+            if (results.hasResult(shardIndex)) {
+                assert failure == null : "shard failed before but shouldn't: " + failure;
+                successfulOps.decrementAndGet(); // if this shard was successful before (initial phase) we have to adjust the counter
+            }
         }
+        results.consumeShardFailure(shardIndex);
     }

     /**
@@ -264,7 +264,7 @@ public final SearchRequest getRequest() {
     @Override
     public final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
         return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
-            buildTookInMillis(), buildShardFailures());
+            skippedOps.get(), buildTookInMillis(), buildShardFailures());
     }

     @Override
@@ -313,4 +313,11 @@ public final ShardSearchTransportRequest buildShardSearchRequest(SearchShardIter
      * @param context the search context for the next phase
      */
    protected abstract SearchPhase getNextPhase(SearchPhaseResults<Result> results, SearchPhaseContext context);
+
+    @Override
+    protected void skipShard(SearchShardIterator iterator) {
+        super.skipShard(iterator);
+        successfulOps.incrementAndGet();
+        skippedOps.incrementAndGet();
+    }
 }
```
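To summarize the bookkeeping in the hunks above: a shard skipped by the pre-filter is counted as successful and additionally reported via the new skipped counter that `buildSearchResponse` now passes to `SearchResponse`. A minimal, self-contained restatement of that invariant, with invented names and not part of the Elasticsearch API:

```java
// Plain-Java sketch of the shard accounting after this change.
final class ShardAccountingSketch {
    static int successful(int executedSuccessfully, int skipped) {
        // skipShard() increments successfulOps as well as skippedOps,
        // so skipped shards are a subset of the successful count.
        return executedSuccessfully + skipped;
    }

    public static void main(String[] args) {
        int total = 500, skipped = 470, executed = 30, failed = 0;
        // total shards = successful (including skipped) + failed
        assert successful(executed, skipped) + failed == total;
        System.out.println("successful=" + successful(executed, skipped) + " skipped=" + skipped);
    }
}
```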
core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java (new file)

Lines changed: 143 additions & 0 deletions

```java
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.elasticsearch.action.search;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.FixedBitSet;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.transport.Transport;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Stream;

/**
 * This search phase can be used as an initial search phase to pre-filter search shards based on query rewriting.
 * The queries are rewritten against the shards and based on the rewrite result shards might be able to be excluded
 * from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
 * which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
 * large portion of the clusters indices.
 */
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchTransportService.CanMatchResponse> {

    private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
    private final GroupShardsIterator<SearchShardIterator> shardsIts;

    CanMatchPreFilterSearchPhase(Logger logger, SearchTransportService searchTransportService,
                                 BiFunction<String, String, Transport.Connection> nodeIdToConnection,
                                 Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
                                 Executor executor, SearchRequest request,
                                 ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
                                 TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
                                 SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
        super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
            listener,
            shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()));
        this.phaseFactory = phaseFactory;
        this.shardsIts = shardsIts;
    }

    @Override
    protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
                                       SearchActionListener<SearchTransportService.CanMatchResponse> listener) {
        getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
            buildShardSearchRequest(shardIt), getTask(), listener);
    }

    @Override
    protected SearchPhase getNextPhase(SearchPhaseResults<SearchTransportService.CanMatchResponse> results,
                                       SearchPhaseContext context) {

        return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
    }

    private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseResults results,
                                                                 GroupShardsIterator<SearchShardIterator> shardsIts) {
        int cardinality = results.getNumPossibleMatches();
        FixedBitSet possibleMatches = results.getPossibleMatches();
        if (cardinality == 0) {
            // this is a special case where we have no hit but we need to get at least one search response in order
            // to produce a valid search result with all the aggs etc.
            possibleMatches.set(0);
        }
        int i = 0;
        for (SearchShardIterator iter : shardsIts) {
            if (possibleMatches.get(i++)) {
                iter.reset();
            } else {
                iter.resetAndSkip();
            }
        }
        return shardsIts;
    }

    private static final class BitSetSearchPhaseResults extends InitialSearchPhase.
        SearchPhaseResults<SearchTransportService.CanMatchResponse> {

        private final FixedBitSet possibleMatches;
        private int numPossibleMatches;

        BitSetSearchPhaseResults(int size) {
            super(size);
            possibleMatches = new FixedBitSet(size);
        }

        @Override
        void consumeResult(SearchTransportService.CanMatchResponse result) {
            if (result.canMatch()) {
                consumeShardFailure(result.getShardIndex());
            }
        }

        @Override
        boolean hasResult(int shardIndex) {
            return false; // unneeded
        }

        @Override
        synchronized void consumeShardFailure(int shardIndex) {
            // we have to carry over shard failures in order to account for them in the response.
            possibleMatches.set(shardIndex);
            numPossibleMatches++;
        }


        synchronized int getNumPossibleMatches() {
            return numPossibleMatches;
        }

        synchronized FixedBitSet getPossibleMatches() {
            return possibleMatches;
        }

        @Override
        Stream<SearchTransportService.CanMatchResponse> getSuccessfulResults() {
            return Stream.empty();
        }
    }
}
```
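The phase above boils down to a bit set with one bit per shard: a positive can_match response sets the bit, a failed can_match request also sets it (a shard may only be skipped when a negative answer is certain), and at least one shard is always kept so the final response still carries aggregations and metadata. Below is a self-contained sketch of that selection logic, using only the JDK and invented names; it mirrors `getIterator`/`BitSetSearchPhaseResults` but is not the Elasticsearch implementation.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

final class CanMatchSelectionSketch {
    /** Returns the indices of the shards that still have to execute the real query phase. */
    static List<Integer> shardsToSearch(Boolean[] canMatchResponses) {
        BitSet possibleMatches = new BitSet(canMatchResponses.length);
        for (int shard = 0; shard < canMatchResponses.length; shard++) {
            // null stands for "can_match request failed": treat it as a possible match,
            // mirroring consumeShardFailure() setting the bit.
            if (canMatchResponses[shard] == null || canMatchResponses[shard]) {
                possibleMatches.set(shard);
            }
        }
        if (possibleMatches.isEmpty()) {
            // mirror getIterator(): keep one shard so an "all filtered" search still
            // produces a well-formed (empty) response including aggregations.
            possibleMatches.set(0);
        }
        List<Integer> result = new ArrayList<>();
        for (int shard = possibleMatches.nextSetBit(0); shard >= 0; shard = possibleMatches.nextSetBit(shard + 1)) {
            result.add(shard);
        }
        return result;
    }

    public static void main(String[] args) {
        // shards 1 and 3 report a possible match, shard 2 failed the can_match round
        System.out.println(shardsToSearch(new Boolean[]{false, true, null, true})); // [1, 2, 3]
    }
}
```

False positives (a shard that is searched but matches nothing) are acceptable here; false negatives are not, which is why failed can_match requests keep the shard in the search.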

core/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 3 additions & 3 deletions
```diff
@@ -41,16 +41,16 @@
  * @see CountedCollector#onFailure(int, SearchShardTarget, Exception)
  */
 final class DfsQueryPhase extends SearchPhase {
-    private final InitialSearchPhase.SearchPhaseResults<SearchPhaseResult> queryResult;
+    private final InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> queryResult;
     private final SearchPhaseController searchPhaseController;
     private final AtomicArray<DfsSearchResult> dfsSearchResults;
-    private final Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
+    private final Function<InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory;
     private final SearchPhaseContext context;
     private final SearchTransportService searchTransportService;

     DfsQueryPhase(AtomicArray<DfsSearchResult> dfsSearchResults,
                   SearchPhaseController searchPhaseController,
-                  Function<InitialSearchPhase.SearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
+                  Function<InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
                   SearchPhaseContext context) {
         super("dfs_query");
         this.queryResult = searchPhaseController.newSearchPhaseResults(context.getRequest(), context.getNumShards());
```

core/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 3 additions & 2 deletions
```diff
@@ -69,7 +69,7 @@ final class FetchSearchPhase extends SearchPhase {
         }
         this.fetchResults = new AtomicArray<>(resultConsumer.getNumShards());
         this.searchPhaseController = searchPhaseController;
-        this.queryResults = resultConsumer.results;
+        this.queryResults = resultConsumer.getAtomicArray();
         this.nextPhaseFactory = nextPhaseFactory;
         this.context = context;
         this.logger = context.getLogger();
@@ -105,7 +105,8 @@ private void innerRun() throws IOException {
                 -> moveToNextPhase(searchPhaseController, scrollId, reducedQueryPhase, queryAndFetchOptimization ?
                     queryResults : fetchResults);
         if (queryAndFetchOptimization) {
-            assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null;
+            assert phaseResults.isEmpty() || phaseResults.get(0).fetchResult() != null : "phaseResults empty [" + phaseResults.isEmpty()
+                + "], single result: " + phaseResults.get(0).fetchResult();
             // query AND fetch optimization
             finishPhase.run();
         } else {
```
