diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index a7c0a785c7fce..d4832fb0d7a10 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; @@ -472,10 +473,89 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea Map> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(), searchRequest.indices()); routingMap = routingMap == null ? Collections.emptyMap() : Collections.unmodifiableMap(routingMap); - String[] concreteIndices = new String[indices.length]; - for (int i = 0; i < indices.length; i++) { - concreteIndices[i] = indices[i].getName(); + Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); + + if (shouldSplitIndices(searchRequest)) { + //Execute two separate searches when we can, so that indices that are being written to are searched as quickly as possible. + //Otherwise their search context would need to stay open for too long between the query and the fetch phase, due to other + //indices (possibly slower) being searched at the same time. 
+ List writeIndicesList = new ArrayList<>(); + List readOnlyIndicesList = new ArrayList<>(); + splitIndices(indices, clusterState, writeIndicesList, readOnlyIndicesList); + String[] writeIndices = writeIndicesList.toArray(new String[0]); + String[] readOnlyIndices = readOnlyIndicesList.toArray(new String[0]); + + if (readOnlyIndices.length == 0) { + executeSearch(task, timeProvider, searchRequest, localIndices, writeIndices, routingMap, + aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); + } else if (writeIndices.length == 0 && remoteShardIterators.isEmpty()) { + executeSearch(task, timeProvider, searchRequest, localIndices, readOnlyIndices, routingMap, + aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); + } else { + //Split the search in two whenever write-blocked indices are searched together with writable or remote indices, so that + //we don't keep search contexts of the latter open for too long between query and fetch when the former are slower. + CountDown countDown = new CountDown(2); + AtomicReference exceptions = new AtomicReference<>(); + SearchResponseMerger searchResponseMerger = createSearchResponseMerger(searchRequest.source(), timeProvider, + searchService::createReduceContext); + CountDownActionListener countDownActionListener = + new CountDownActionListener<>(countDown, exceptions, listener) { + @Override + void innerOnResponse(SearchResponse searchResponse) { + searchResponseMerger.add(searchResponse); + } + + @Override + SearchResponse createFinalResponse() { + return searchResponseMerger.getMergedResponse(clusters); + } + }; + + //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and + //will be provided separately to executeSearch. 
+ SearchRequest writeIndicesRequest = SearchRequest.subSearchRequest(searchRequest, writeIndices, + RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); + executeSearch(task, timeProvider, writeIndicesRequest, localIndices, writeIndices, routingMap, + aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, countDownActionListener, + SearchResponse.Clusters.EMPTY); + + //Note that the indices set to the new SearchRequest won't be retrieved from it, as they have been already resolved and + //will be provided separately to executeSearch. + SearchRequest readOnlyIndicesRequest = SearchRequest.subSearchRequest(searchRequest, readOnlyIndices, + RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, timeProvider.getAbsoluteStartMillis(), false); + executeSearch(task, timeProvider, readOnlyIndicesRequest, localIndices, readOnlyIndices, routingMap, + aliasFilter, concreteIndexBoosts, Collections.emptyList(), (alias, id) -> null, clusterState, countDownActionListener, + SearchResponse.Clusters.EMPTY); + } + } else { + String[] concreteIndices = Arrays.stream(indices).map(Index::getName).toArray(String[]::new); + executeSearch(task, timeProvider, searchRequest, localIndices, concreteIndices, routingMap, + aliasFilter, concreteIndexBoosts, remoteShardIterators, remoteConnections, clusterState, listener, clusters); + } + } + + static boolean shouldSplitIndices(SearchRequest searchRequest) { + return searchRequest.scroll() == null && searchRequest.searchType() != DFS_QUERY_THEN_FETCH + && (searchRequest.source() == null || searchRequest.source().size() != 0); + } + + static void splitIndices(Index[] indices, ClusterState clusterState, List writeIndices, List readOnlyIndices) { + for (Index index : indices) { + ClusterBlockException writeBlock = clusterState.blocks().indexBlockedException(ClusterBlockLevel.WRITE, index.getName()); + if (writeBlock == null) { + writeIndices.add(index.getName()); + } else { + 
readOnlyIndices.add(index.getName()); + } } + } + + private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, + OriginalIndices localIndices, String[] concreteIndices, Map> routingMap, + Map aliasFilter, Map concreteIndexBoosts, + List remoteShardIterators, BiFunction remoteConnections, + ClusterState clusterState, ActionListener listener, SearchResponse.Clusters clusters) { + Map nodeSearchCounts = searchTransportService.getPendingSearchRequests(); GroupShardsIterator localShardsIterator = clusterService.operationRouting().searchShards(clusterState, concreteIndices, routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), nodeSearchCounts); @@ -484,8 +564,6 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea failIfOverShardCountLimit(clusterService, shardIterators.size()); - Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, clusterState); - // optimize search type for cases where there is only one shard group to search on if (shardIterators.size() == 1) { // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard @@ -498,11 +576,9 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea if (searchRequest.isSuggestOnly()) { // disable request cache if we have only suggest searchRequest.requestCache(false); - switch (searchRequest.searchType()) { - case DFS_QUERY_THEN_FETCH: - // convert to Q_T_F if we have only suggest - searchRequest.searchType(QUERY_THEN_FETCH); - break; + if (searchRequest.searchType() == DFS_QUERY_THEN_FETCH) { + // convert to Q_T_F if we have only suggest + searchRequest.searchType(QUERY_THEN_FETCH); } } @@ -611,22 +687,16 @@ private static void failIfOverShardCountLimit(ClusterService clusterService, int } } - abstract static class CCSActionListener implements ActionListener { - private final String clusterAlias; - private final boolean 
skipUnavailable; + abstract static class CountDownActionListener implements ActionListener { private final CountDown countDown; - private final AtomicInteger skippedClusters; private final AtomicReference exceptions; - private final ActionListener originalListener; + private final ActionListener delegateListener; - CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, - AtomicReference exceptions, ActionListener originalListener) { - this.clusterAlias = clusterAlias; - this.skipUnavailable = skipUnavailable; + CountDownActionListener(CountDown countDown, AtomicReference exceptions, + ActionListener delegateListener) { this.countDown = countDown; - this.skippedClusters = skippedClusters; this.exceptions = exceptions; - this.originalListener = originalListener; + this.delegateListener = delegateListener; } @Override @@ -637,26 +707,7 @@ public final void onResponse(Response response) { abstract void innerOnResponse(Response response); - @Override - public final void onFailure(Exception e) { - if (skipUnavailable) { - skippedClusters.incrementAndGet(); - } else { - Exception exception = e; - if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { - exception = wrapRemoteClusterFailure(clusterAlias, e); - } - if (exceptions.compareAndSet(null, exception) == false) { - exceptions.accumulateAndGet(exception, (previous, current) -> { - current.addSuppressed(previous); - return current; - }); - } - } - maybeFinish(); - } - - private void maybeFinish() { + final void maybeFinish() { if (countDown.countDown()) { Exception exception = exceptions.get(); if (exception == null) { @@ -664,17 +715,56 @@ private void maybeFinish() { try { response = createFinalResponse(); } catch(Exception e) { - originalListener.onFailure(e); + delegateListener.onFailure(e); return; } - originalListener.onResponse(response); + delegateListener.onResponse(response); } else { - 
originalListener.onFailure(exceptions.get()); + delegateListener.onFailure(exceptions.get()); } } } abstract FinalResponse createFinalResponse(); + + @Override + public void onFailure(Exception e) { + //Atomically swap in the latest failure and chain the previous one (if any) to it as suppressed. A function with + //side effects must not be given to accumulateAndGet, as it may be re-applied when concurrent updates contend. + Exception previous = exceptions.getAndSet(e); + if (previous != null) { + e.addSuppressed(previous); + } + maybeFinish(); + } + } + + abstract static class CCSActionListener extends CountDownActionListener { + private final String clusterAlias; + private final boolean skipUnavailable; + private final AtomicInteger skippedClusters; + + CCSActionListener(String clusterAlias, boolean skipUnavailable, CountDown countDown, AtomicInteger skippedClusters, + AtomicReference exceptions, ActionListener originalListener) { + super(countDown, exceptions, originalListener); + this.clusterAlias = clusterAlias; + this.skipUnavailable = skipUnavailable; + this.skippedClusters = skippedClusters; + } + + @Override + public final void onFailure(Exception e) { + if (skipUnavailable) { + skippedClusters.incrementAndGet(); + maybeFinish(); + } else { + Exception exception = e; + if (RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY.equals(clusterAlias) == false) { + exception = wrapRemoteClusterFailure(clusterAlias, e); + } + super.onFailure(exception); + } + } } private static RemoteTransportException wrapRemoteClusterFailure(String clusterAlias, Exception e) { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java index 10f252c30dc3b..fa6160839d2a9 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java +++ 
b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java @@ -19,11 +19,14 @@ package org.elasticsearch.action.search; +import 
org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -174,4 +177,62 @@ public void testFinalReduce() { assertEquals(2, longTerms.getBuckets().size()); } } + + public void testSplitIndices() { + { + CreateIndexResponse response = client().admin().indices().prepareCreate("write").get(); + assertTrue(response.isAcknowledged()); + } + { + CreateIndexResponse response = client().admin().indices().prepareCreate("readonly").get(); + assertTrue(response.isAcknowledged()); + } + { + SearchResponse response = client().prepareSearch("readonly").get(); + assertEquals(1, response.getTotalShards()); + assertEquals(1, response.getSuccessfulShards()); + assertEquals(1, response.getNumReducePhases()); + } + { + SearchResponse response = client().prepareSearch("write").get(); + assertEquals(1, response.getTotalShards()); + assertEquals(1, response.getSuccessfulShards()); + assertEquals(1, response.getNumReducePhases()); + } + { + SearchResponse response = client().prepareSearch("readonly", "write").get(); + assertEquals(2, response.getTotalShards()); + assertEquals(2, response.getSuccessfulShards()); + assertEquals(1, response.getNumReducePhases()); + } + { + Settings settings = Settings.builder().put("index.blocks.read_only", "true").build(); + AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); + assertTrue(response.isAcknowledged()); + } + try { + { + SearchResponse response = 
client().prepareSearch("readonly").get(); + assertEquals(1, response.getTotalShards()); + assertEquals(1, response.getSuccessfulShards()); + assertEquals(1, response.getNumReducePhases()); + } + { + SearchResponse response = client().prepareSearch("write").get(); + assertEquals(1, response.getTotalShards()); + assertEquals(1, response.getSuccessfulShards()); + assertEquals(1, response.getNumReducePhases()); + } + { + SearchResponse response = client().prepareSearch("readonly", "write").get(); + assertEquals(2, response.getTotalShards()); + assertEquals(2, response.getSuccessfulShards()); + assertEquals(3, response.getNumReducePhases()); /* search got split: presumably 1 reduce per sub-search + 1 merge */ + } + } finally { + Settings settings = Settings.builder().put("index.blocks.read_only", "false").build(); + AcknowledgedResponse response = client().admin().indices().prepareUpdateSettings("readonly").setSettings(settings).get(); + assertTrue(response.isAcknowledged()); + } + } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 369d71f05ffb8..60078486335c9 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -29,6 +29,10 @@ import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup; import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse; import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.GroupShardsIterator; import 
org.elasticsearch.cluster.routing.GroupShardsIteratorTests; @@ -837,4 +841,75 @@ public void 
testShouldMinimizeRoundtrips() throws Exception { assertFalse(TransportSearchAction.shouldMinimizeRoundtrips(searchRequest)); } } + + public void testShouldSplitIndices() { + { + SearchRequest searchRequest = new SearchRequest(); + assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder()); + assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().size(randomIntBetween(1, 100))); + assertTrue(TransportSearchAction.shouldSplitIndices(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.scroll("5s"); + assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.source(new SearchSourceBuilder().size(0)); + assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); + } + { + SearchRequest searchRequest = new SearchRequest(); + searchRequest.searchType(SearchType.DFS_QUERY_THEN_FETCH); + assertFalse(TransportSearchAction.shouldSplitIndices(searchRequest)); + } + } + + public void testSplitIndices() { + int numIndices = randomIntBetween(1, 10); + Index[] indices = new Index[numIndices]; + for (int i = 0; i < numIndices; i++) { + String indexName = randomAlphaOfLengthBetween(5, 10) + i; //the suffix keeps names unique, blocks are keyed by name + indices[i] = new Index(indexName, indexName + "-uuid"); + } + { + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).build(); + List writeIndices = new ArrayList<>(); + List readOnlyIndices = new ArrayList<>(); + TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices); + assertEquals(0, readOnlyIndices.size()); + assertEquals(numIndices, writeIndices.size()); + } + { + List expectedWrite = new ArrayList<>(); + List expectedReadOnly = new ArrayList<>(); + 
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder(); + for (Index index : indices) { + if (randomBoolean()) { + blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_WRITE_BLOCK); + expectedReadOnly.add(index.getName()); + } else if (randomBoolean()) { + blocksBuilder.addIndexBlock(index.getName(), IndexMetaData.INDEX_READ_ONLY_BLOCK); + expectedReadOnly.add(index.getName()); + } else { + expectedWrite.add(index.getName()); + } + } + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).blocks(blocksBuilder).build(); + List writeIndices = new ArrayList<>(); + List readOnlyIndices = new ArrayList<>(); + TransportSearchAction.splitIndices(indices, clusterState, writeIndices, readOnlyIndices); + assertEquals(expectedWrite, writeIndices); + assertEquals(expectedReadOnly, readOnlyIndices); + } + } } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml index 4ba49e53308d2..4a5f713ea74a5 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/indices.freeze/10_basic.yml @@ -77,6 +77,25 @@ - match: {hits.total: 0} +- do: + index: + index: ordinary + id: "1" + body: { "foo": "Hello: 1" } + refresh: wait_for + +- do: + search: + rest_total_hits_as_int: true + index: [test, ordinary] + ignore_throttled: false + body: + query: + match: + foo: hello + +- match: {hits.total: 3} + --- "Test index options": - do: