Automatically adjust pre_filter_shard_size to 1 for readonly indices #43377


Closed
8 changes: 4 additions & 4 deletions docs/reference/frozen-indices.asciidoc
@@ -90,10 +90,10 @@ GET /twitter/_search?q=user:kimchy&ignore_throttled=false
[IMPORTANT]
================================
 While frozen indices are slow to search, they can be pre-filtered efficiently. The request parameter `pre_filter_shard_size` specifies
-a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match.
-This filter phase can limit the number of shards significantly. For instance, if a date range filter is applied, then all indices (frozen or unfrozen) that do not contain documents within the date range can be skipped efficiently.
-The default value for `pre_filter_shard_size` is `128` but it's recommended to set it to `1` when searching frozen indices. There is no
-significant overhead associated with this pre-filter phase.
+a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match. Whenever not explicitly
+set, the parameter is automatically adjusted to `1` for read-only indices, and to `128` for write indices. This filter phase can limit the
+number of shards significantly. For instance, if a date range filter is applied, then all indices that do not contain documents within the
+date range can be skipped efficiently.
================================
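The note above describes the `can_match` pre-filter phase: given per-shard statistics for a field, a range filter can rule out whole shards before any search context is opened. A toy sketch of that idea, with illustrative names (`ShardStats`, `canMatch` here are not the actual Elasticsearch classes):

```java
import java.util.List;

// Toy sketch of the pre-filter ("can_match") idea: a shard whose
// min/max values for a field cannot intersect the query's range is
// skipped without opening a search context on it.
class CanMatchSketch {
    record ShardStats(long minDate, long maxDate) {}

    // A shard can match only when its value range overlaps the query range.
    static boolean canMatch(ShardStats shard, long queryFrom, long queryTo) {
        return shard.maxDate() >= queryFrom && shard.minDate() <= queryTo;
    }

    static long countSearchable(List<ShardStats> shards, long from, long to) {
        return shards.stream().filter(s -> canMatch(s, from, to)).count();
    }

    public static void main(String[] args) {
        List<ShardStats> shards = List.of(new ShardStats(0, 10), new ShardStats(20, 30));
        // Only the second shard overlaps the range [15, 25]; the first is skipped.
        System.out.println(countSearchable(shards, 15, 25));
    }
}
```

With a date range filter, frozen shards that fall entirely outside the range never leave the skipped state, which is why the phase is cheap relative to searching them.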

[role="xpack"]
@@ -60,7 +60,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest

private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

-    public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128;
+    public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = -1;
Contributor:

nit: we serialize this value using read/writeVInt, so -1 is not a very good default since it will always use 5 bytes. This shouldn't affect anything, but we also disallow explicitly setting this value to a negative number in SearchRequest, so it will not be possible for users to restore the default value in a SearchRequest. I wonder if it would be simpler to use an Integer and readOptionalVInt to make the intent clear?

Member (author):

Good point: I follow the bytes reasoning more than the resetting issue, because generally we do not allow null values either when validating requests, hence you need to create a new request to go back to the default values. I am not a big fan of null values, so I have a slight preference for -1, but I can also change that.
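The reviewer's 5-byte point can be checked with a small standalone sketch of LEB128-style vint length counting (an assumption about how `writeVInt` encodes values, not the actual StreamOutput code): a negative value such as -1 has all high bits set, so it always needs the maximum five 7-bit groups.

```java
// Sketch of variable-length int sizing: 7 payload bits per byte,
// continuing while any higher bits remain set. -1 (0xFFFFFFFF as an
// unsigned 32-bit pattern) always takes ceil(32 / 7) = 5 bytes.
public class VIntSketch {
    static int vIntLength(int value) {
        int length = 1;
        // Unsigned shift so negative values terminate after five groups.
        while ((value & ~0x7F) != 0) {
            value >>>= 7;
            length++;
        }
        return length;
    }

    public static void main(String[] args) {
        System.out.println(vIntLength(128)); // the old default: 2 bytes
        System.out.println(vIntLength(-1));  // the new default: 5 bytes
    }
}
```

So the sentinel costs three extra bytes per serialized request compared to 128, which is the (minor) overhead the nit is about.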

public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512;

private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1;
@@ -474,67 +474,69 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
searchRequest.indices());
routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap);
Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState);

if (shouldSplitIndices(searchRequest)) {
//Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
//Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
//indices (possibly slower) being searched at the same time.
List<String> writeIndicesList = new ArrayList<>();
List<String> readOnlyIndicesList = new ArrayList<>();
splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
String[] writeIndices = writeIndicesList.toArray(new String[0]);
String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);

if (readOnlyIndices.length == 0) {
executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
} else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) {
executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
} else {
//Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so
//that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices.
CountDown countDown = new CountDown(2);
AtomicReference<Exception> exceptions = new AtomicReference<>();
SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
searchService::createReduceContext);
CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener =
new CountDownActionListener<>(countDown, exceptions, listener) {
@Override
void innerOnResponse(SearchResponse searchResponse) {
searchResponseMerger.add(searchResponse);
}

@Override
SearchResponse createFinalResponse() {
return searchResponseMerger.getMergedResponse(clusters);
}
};

//Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
//will be provided separately to executeSearch.
SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
SearchResponse.Clusters.EMPTY);

//Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and
//will be provided separately to executeSearch.
SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
SearchResponse.Clusters.EMPTY);
}
} else {
List<String> writeIndicesList = new ArrayList<>();
List<String> readOnlyIndicesList = new ArrayList<>();
splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList);
String[] writeIndices = writeIndicesList.toArray(new String[0]);
String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]);
//Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible.
//Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other
//(possibly slower) indices being searched at the same time.
//Note that remote shards are considered write indices, although we don't really know as we don't have their metadata.
if (readOnlyIndices.length == 0
|| (writeIndices.length == 0 && remoteShardIterators.isEmpty())
|| shouldSplitSearchExecution(searchRequest) == false) {
String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new);
searchRequest.setPreFilterShardSize(computePreFilterShardSize(searchRequest.getPreFilterShardSize(),
writeIndices.length > 0 || remoteShardIterators.isEmpty() == false));
executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters);
} else {
CountDown countDown = new CountDown(2);
AtomicReference<Exception> exceptions = new AtomicReference<>();
SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider,
searchService::createReduceContext);
CountDownActionListener<SearchResponse, SearchResponse> countDownActionListener =
new CountDownActionListener<>(countDown, exceptions, listener) {
@Override
void innerOnResponse(SearchResponse searchResponse) {
searchResponseMerger.add(searchResponse);
}

@Override
SearchResponse createFinalResponse() {
return searchResponseMerger.getMergedResponse(clusters);
}
};

//Note that the indices set to the new SearchRequests won't be retrieved from them, as they have been already resolved and
//will be provided separately to executeSearch.
SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices,
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
writeIndicesRequest.setPreFilterShardSize(computePreFilterShardSize(searchRequest.getPreFilterShardSize(), true));
executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap,
aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener,
SearchResponse.Clusters.EMPTY);
SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices,
RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false);
readOnlyIndicesRequest.setPreFilterShardSize(computePreFilterShardSize(searchRequest.getPreFilterShardSize(), false));
executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap,
aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener,
SearchResponse.Clusters.EMPTY);
}
}

static int computePreFilterShardSize(int providedPreFilterShardSize, boolean writeIndices) {
if (providedPreFilterShardSize != SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE) {
return providedPreFilterShardSize;
}
if (writeIndices) {
return 128;
}
return 1;
}

-    static boolean shouldSplitIndices(SearchRequest searchRequest) {
+    static boolean shouldSplitSearchExecution(SearchRequest searchRequest) {
return searchRequest.scroll() == null && searchRequest.searchType() != DFS_QUERY_THEN_FETCH
&& (searchRequest.source() == null || searchRequest.source().size() != 0);
}
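The new computePreFilterShardSize helper can be restated as a hypothetical standalone sketch (PreFilterSketch is illustrative, not the production class): -1 is the "not explicitly set" sentinel, and it resolves differently depending on whether any write indices (or remote shards, whose metadata is unknown) take part in the search.

```java
// Sketch of the resolution rule introduced in this hunk: an explicit
// pre_filter_shard_size always wins; the unset sentinel resolves to the
// old default (128) for write indices and to 1 for read-only searches.
class PreFilterSketch {
    static final int UNSET = -1; // mirrors SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE

    static int computePreFilterShardSize(int provided, boolean writeIndices) {
        if (provided != UNSET) {
            return provided; // user set it explicitly on the request
        }
        return writeIndices ? 128 : 1;
    }
}
```

A threshold of 1 means the can_match round-trip runs for any number of read-only shards, matching the documentation change in this PR.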
@@ -41,7 +41,6 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
-import java.util.List;
import java.util.Set;

import static org.elasticsearch.rest.RestRequest.Method.GET;
@@ -92,7 +91,7 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, boolean a
multiRequest.maxConcurrentSearchRequests(restRequest.paramAsInt("max_concurrent_searches", 0));
}

-    int preFilterShardSize = restRequest.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE);


final Integer maxConcurrentShardRequests;
if (restRequest.hasParam("max_concurrent_shard_requests")) {
@@ -108,11 +107,11 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, boolean a
RestSearchAction.checkRestTotalHits(restRequest, searchRequest);
multiRequest.add(searchRequest);
});
-        List<SearchRequest> requests = multiRequest.requests();
-        preFilterShardSize = Math.max(1, preFilterShardSize / (requests.size()+1));
-        for (SearchRequest request : requests) {
-            // preserve if it's set on the request
-            request.setPreFilterShardSize(Math.min(preFilterShardSize, request.getPreFilterShardSize()));
+        int preFilterShardSize = restRequest.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE);
+        for (SearchRequest request : multiRequest.requests()) {
+            if (preFilterShardSize != SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE) {
+                request.setPreFilterShardSize(preFilterShardSize);
+            }
if (maxConcurrentShardRequests != null) {
request.setMaxConcurrentShardRequests(maxConcurrentShardRequests);
}
@@ -123,7 +123,9 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r

final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize());
searchRequest.setBatchedReduceSize(batchedReduceSize);
-        searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize()));
+        if (request.hasParam("pre_filter_shard_size")) {
+            searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize()));
+        }

if (request.hasParam("max_concurrent_shard_requests")) {
// only set if we have the parameter since we auto adjust the max concurrency on the coordinator
@@ -235,4 +235,61 @@ public void testSplitIndices() {
assertTrue(response.isAcknowledged());
}
}

public void testAutoPreFilterShardSize() {
int numReadOnlyShards = 4;
int readOnlySkippedShards = numReadOnlyShards - 1;
{
CreateIndexResponse response = client().admin().indices().prepareCreate("readonly")
.setSettings(Settings.builder().put("index.number_of_shards", numReadOnlyShards)).get();
assertTrue(response.isAcknowledged());
}
{
CreateIndexResponse response = client().admin().indices().prepareCreate("write").get();
assertTrue(response.isAcknowledged());
}
RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("rank").gt(30);
{
SearchResponse searchResponse = client().prepareSearch("readonly", "write").setQuery(rangeQueryBuilder).get();
assertEquals(5, searchResponse.getTotalShards());
assertEquals(0, searchResponse.getSkippedShards());
}
{
Settings settings = Settings.builder().put("index.blocks.read_only", "true").build();
AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
assertTrue(response.isAcknowledged());
}
try {
{
SearchResponse searchResponse = client().prepareSearch("write", "readonly").setQuery(rangeQueryBuilder).get();
assertEquals(5, searchResponse.getTotalShards());
assertEquals(readOnlySkippedShards, searchResponse.getSkippedShards());
}
{
SearchResponse searchResponse = client().prepareSearch("write").setQuery(rangeQueryBuilder).get();
assertEquals(1, searchResponse.getTotalShards());
assertEquals(0, searchResponse.getSkippedShards());
}
{
SearchResponse searchResponse = client().prepareSearch("readonly").setQuery(rangeQueryBuilder).get();
assertEquals(4, searchResponse.getTotalShards());
assertEquals(readOnlySkippedShards, searchResponse.getSkippedShards());
}
{
SearchResponse searchResponse = client().prepareSearch("write", "readonly").setSize(0).setQuery(rangeQueryBuilder).get();
assertEquals(5, searchResponse.getTotalShards());
assertEquals(0, searchResponse.getSkippedShards());
}
{
//size 0 makes us not split the search execution, yet can_match is still executed when only read-only indices are searched
SearchResponse searchResponse = client().prepareSearch("readonly").setSize(0).setQuery(rangeQueryBuilder).get();
assertEquals(4, searchResponse.getTotalShards());
assertEquals(readOnlySkippedShards, searchResponse.getSkippedShards());
}
} finally {
Settings settings = Settings.builder().put("index.blocks.read_only", "false").build();
AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get();
assertTrue(response.isAcknowledged());
}
}
}
@@ -845,32 +845,32 @@ public void testShouldMinimizeRoundtrips() throws Exception {
public void testShouldSplitIndices() {
{
SearchRequest searchRequest = new SearchRequest();
-            assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest));
+            assertTrue(TransportSearchAction.shouldSplitSearchExecution(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder());
-            assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest));
+            assertTrue(TransportSearchAction.shouldSplitSearchExecution(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder().size(randomIntBetween(1, 100)));
-            assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest));
+            assertTrue(TransportSearchAction.shouldSplitSearchExecution(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.scroll("5s");
-            assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest));
+            assertFalse(TransportSearchAction.shouldSplitSearchExecution(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(new SearchSourceBuilder().size(0));
-            assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest));
+            assertFalse(TransportSearchAction.shouldSplitSearchExecution(searchRequest));
}
{
SearchRequest searchRequest = new SearchRequest();
searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH);
-            assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest));
+            assertFalse(TransportSearchAction.shouldSplitSearchExecution(searchRequest));
}
}

@@ -912,4 +912,11 @@ public void testSplitIndices() {
assertEquals(readOnlyIndices, expectedReadOnly);
}
}

public void testComputePreFilterShardSize() {
assertEquals(128, TransportSearchAction.computePreFilterShardSize(-1, true));
assertEquals(1, TransportSearchAction.computePreFilterShardSize(-1, false));
int provided = randomIntBetween(1, Integer.MAX_VALUE);
assertEquals(provided, TransportSearchAction.computePreFilterShardSize(provided, randomBoolean()));
}
}