diff --git a/docs/reference/frozen-indices.asciidoc b/docs/reference/frozen-indices.asciidoc index 60f07b58bcb7d..4dab2e0da3bf1 100644 --- a/docs/reference/frozen-indices.asciidoc +++ b/docs/reference/frozen-indices.asciidoc @@ -90,10 +90,10 @@ GET /twitter/_search?q=user:kimchy&ignore_throttled=false [IMPORTANT] ================================ While frozen indices are slow to search, they can be pre-filtered efficiently. The request parameter `pre_filter_shard_size` specifies -a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match. -This filter phase can limit the number of shards significantly. For instance, if a date range filter is applied, then all indices (frozen or unfrozen) that do not contain documents within the date range can be skipped efficiently. -The default value for `pre_filter_shard_size` is `128` but it's recommended to set it to `1` when searching frozen indices. There is no -significant overhead associated with this pre-filter phase. +a threshold that, when exceeded, will enforce a round-trip to pre-filter search shards that cannot possibly match. Whenever not explicitly +set, the parameter is automatically adjusted to `1` for read-only indices, and to `128` for write indices. This filter phase can limit the +number of shards significantly. For instance, if a date range filter is applied, then all indices that do not contain documents within the +date range can be skipped efficiently. ================================ [role="xpack"] diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java index 602853e10b292..eecd619dcf247 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchRequest.java @@ -60,7 +60,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false")); - public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = 128; + public static final int DEFAULT_PRE_FILTER_SHARD_SIZE = -1; public static final int DEFAULT_BATCHED_REDUCE_SIZE = 512; private static final long DEFAULT_ABSOLUTE_START_MILLIS = -1; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index d4832fb0d7a10..bbf6bb83ce068 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -474,67 +474,69 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea searchRequest.indices()); routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap); Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); - - if (shouldSplitIndices(searchRequest)) { - //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible. - //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other - //indices (possibly slower) being searched at the same time. - List writeIndicesList = new ArrayList<>(); - List readOnlyIndicesList = new ArrayList<>(); - splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList); - String[] writeIndices = writeIndicesList.toArray(new String[0]); - String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]); - - if (readOnlyIndices.length == 0) { - executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) { - executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); - } else { - //Split the search in two whenever throttled indices are searched together with ordinary indices (local or remote), so - //that we don't keep the search context open for too long between query and fetch for ordinary indices due to slow indices. - CountDown countDown = new CountDown(2); - AtomicReference exceptions = new AtomicReference<>(); - SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, - searchService::createReduceContext); - CountDownActionListener countDownActionListener = - new CountDownActionListener<>(countDown, exceptions, listener) { - @Override - void innerOnResponse(SearchResponse searchResponse) { - searchResponseMerger.add(searchResponse); - } - - @Override - SearchResponse createFinalResponse() { - return searchResponseMerger.getMergedResponse(clusters); - } - }; - - //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and - //will be provided separately to executeSearch. - SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices, - RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); - executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap, - aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener, - SearchResponse.Clusters.EMPTY); - - //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and - //will be provided separately to executeSearch. - SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices, - RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); - executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap, - aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener, - SearchResponse.Clusters.EMPTY); - } - } else { + List writeIndicesList = new ArrayList<>(); + List readOnlyIndicesList = new ArrayList<>(); + splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList); + String[] writeIndices = writeIndicesList.toArray(new String[0]); + String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]); + //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible. + //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other + //(possibly slower) indices being searched at the same time. + //Note that remote shards are considered write indices, although we don't really know as we don't have their metadata. + if (readOnlyIndices.length == 0 + || (writeIndices.length == 0 && remoteShardIterators.isEmpty()) + || shouldSplitSearchExecution(searchRequest) == false) { String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new); + searchRequest.setPreFilterShardSize(computePreFilterShardSize(searchRequest.getPreFilterShardSize(), + writeIndices.length > 0 || remoteShardIterators.isEmpty() == false)); executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap, aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); + } else { + CountDown countDown = new CountDown(2); + AtomicReference exceptions = new AtomicReference<>(); + SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, + searchService::createReduceContext); + CountDownActionListener countDownActionListener = + new CountDownActionListener<>(countDown, exceptions, listener) { + @Override + void innerOnResponse(SearchResponse searchResponse) { + searchResponseMerger.add(searchResponse); + } + + @Override + SearchResponse createFinalResponse() { + return searchResponseMerger.getMergedResponse(clusters); + } + }; + + //Note that the indices set to the new SearchRequests won't be retrieved from them, as they have been already resolved and + //will be provided separately to executeSearch. + SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices, + RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); + writeIndicesRequest.setPreFilterShardSize(computePreFilterShardSize(searchRequest.getPreFilterShardSize(), true)); + executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap, + aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener, + SearchResponse.Clusters.EMPTY); + SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices, + RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); + readOnlyIndicesRequest.setPreFilterShardSize(computePreFilterShardSize(searchRequest.getPreFilterShardSize(), false)); + executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap, + aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener, + SearchResponse.Clusters.EMPTY); + } + } + + static int computePreFilterShardSize(int providedPreFilterShardSize, boolean writeIndices) { + if (providedPreFilterShardSize != SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE) { + return providedPreFilterShardSize; + } + if (writeIndices) { + return 128; } + return 1; } - static boolean shouldSplitIndices(SearchRequest searchRequest) { + static boolean shouldSplitSearchExecution(SearchRequest searchRequest) { return searchRequest.scroll() == null && searchRequest.searchType() != DFS_QUERY_THEN_FETCH && (searchRequest.source() == null || searchRequest.source().size() != 0); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java index 1aef4aa5254a4..6db1862152177 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java @@ -41,7 +41,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import static org.elasticsearch.rest.RestRequest.Method.GET; @@ -92,7 +91,7 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, boolean a multiRequest.maxConcurrentSearchRequests(restRequest.paramAsInt("max_concurrent_searches", 0)); } - int preFilterShardSize = restRequest.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE); + final Integer maxConcurrentShardRequests; if (restRequest.hasParam("max_concurrent_shard_requests")) { @@ -108,11 +107,11 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, boolean a RestSearchAction.checkRestTotalHits(restRequest, searchRequest); multiRequest.add(searchRequest); }); - List requests = multiRequest.requests(); - preFilterShardSize = Math.max(1, preFilterShardSize / (requests.size()+1)); - for (SearchRequest request : requests) { - // preserve if it's set on the request - request.setPreFilterShardSize(Math.min(preFilterShardSize, request.getPreFilterShardSize())); + int preFilterShardSize = restRequest.paramAsInt("pre_filter_shard_size", SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE); + for (SearchRequest request : multiRequest.requests()) { + if (preFilterShardSize != SearchRequest.DEFAULT_PRE_FILTER_SHARD_SIZE) { + request.setPreFilterShardSize(preFilterShardSize); + } if (maxConcurrentShardRequests != null) { request.setMaxConcurrentShardRequests(maxConcurrentShardRequests); } diff --git a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java index 95695bec4f0c1..f21d03e54f847 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java @@ -123,7 +123,9 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r final int batchedReduceSize = request.paramAsInt("batched_reduce_size", searchRequest.getBatchedReduceSize()); searchRequest.setBatchedReduceSize(batchedReduceSize); - searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize())); + if (request.hasParam("pre_filter_shard_size")) { + searchRequest.setPreFilterShardSize(request.paramAsInt("pre_filter_shard_size", searchRequest.getPreFilterShardSize())); + } if (request.hasParam("max_concurrent_shard_requests")) { // only set if we have the parameter since we auto adjust the max concurrency on the coordinator diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index fa6160839d2a9..9fe182b7136ad 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -235,4 +235,61 @@ public void testSplitIndices() { assertTrue(response.isAcknowledged()); } } + + public void testAutoPreFilterShardSize() { + int numReadOnlyShards = 4; + int readOnlySkippedShards = numReadOnlyShards - 1; + { + CreateIndexResponse response = client().admin().indices().prepareCreate("readonly") + .setSettings(Settings.builder().put("index.number_of_shards", numReadOnlyShards)).get(); + assertTrue(response.isAcknowledged()); + } + { + CreateIndexResponse response = client().admin().indices().prepareCreate("write").get(); + assertTrue(response.isAcknowledged()); + } + RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("rank").gt(30); + { + SearchResponse searchResponse = client().prepareSearch("readonly", "write").setQuery(rangeQueryBuilder).get(); + assertEquals(5, searchResponse.getTotalShards()); + assertEquals(0, searchResponse.getSkippedShards()); + } + { + Settings settings = Settings.builder().put("index.blocks.read_only", "true").build(); + AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); + assertTrue(response.isAcknowledged()); + } + try { + { + SearchResponse searchResponse = client().prepareSearch("write", "readonly").setQuery(rangeQueryBuilder).get(); + assertEquals(5, searchResponse.getTotalShards()); + assertEquals(readOnlySkippedShards, searchResponse.getSkippedShards()); + } + { + SearchResponse searchResponse = client().prepareSearch("write").setQuery(rangeQueryBuilder).get(); + assertEquals(1, searchResponse.getTotalShards()); + assertEquals(0, searchResponse.getSkippedShards()); + } + { + SearchResponse searchResponse = client().prepareSearch("readonly").setQuery(rangeQueryBuilder).get(); + assertEquals(4, searchResponse.getTotalShards()); + assertEquals(readOnlySkippedShards, searchResponse.getSkippedShards()); + } + { + SearchResponse searchResponse = client().prepareSearch("write", "readonly").setSize(0).setQuery(rangeQueryBuilder).get(); + assertEquals(5, searchResponse.getTotalShards()); + assertEquals(0, searchResponse.getSkippedShards()); + } + { + //size 0 makes us not split the search execution, yet we execute can_match when we search only read-only indices + SearchResponse searchResponse = client().prepareSearch("readonly").setSize(0).setQuery(rangeQueryBuilder).get(); + assertEquals(4, searchResponse.getTotalShards()); + assertEquals(readOnlySkippedShards, searchResponse.getSkippedShards()); + } + } finally { + Settings settings = Settings.builder().put("index.blocks.read_only", "false").build(); + AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); + assertTrue(response.isAcknowledged()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 60078486335c9..a6c961e9949a3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -845,32 +845,32 @@ public void testShouldMinimizeRoundtrips() throws Exception { public void testShouldSplitIndices() { { SearchRequest searchRequest = new SearchRequest(); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); + assertTrue(TransportSearchAction.shouldSplitSearchExecution(searchRequest)); } { SearchRequest searchRequest = new SearchRequest(); searchRequest.source(new SearchSourceBuilder()); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); + assertTrue(TransportSearchAction.shouldSplitSearchExecution(searchRequest)); } { SearchRequest searchRequest = new SearchRequest(); searchRequest.source(new SearchSourceBuilder().size(randomIntBetween(1, 100))); - assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); + assertTrue(TransportSearchAction.shouldSplitSearchExecution(searchRequest)); } { SearchRequest searchRequest = new SearchRequest(); searchRequest.scroll("5s"); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); + assertFalse(TransportSearchAction.shouldSplitSearchExecution(searchRequest)); } { SearchRequest searchRequest = new SearchRequest(); searchRequest.source(new SearchSourceBuilder().size(0)); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); + assertFalse(TransportSearchAction.shouldSplitSearchExecution(searchRequest)); } { SearchRequest searchRequest = new SearchRequest(); searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH); - assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); + assertFalse(TransportSearchAction.shouldSplitSearchExecution(searchRequest)); } } @@ -912,4 +912,11 @@ public void testSplitIndices() { assertEquals(readOnlyIndices, expectedReadOnly); } } + + public void testComputePreFilterShardSize() { + assertEquals(128, TransportSearchAction.computePreFilterShardSize(-1, true)); + assertEquals(1, TransportSearchAction.computePreFilterShardSize(-1, false)); + int provided = randomIntBetween(1, Integer.MAX_VALUE); + assertEquals(provided, TransportSearchAction.computePreFilterShardSize(provided, randomBoolean())); + } }