Skip to content

Commit bcf4ac7

Browse files
authored
Add finalReduce flag to SearchRequest (6.x) (#38180)
Backport of #38104 to 6.x This commit adds support for a separate finalReduce flag to SearchRequest and makes use of it in SearchPhaseController instead of disabling final reduction whenever a cluster alias is provided. Relates to #38104
1 parent 5122ea3 commit bcf4ac7

File tree

5 files changed

+94
-26
lines changed

5 files changed

+94
-26
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -712,19 +712,19 @@ InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> newSearchPhaseResu
712712
final boolean hasAggs = source != null && source.aggregations() != null;
713713
final boolean hasTopDocs = source == null || source.size() != 0;
714714
final boolean trackTotalHits = source == null || source.trackTotalHits();
715-
final boolean finalReduce = request.getLocalClusterAlias() == null;
716715

717716
if (isScrollRequest == false && (hasAggs || hasTopDocs)) {
718717
// no incremental reduce if scroll is used - we only hit a single shard or sometimes more...
719718
if (request.getBatchedReduceSize() < numShards) {
720719
// only use this if there are aggs and if there are more shards than we should reduce at once
721-
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs, finalReduce);
720+
return new QueryPhaseResultConsumer(this, numShards, request.getBatchedReduceSize(), hasTopDocs, hasAggs,
721+
request.isFinalReduce());
722722
}
723723
}
724724
return new InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult>(numShards) {
725725
@Override
726726
ReducedQueryPhase reduce() {
727-
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, finalReduce);
727+
return reducedQueryPhase(results.asList(), isScrollRequest, trackTotalHits, request.isFinalReduce());
728728
}
729729
};
730730
}

server/src/main/java/org/elasticsearch/action/search/SearchRequest.java

+21-5
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
6969

7070
private String localClusterAlias;
7171
private long absoluteStartMillis;
72+
private boolean finalReduce;
7273

7374
private SearchType searchType = SearchType.DEFAULT;
7475

@@ -102,6 +103,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
102103
public SearchRequest() {
103104
this.localClusterAlias = null;
104105
this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
106+
this.finalReduce = true;
105107
}
106108

107109
/**
@@ -123,6 +125,7 @@ public SearchRequest(SearchRequest searchRequest) {
123125
this.types = searchRequest.types;
124126
this.localClusterAlias = searchRequest.localClusterAlias;
125127
this.absoluteStartMillis = searchRequest.absoluteStartMillis;
128+
this.finalReduce = searchRequest.finalReduce;
126129
}
127130

128131
/**
@@ -147,16 +150,18 @@ public SearchRequest(String[] indices, SearchSourceBuilder source) {
147150

148151
/**
149152
* Creates a new search request by providing the alias of the cluster where it will be executed, as well as the current time in
150-
* milliseconds from the epoch time. Used when a {@link SearchRequest} is created and executed as part of a cross-cluster search
151-
* request performing local reduction on each cluster. The coordinating CCS node provides the alias to prefix index names with in
152-
* the returned search results, and the current time to be used on the remote clusters to ensure that the same value is used.
153+
* milliseconds from the epoch time and whether the reduction should be final or not. Used when a {@link SearchRequest} is created
154+
* and executed as part of a cross-cluster search request performing reduction on each remote cluster. The coordinating CCS node
155+
* provides the alias to prefix index names with in the returned search results, the current time to be used on the remote clusters
156+
* to ensure that the same value is used, and determines whether the reduction phase should be final or not.
153157
*/
154-
SearchRequest(String localClusterAlias, long absoluteStartMillis) {
158+
SearchRequest(String localClusterAlias, long absoluteStartMillis, boolean finalReduce) {
155159
this.localClusterAlias = Objects.requireNonNull(localClusterAlias, "cluster alias must not be null");
156160
if (absoluteStartMillis < 0) {
157161
throw new IllegalArgumentException("absoluteStartMillis must not be negative but was [" + absoluteStartMillis + "]");
158162
}
159163
this.absoluteStartMillis = absoluteStartMillis;
164+
this.finalReduce = finalReduce;
160165
}
161166

162167
@Override
@@ -195,10 +200,17 @@ String getLocalClusterAlias() {
195200
return localClusterAlias;
196201
}
197202

203+
/**
204+
* Returns whether the reduction phase that will be performed needs to be final or not.
205+
*/
206+
boolean isFinalReduce() {
207+
return finalReduce;
208+
}
209+
198210
/**
199211
* Returns the current time in milliseconds from the time epoch, to be used for the execution of this search request. Used to
200212
* ensure that the same value, determined by the coordinating node, is used on all nodes involved in the execution of the search
201-
* request. When created through {@link #SearchRequest(String, long)}, this method returns the provided current time, otherwise
213+
* request. When created through {@link #SearchRequest(String, long, boolean)}, this method returns the provided current time, otherwise
202214
* it will return {@link System#currentTimeMillis()}.
203215
*
204216
*/
@@ -518,12 +530,15 @@ public void readFrom(StreamInput in) throws IOException {
518530
localClusterAlias = in.readOptionalString();
519531
if (localClusterAlias != null) {
520532
absoluteStartMillis = in.readVLong();
533+
finalReduce = in.readBoolean();
521534
} else {
522535
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
536+
finalReduce = true;
523537
}
524538
} else {
525539
localClusterAlias = null;
526540
absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS;
541+
finalReduce = true;
527542
}
528543
}
529544

@@ -554,6 +569,7 @@ public void writeTo(StreamOutput out) throws IOException {
554569
out.writeOptionalString(localClusterAlias);
555570
if (localClusterAlias != null) {
556571
out.writeVLong(absoluteStartMillis);
572+
out.writeBoolean(finalReduce);
557573
}
558574
}
559575
}

server/src/test/java/org/elasticsearch/action/search/SearchPhaseControllerTests.java

+11-8
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,13 @@ private static AtomicArray<SearchPhaseResult> generateFetchResults(int nShards,
306306
return fetchResults;
307307
}
308308

309+
private static SearchRequest randomSearchRequest() {
310+
return randomBoolean() ? new SearchRequest() : new SearchRequest("remote", 0, randomBoolean());
311+
}
312+
309313
public void testConsumer() {
310314
int bufferSize = randomIntBetween(2, 3);
311-
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
315+
SearchRequest request = randomSearchRequest();
312316
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
313317
request.setBatchedReduceSize(bufferSize);
314318
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer = searchPhaseController.newSearchPhaseResults(request, 3);
@@ -366,7 +370,7 @@ public void testConsumerConcurrently() throws InterruptedException {
366370
int expectedNumResults = randomIntBetween(1, 100);
367371
int bufferSize = randomIntBetween(2, 200);
368372

369-
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
373+
SearchRequest request = randomSearchRequest();
370374
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")));
371375
request.setBatchedReduceSize(bufferSize);
372376
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -410,7 +414,7 @@ public void testConsumerConcurrently() throws InterruptedException {
410414
public void testConsumerOnlyAggs() {
411415
int expectedNumResults = randomIntBetween(1, 100);
412416
int bufferSize = randomIntBetween(2, 200);
413-
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
417+
SearchRequest request = randomSearchRequest();
414418
request.source(new SearchSourceBuilder().aggregation(AggregationBuilders.avg("foo")).size(0));
415419
request.setBatchedReduceSize(bufferSize);
416420
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -444,7 +448,7 @@ public void testConsumerOnlyAggs() {
444448
public void testConsumerOnlyHits() {
445449
int expectedNumResults = randomIntBetween(1, 100);
446450
int bufferSize = randomIntBetween(2, 200);
447-
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
451+
SearchRequest request = randomSearchRequest();
448452
if (randomBoolean()) {
449453
request.source(new SearchSourceBuilder().size(randomIntBetween(1, 10)));
450454
}
@@ -475,8 +479,7 @@ public void testConsumerOnlyHits() {
475479

476480
private void assertFinalReduction(SearchRequest searchRequest) {
477481
assertThat(reductions.size(), greaterThanOrEqualTo(1));
478-
//the last reduction step was the final one only if no cluster alias was provided with the search request
479-
assertEquals(searchRequest.getLocalClusterAlias() == null, reductions.get(reductions.size() - 1));
482+
assertEquals(searchRequest.isFinalReduce(), reductions.get(reductions.size() - 1));
480483
}
481484

482485
public void testNewSearchPhaseResults() {
@@ -548,7 +551,7 @@ public void testReduceTopNWithFromOffset() {
548551
public void testConsumerSortByField() {
549552
int expectedNumResults = randomIntBetween(1, 100);
550553
int bufferSize = randomIntBetween(2, 200);
551-
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
554+
SearchRequest request = randomSearchRequest();
552555
int size = randomIntBetween(1, 10);
553556
request.setBatchedReduceSize(bufferSize);
554557
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =
@@ -583,7 +586,7 @@ public void testConsumerSortByField() {
583586
public void testConsumerFieldCollapsing() {
584587
int expectedNumResults = randomIntBetween(30, 100);
585588
int bufferSize = randomIntBetween(2, 200);
586-
SearchRequest request = randomBoolean() ? new SearchRequest() : new SearchRequest("remote");
589+
SearchRequest request = randomSearchRequest();
587590
int size = randomIntBetween(5, 10);
588591
request.setBatchedReduceSize(bufferSize);
589592
InitialSearchPhase.ArraySearchPhaseResults<SearchPhaseResult> consumer =

server/src/test/java/org/elasticsearch/action/search/SearchRequestTests.java

+7-4
Original file line numberDiff line numberDiff line change
@@ -52,15 +52,15 @@ protected SearchRequest createSearchRequest() throws IOException {
5252
return super.createSearchRequest();
5353
}
5454
//clusterAlias and absoluteStartMillis do not have public getters/setters hence we randomize them only in this test specifically.
55-
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong());
55+
SearchRequest searchRequest = new SearchRequest(randomAlphaOfLengthBetween(5, 10), randomNonNegativeLong(), randomBoolean());
5656
RandomSearchRequestGenerator.randomSearchRequest(searchRequest, this::createSearchSourceBuilder);
5757
return searchRequest;
5858
}
5959

6060
public void testClusterAliasValidation() {
61-
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0));
62-
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1));
63-
SearchRequest searchRequest = new SearchRequest("", 0);
61+
expectThrows(NullPointerException.class, () -> new SearchRequest(null, 0, randomBoolean()));
62+
expectThrows(IllegalArgumentException.class, () -> new SearchRequest("", -1, randomBoolean()));
63+
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
6464
assertNull(searchRequest.validate());
6565
}
6666

@@ -79,9 +79,11 @@ public void testClusterAliasSerialization() throws IOException {
7979
if (version.before(Version.V_6_7_0)) {
8080
assertNull(deserializedRequest.getLocalClusterAlias());
8181
assertAbsoluteStartMillisIsCurrentTime(deserializedRequest);
82+
assertTrue(deserializedRequest.isFinalReduce());
8283
} else {
8384
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
8485
assertEquals(searchRequest.getOrCreateAbsoluteStartMillis(), deserializedRequest.getOrCreateAbsoluteStartMillis());
86+
assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce());
8587
}
8688
}
8789

@@ -94,6 +96,7 @@ public void testReadFromPre6_7_0() throws IOException {
9496
assertArrayEquals(new String[]{"index"}, searchRequest.indices());
9597
assertNull(searchRequest.getLocalClusterAlias());
9698
assertAbsoluteStartMillisIsCurrentTime(searchRequest);
99+
assertTrue(searchRequest.isFinalReduce());
97100
}
98101
}
99102

server/src/test/java/org/elasticsearch/action/search/TransportSearchActionSingleNodeTests.java

+52-6
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,25 @@
2626
import org.elasticsearch.index.query.RangeQueryBuilder;
2727
import org.elasticsearch.rest.RestStatus;
2828
import org.elasticsearch.search.SearchHit;
29+
import org.elasticsearch.search.aggregations.Aggregations;
30+
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
31+
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
32+
import org.elasticsearch.search.aggregations.support.ValueType;
2933
import org.elasticsearch.search.builder.SearchSourceBuilder;
3034
import org.elasticsearch.test.ESSingleNodeTestCase;
3135

3236
public class TransportSearchActionSingleNodeTests extends ESSingleNodeTestCase {
3337

3438
public void testLocalClusterAlias() {
35-
long nowInMillis = System.currentTimeMillis();
39+
long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
3640
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
3741
indexRequest.source("field", "value");
3842
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
3943
IndexResponse indexResponse = client().index(indexRequest).actionGet();
4044
assertEquals(RestStatus.CREATED, indexResponse.status());
4145

4246
{
43-
SearchRequest searchRequest = new SearchRequest("local", nowInMillis);
47+
SearchRequest searchRequest = new SearchRequest("local", nowInMillis, randomBoolean());
4448
SearchResponse searchResponse = client().search(searchRequest).actionGet();
4549
assertEquals(1, searchResponse.getHits().getTotalHits());
4650
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -51,7 +55,7 @@ public void testLocalClusterAlias() {
5155
assertEquals("1", hit.getId());
5256
}
5357
{
54-
SearchRequest searchRequest = new SearchRequest("", nowInMillis);
58+
SearchRequest searchRequest = new SearchRequest("", nowInMillis, randomBoolean());
5559
SearchResponse searchResponse = client().search(searchRequest).actionGet();
5660
assertEquals(1, searchResponse.getHits().getTotalHits());
5761
SearchHit[] hits = searchResponse.getHits().getHits();
@@ -90,19 +94,19 @@ public void testAbsoluteStartMillis() {
9094
assertEquals(0, searchResponse.getTotalShards());
9195
}
9296
{
93-
SearchRequest searchRequest = new SearchRequest("", 0);
97+
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
9498
SearchResponse searchResponse = client().search(searchRequest).actionGet();
9599
assertEquals(2, searchResponse.getHits().getTotalHits());
96100
}
97101
{
98-
SearchRequest searchRequest = new SearchRequest("", 0);
102+
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
99103
searchRequest.indices("<test-{now/d}>");
100104
SearchResponse searchResponse = client().search(searchRequest).actionGet();
101105
assertEquals(1, searchResponse.getHits().getTotalHits());
102106
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
103107
}
104108
{
105-
SearchRequest searchRequest = new SearchRequest("", 0);
109+
SearchRequest searchRequest = new SearchRequest("", 0, randomBoolean());
106110
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
107111
RangeQueryBuilder rangeQuery = new RangeQueryBuilder("date");
108112
rangeQuery.gte("1970-01-01");
@@ -114,4 +118,46 @@ public void testAbsoluteStartMillis() {
114118
assertEquals("test-1970.01.01", searchResponse.getHits().getHits()[0].getIndex());
115119
}
116120
}
121+
122+
public void testFinalReduce() {
123+
long nowInMillis = randomLongBetween(0, Long.MAX_VALUE);
124+
{
125+
IndexRequest indexRequest = new IndexRequest("test", "type", "1");
126+
indexRequest.source("price", 10);
127+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
128+
assertEquals(RestStatus.CREATED, indexResponse.status());
129+
}
130+
{
131+
IndexRequest indexRequest = new IndexRequest("test", "type", "2");
132+
indexRequest.source("price", 100);
133+
IndexResponse indexResponse = client().index(indexRequest).actionGet();
134+
assertEquals(RestStatus.CREATED, indexResponse.status());
135+
}
136+
client().admin().indices().prepareRefresh("test").get();
137+
138+
SearchSourceBuilder source = new SearchSourceBuilder();
139+
source.size(0);
140+
TermsAggregationBuilder terms = new TermsAggregationBuilder("terms", ValueType.NUMERIC);
141+
terms.field("price");
142+
terms.size(1);
143+
source.aggregation(terms);
144+
145+
{
146+
SearchRequest searchRequest = randomBoolean() ? new SearchRequest().source(source)
147+
: new SearchRequest("remote", nowInMillis, true).source(source);
148+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
149+
assertEquals(2, searchResponse.getHits().getTotalHits());
150+
Aggregations aggregations = searchResponse.getAggregations();
151+
LongTerms longTerms = aggregations.get("terms");
152+
assertEquals(1, longTerms.getBuckets().size());
153+
}
154+
{
155+
SearchRequest searchRequest = new SearchRequest("remote", nowInMillis, false).source(source);
156+
SearchResponse searchResponse = client().search(searchRequest).actionGet();
157+
assertEquals(2, searchResponse.getHits().getTotalHits());
158+
Aggregations aggregations = searchResponse.getAggregations();
159+
LongTerms longTerms = aggregations.get("terms");
160+
assertEquals(2, longTerms.getBuckets().size());
161+
}
162+
}
117163
}

0 commit comments

Comments
 (0)