Skip to content

Commit 01f4153

Browse files
committed
Add finalReduce flag to SearchRequest
WIth elastic#37000 we made sure that fnial reduction is automatically disabled whenever a localClusterAlias is provided with a SearchRequest. While working on elastic#37838, we found a scenario where we do need to set a localClusterAlias yet we would like to perform a final reduction in the remote cluster: when searching on a single remote cluster. This commit adds support for a separate finalReduce flag to SearchRequest and makes use of it in TransportSearchAction in case we are searching against a single remote cluster. This also makes sure that num_reduce_phases is correct when searching against a single remote cluster: it makes little sense to return `num_reduce_phases` set to `2`, which looks especially weird in case the search was performed against a single remote shard. We should perform one reduction phase only in this case and `num_reduce_phases` should reflect that.
1 parent b7de8e1 commit 01f4153

File tree

13 files changed

+230
-78
lines changed

13 files changed

+230
-78
lines changed

docs/reference/modules/cross-cluster-search.asciidoc

-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ GET /cluster_one:twitter/_search
6565
{
6666
"took": 150,
6767
"timed_out": false,
68-
"num_reduce_phases": 2,
6968
"_shards": {
7069
"total": 1,
7170
"successful": 1,

qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/10_basic.yml

+9
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
terms:
3737
field: f1.keyword
3838

39+
- match: { num_reduce_phases: 3 }
3940
- match: {_clusters.total: 2}
4041
- match: {_clusters.successful: 2}
4142
- match: {_clusters.skipped: 0}
@@ -63,6 +64,7 @@
6364
terms:
6465
field: f1.keyword
6566

67+
- match: { num_reduce_phases: 3 }
6668
- match: {_clusters.total: 2}
6769
- match: {_clusters.successful: 2}
6870
- match: {_clusters.skipped: 0}
@@ -83,6 +85,7 @@
8385
terms:
8486
field: f1.keyword
8587

88+
- is_false: num_reduce_phases
8689
- match: {_clusters.total: 1}
8790
- match: {_clusters.successful: 1}
8891
- match: {_clusters.skipped: 0}
@@ -103,6 +106,7 @@
103106
terms:
104107
field: f1.keyword
105108

109+
- is_false: num_reduce_phases
106110
- is_false: _clusters
107111
- match: { _shards.total: 2 }
108112
- match: { hits.total: 5}
@@ -133,6 +137,7 @@
133137
rest_total_hits_as_int: true
134138
index: test_remote_cluster:test_index
135139

140+
- is_false: num_reduce_phases
136141
- match: {_clusters.total: 1}
137142
- match: {_clusters.successful: 1}
138143
- match: {_clusters.skipped: 0}
@@ -162,6 +167,7 @@
162167
rest_total_hits_as_int: true
163168
index: "*:test_index"
164169

170+
- match: { num_reduce_phases: 3 }
165171
- match: {_clusters.total: 2}
166172
- match: {_clusters.successful: 2}
167173
- match: {_clusters.skipped: 0}
@@ -176,6 +182,7 @@
176182
rest_total_hits_as_int: true
177183
index: my_remote_cluster:aliased_test_index
178184

185+
- is_false: num_reduce_phases
179186
- match: {_clusters.total: 1}
180187
- match: {_clusters.successful: 1}
181188
- match: {_clusters.skipped: 0}
@@ -192,6 +199,7 @@
192199
rest_total_hits_as_int: true
193200
index: my_remote_cluster:aliased_test_index,my_remote_cluster:field_caps_index_1
194201

202+
- is_false: num_reduce_phases
195203
- match: {_clusters.total: 1}
196204
- match: {_clusters.successful: 1}
197205
- match: {_clusters.skipped: 0}
@@ -208,6 +216,7 @@
208216
rest_total_hits_as_int: true
209217
index: "my_remote_cluster:single_doc_index"
210218

219+
- is_false: num_reduce_phases
211220
- match: {_clusters.total: 1}
212221
- match: {_clusters.successful: 1}
213222
- match: {_clusters.skipped: 0}

qa/multi-cluster-search/src/test/resources/rest-api-spec/test/multi_cluster/40_scroll.yml

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
query:
1313
match_all: {}
1414

15+
- is_false: num_reduce_phases
1516
- match: {_clusters.total: 1}
1617
- match: {_clusters.successful: 1}
1718
- match: {_clusters.skipped: 0}
@@ -28,6 +29,7 @@
2829
rest_total_hits_as_int: true
2930
body: { "scroll_id": "$scroll_id", "scroll": "1m"}
3031

32+
- is_false: num_reduce_phases
3133
- is_false: _clusters
3234
- match: {hits.total: 6 }
3335
- length: {hits.hits: 2 }

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -714,20 +714,18 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
714714
final boolean hasAggs = source != null && source.aggregations() != null;
715715
final boolean hasTopDocs = source == null || source.size() != 0;
716716
final int trackTotalHitsUpTo = resolveTrackTotalHits(request);
717-
final boolean finalReduce = request.getLocalClusterAlias() == null;
718-
719717
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
720718
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
721719
if (request.getBatchedReduceSize() < numShards) {
722720
// only use this if there are aggs and if there are more shards than we should reduce at once
723721
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
724-
trackTotalHitsUpTo, finalReduce);
722+
trackTotalHitsUpTo, request.isFinalReduce());
725723
}
726724
}
727725
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
728726
@Override
729727
ReducedQueryPhase reduce() {
730-
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, finalReduce);
728+
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHitsUpTo, request.isFinalReduce());
731729
}
732730
};
733731
}

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

+38-12
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
6767

6868
private final String localClusterAlias;
6969
private final long absoluteStartMillis;
70+
private final boolean finalReduce;
7071

7172
private SearchType searchType = SearchType.DEFAULT;
7273

@@ -102,13 +103,15 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
102103
public SearchRequest() {
103104
this.localClusterAlias = null;
104105
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
106+
this.finalReduce = true;
105107
}
106108

107109
/**
108110
* Constructs a new search request from the provided search request
109111
*/
110112
public SearchRequest(SearchRequest searchRequest) {
111-
this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias, searchRequest.absoluteStartMillis);
113+
this(searchRequest, searchRequest.indices, searchRequest.localClusterAlias,
114+
searchRequest.absoluteStartMillis, searchRequest.finalReduce);
112115
}
113116

114117
/**
@@ -132,25 +135,30 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
132135
}
133136

134137
/**
135-
* Creates a new search request by providing the search request to copy all fields from, the indices to search against,
136-
* the alias of the cluster where it will be executed, as well as the start time in milliseconds from the epoch time.
137-
* Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request performing local reduction
138-
* on each cluster. The coordinating CCS node provides the original search request, the indices to search against as well as the
139-
* alias to prefix index names with in the returned search results, and the absolute start time to be used on the remote clusters
140-
* to ensure that the same value is used.
138+
* Creates a new search request by providing the search request to copy all fields from, the indices to search against, the alias of
139+
* the cluster where it will be executed, as well as the start time in milliseconds from the epoch time and whether the reduction
140+
* should be final or not. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search request
141+
* performing reduction on each cluster in order to minimize network round-trips between the coordinating node and the remote clusters.
142+
*
143+
* @param originalSearchRequest the original search request
144+
* @param indices the indices to search against
145+
* @param localClusterAlias the alias to prefix index names with in the returned search results
146+
* @param absoluteStartMillis the absolute start time to be used on the remote clusters to ensure that the same value is used
147+
* @param finalReduce whether the reduction should be final or not
141148
*/
142149
static SearchRequest withLocalReduction(SearchRequest originalSearchRequest, String[] indices,
143-
String localClusterAlias, long absoluteStartMillis) {
150+
String localClusterAlias, long absoluteStartMillis, boolean finalReduce) {
144151
Objects.requireNonNull(originalSearchRequest, "search request must not be null");
145152
validateIndices(indices);
146153
Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
147154
if (absoluteStartMillis < 0) {
148155
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
149156
}
150-
return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis);
157+
return new SearchRequest(originalSearchRequest, indices, localClusterAlias, absoluteStartMillis, finalReduce);
151158
}
152159

153-
private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis) {
160+
private SearchRequest(SearchRequest searchRequest, String[] indices, String localClusterAlias, long absoluteStartMillis,
161+
boolean finalReduce) {
154162
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults;
155163
this.batchedReduceSize = searchRequest.batchedReduceSize;
156164
this.ccsMinimizeRoundtrips = searchRequest.ccsMinimizeRoundtrips;
@@ -167,6 +175,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca
167175
this.types = searchRequest.types;
168176
this.localClusterAlias = localClusterAlias;
169177
this.absoluteStartMillis = absoluteStartMillis;
178+
this.finalReduce = finalReduce;
170179
}
171180

172181
/**
@@ -203,6 +212,12 @@ public SearchRequest(StreamInput in) throws IOException {
203212
localClusterAlias = null;
204213
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
205214
}
215+
//TODO move to the 6_7_0 branch once backported to 6.x
216+
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
217+
finalReduce = in.readBoolean();
218+
} else {
219+
finalReduce = true;
220+
}
206221
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
207222
ccsMinimizeRoundtrips = in.readBoolean();
208223
}
@@ -232,6 +247,10 @@ public void writeTo(StreamOutput out) throws IOException {
232247
out.writeVLong(absoluteStartMillis);
233248
}
234249
}
250+
//TODO move to the 6_7_0 branch once backported to 6.x
251+
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
252+
out.writeBoolean(finalReduce);
253+
}
235254
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
236255
out.writeBoolean(ccsMinimizeRoundtrips);
237256
}
@@ -277,11 +296,18 @@ String getLocalClusterAlias() {
277296
return localClusterAlias;
278297
}
279298

299+
/**
300+
* Returns whether the reduction phase that will be performed needs to be final or not.
301+
*/
302+
boolean isFinalReduce() {
303+
return finalReduce;
304+
}
305+
280306
/**
281307
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
282308
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
283-
* request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long)}, this method returns the provided
284-
* current time, otherwise it will return {@link System#currentTimeMillis()}.
309+
* request. When created through {@link #withLocalReduction(SearchRequest, String[], String, long, boolean)}, this method returns
310+
* the provided current time, otherwise it will return {@link System#currentTimeMillis()}.
285311
*
286312
*/
287313
long getOrCreateAbsoluteStartMillis() {

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

+12
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.elasticsearch.action.search;
2121

22+
import org.apache.lucene.search.TotalHits;
2223
import org.elasticsearch.Version;
2324
import org.elasticsearch.action.ActionResponse;
2425
import org.elasticsearch.common.Nullable;
@@ -35,8 +36,10 @@
3536
import org.elasticsearch.common.xcontent.XContentParser.Token;
3637
import org.elasticsearch.rest.RestStatus;
3738
import org.elasticsearch.rest.action.RestActions;
39+
import org.elasticsearch.search.SearchHit;
3840
import org.elasticsearch.search.SearchHits;
3941
import org.elasticsearch.search.aggregations.Aggregations;
42+
import org.elasticsearch.search.aggregations.InternalAggregations;
4043
import org.elasticsearch.search.internal.InternalSearchResponse;
4144
import org.elasticsearch.search.profile.ProfileShardResult;
4245
import org.elasticsearch.search.profile.SearchProfileShardResults;
@@ -47,6 +50,7 @@
4750
import java.util.List;
4851
import java.util.Map;
4952
import java.util.Objects;
53+
import java.util.function.Supplier;
5054

5155
import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure;
5256
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
@@ -497,4 +501,12 @@ public String toString() {
497501
return "Clusters{total=" + total + ", successful=" + successful + ", skipped=" + skipped + '}';
498502
}
499503
}
504+
505+
static SearchResponse empty(Supplier<Long> tookInMillisSupplier, Clusters clusters) {
506+
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
507+
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
508+
InternalAggregations.EMPTY, null, null, false, null, 0);
509+
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, tookInMillisSupplier.get(),
510+
ShardSearchFailure.EMPTY_ARRAY, clusters);
511+
}
500512
}

server/src/main/java/org/elasticsearch/action/search/SearchResponseMerger.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,7 @@ SearchResponse getMergedResponse(Clusters clusters) {
115115
//if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
116116
//we end up calling merge without anything to merge, we just return an empty search response
117117
if (searchResponses.size() == 0) {
118-
SearchHits searchHits = new SearchHits(new SearchHit[0], new TotalHits(0L, TotalHits.Relation.EQUAL_TO), Float.NaN);
119-
InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
120-
InternalAggregations.EMPTY, null, null, false, null, 0);
121-
return new SearchResponse(internalSearchResponse, null, 0, 0, 0, searchTimeProvider.buildTookInMillis(),
122-
ShardSearchFailure.EMPTY_ARRAY, clusters);
118+
return SearchResponse.empty(searchTimeProvider::buildTookInMillis, clusters);
123119
}
124120
int totalShards = 0;
125121
int skippedShards = 0;

0 commit comments

Comments
 (0)