Skip to content

Commit e9c179a

Browse files
Introduce batched query execution and data-node side reduce (#121885) (#126563)
* Introduce batched query execution and data-node side reduce (#121885) This change moves the query phase a single roundtrip per node just like can_match or field_caps work already. A a result of executing multiple shard queries from a single request we can also partially reduce each node's query results on the data node side before responding to the coordinating node. As a result this change significantly reduces the impact of network latencies on the end-to-end query performance, reduces the amount of work done (memory and cpu) on the coordinating node and the network traffic by factors of up to the number of shards per data node! Benchmarking shows up to orders of magnitude improvements in heap and network traffic dimensions in querying across a larger number of shards. * Filter out empty top docs results before merging (#126385) `Lucene.EMPTY_TOP_DOCS` to identify empty to docs results. These were previously null results, but did not need to be send over transport as incremental reduction was performed only on the data node. Now it can happen that the coord node received a merge result with empty top docs, which has nothing interesting for merging, but that can lead to an exception because the type of the empty array does not match the type of other shards results, for instance if the query was sorted by field. To resolve this, we filter out empty top docs results before merging. Closes #126118 --------- Co-authored-by: Luca Cavanna <[email protected]>
1 parent 240d4dd commit e9c179a

File tree

26 files changed

+1170
-68
lines changed

26 files changed

+1170
-68
lines changed

docs/changelog/121885.yaml

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 121885
2+
summary: Introduce batched query execution and data-node side reduce
3+
area: Search
4+
type: enhancement
5+
issues: []

docs/changelog/126385.yaml

+6
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 126385
2+
summary: Filter out empty top docs results before merging
3+
area: Search
4+
type: bug
5+
issues:
6+
- 126118

qa/smoke-test-http/src/internalClusterTest/java/org/elasticsearch/http/SearchErrorTraceIT.java

+9
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.action.search.MultiSearchRequest;
1717
import org.elasticsearch.action.search.SearchRequest;
1818
import org.elasticsearch.client.Request;
19+
import org.elasticsearch.common.settings.Settings;
1920
import org.elasticsearch.common.util.CollectionUtils;
2021
import org.elasticsearch.plugins.Plugin;
2122
import org.elasticsearch.search.ErrorTraceHelper;
@@ -24,6 +25,7 @@
2425
import org.elasticsearch.test.MockLog;
2526
import org.elasticsearch.test.transport.MockTransportService;
2627
import org.elasticsearch.xcontent.XContentType;
28+
import org.junit.After;
2729
import org.junit.Before;
2830
import org.junit.BeforeClass;
2931

@@ -50,6 +52,13 @@ public static void setDebugLogLevel() {
5052
@Before
5153
public void setupMessageListener() {
5254
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
55+
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
56+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
57+
}
58+
59+
@After
60+
public void resetSettings() {
61+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
5362
}
5463

5564
private void setupIndexWithDocs() {

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/search/120_batch_reduce_size.yml

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
setup:
2+
- skip:
3+
awaits_fix: "TODO fix this test, the response with batched execution is not deterministic enough for the available matchers"
4+
25
- do:
36
indices.create:
47
index: test_1
@@ -48,7 +51,6 @@ setup:
4851
batched_reduce_size: 2
4952
body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }
5053

51-
- match: { num_reduce_phases: 4 }
5254
- match: { hits.total: 3 }
5355
- length: { aggregations.str_terms.buckets: 2 }
5456
- match: { aggregations.str_terms.buckets.0.key: "abc" }

server/src/internalClusterTest/java/org/elasticsearch/action/IndicesRequestIT.java

+2-9
Original file line numberDiff line numberDiff line change
@@ -574,11 +574,8 @@ public void testSearchQueryThenFetch() throws Exception {
574574
);
575575

576576
clearInterceptedActions();
577-
assertIndicesSubset(
578-
Arrays.asList(searchRequest.indices()),
579-
SearchTransportService.QUERY_ACTION_NAME,
580-
SearchTransportService.FETCH_ID_ACTION_NAME
581-
);
577+
assertIndicesSubset(Arrays.asList(searchRequest.indices()), true, SearchTransportService.QUERY_ACTION_NAME);
578+
assertIndicesSubset(Arrays.asList(searchRequest.indices()), SearchTransportService.FETCH_ID_ACTION_NAME);
582579
}
583580

584581
public void testSearchDfsQueryThenFetch() throws Exception {
@@ -631,10 +628,6 @@ private static void assertIndicesSubset(List<String> indices, String... actions)
631628
assertIndicesSubset(indices, false, actions);
632629
}
633630

634-
private static void assertIndicesSubsetOptionalRequests(List<String> indices, String... actions) {
635-
assertIndicesSubset(indices, true, actions);
636-
}
637-
638631
private static void assertIndicesSubset(List<String> indices, boolean optional, String... actions) {
639632
// indices returned by each bulk shard request need to be a subset of the original indices
640633
for (String action : actions) {

server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/node/tasks/TasksIT.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import org.elasticsearch.index.query.QueryBuilders;
4242
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
4343
import org.elasticsearch.plugins.Plugin;
44+
import org.elasticsearch.search.SearchService;
4445
import org.elasticsearch.search.builder.SearchSourceBuilder;
4546
import org.elasticsearch.tasks.RemovedTaskListener;
4647
import org.elasticsearch.tasks.Task;
@@ -352,6 +353,8 @@ public void testTransportBulkTasks() {
352353
}
353354

354355
public void testSearchTaskDescriptions() {
356+
// TODO: enhance this test to also check the tasks created by batched query execution
357+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
355358
registerTaskManagerListeners(TransportSearchAction.TYPE.name()); // main task
356359
registerTaskManagerListeners(TransportSearchAction.TYPE.name() + "[*]"); // shard task
357360
createIndex("test");
@@ -398,7 +401,7 @@ public void testSearchTaskDescriptions() {
398401
// assert that all task descriptions have non-zero length
399402
assertThat(taskInfo.description().length(), greaterThan(0));
400403
}
401-
404+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
402405
}
403406

404407
public void testSearchTaskHeaderLimit() {

server/src/internalClusterTest/java/org/elasticsearch/action/search/TransportSearchIT.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.elasticsearch.rest.RestStatus;
4141
import org.elasticsearch.search.DocValueFormat;
4242
import org.elasticsearch.search.SearchHit;
43+
import org.elasticsearch.search.SearchService;
4344
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
4445
import org.elasticsearch.search.aggregations.AggregationBuilder;
4546
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -445,6 +446,7 @@ public void testSearchIdle() throws Exception {
445446
}
446447

447448
public void testCircuitBreakerReduceFail() throws Exception {
449+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
448450
int numShards = randomIntBetween(1, 10);
449451
indexSomeDocs("test", numShards, numShards * 3);
450452

@@ -518,7 +520,9 @@ public void onFailure(Exception exc) {
518520
}
519521
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
520522
} finally {
521-
updateClusterSettings(Settings.builder().putNull("indices.breaker.request.limit"));
523+
updateClusterSettings(
524+
Settings.builder().putNull("indices.breaker.request.limit").putNull(SearchService.BATCHED_QUERY_PHASE.getKey())
525+
);
522526
}
523527
}
524528

server/src/internalClusterTest/java/org/elasticsearch/search/SearchCancellationIT.java

+3
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.search.TransportSearchAction;
2424
import org.elasticsearch.action.search.TransportSearchScrollAction;
2525
import org.elasticsearch.common.Strings;
26+
import org.elasticsearch.common.settings.Settings;
2627
import org.elasticsearch.core.TimeValue;
2728
import org.elasticsearch.script.Script;
2829
import org.elasticsearch.script.ScriptType;
@@ -239,6 +240,8 @@ public void testCancelMultiSearch() throws Exception {
239240
}
240241

241242
public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
243+
// TODO: make this test compatible with batched execution, currently the exceptions are slightly different with batched
244+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
242245
// Have at least two nodes so that we have parallel execution of two request guaranteed even if max concurrent requests per node
243246
// are limited to 1
244247
internalCluster().ensureAtLeastNumDataNodes(2);

server/src/internalClusterTest/java/org/elasticsearch/search/aggregations/bucket/TermsDocCountErrorIT.java

+15
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@
1313
import org.elasticsearch.action.search.SearchResponse;
1414
import org.elasticsearch.cluster.metadata.IndexMetadata;
1515
import org.elasticsearch.common.settings.Settings;
16+
import org.elasticsearch.search.SearchService;
1617
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
1718
import org.elasticsearch.search.aggregations.BucketOrder;
1819
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
1920
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
2021
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
2122
import org.elasticsearch.test.ESIntegTestCase;
23+
import org.junit.After;
24+
import org.junit.Before;
2225

2326
import java.io.IOException;
2427
import java.util.ArrayList;
@@ -50,6 +53,18 @@ public static String randomExecutionHint() {
5053

5154
private static int numRoutingValues;
5255

56+
@Before
57+
public void disableBatchedExecution() {
58+
// TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to
59+
// still do something useful with batched execution (i.e. use somewhat relaxed assertions)
60+
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
61+
}
62+
63+
@After
64+
public void resetSettings() {
65+
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
66+
}
67+
5368
@Override
5469
public void setupSuiteScopeCluster() throws Exception {
5570
assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get());

server/src/main/java/org/elasticsearch/TransportVersions.java

+1
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ static TransportVersion def(int id) {
203203
public static final TransportVersion REMOTE_EXCEPTION_8_19 = def(8_841_0_16);
204204
public static final TransportVersion AMAZON_BEDROCK_TASK_SETTINGS_8_19 = def(8_841_0_17);
205205
public static final TransportVersion SEMANTIC_TEXT_CHUNKING_CONFIG_8_19 = def(8_841_0_18);
206+
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION_BACKPORT_8_X = def(8_841_0_19);
206207

207208
/*
208209
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -65,33 +65,33 @@
6565
* distributed frequencies
6666
*/
6767
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
68-
private static final float DEFAULT_INDEX_BOOST = 1.0f;
68+
protected static final float DEFAULT_INDEX_BOOST = 1.0f;
6969
private final Logger logger;
7070
private final NamedWriteableRegistry namedWriteableRegistry;
71-
private final SearchTransportService searchTransportService;
71+
protected final SearchTransportService searchTransportService;
7272
private final Executor executor;
7373
private final ActionListener<SearchResponse> listener;
74-
private final SearchRequest request;
74+
protected final SearchRequest request;
7575

7676
/**
7777
* Used by subclasses to resolve node ids to DiscoveryNodes.
7878
**/
7979
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
80-
private final SearchTask task;
80+
protected final SearchTask task;
8181
protected final SearchPhaseResults<Result> results;
8282
private final long clusterStateVersion;
8383
private final TransportVersion minTransportVersion;
84-
private final Map<String, AliasFilter> aliasFilter;
85-
private final Map<String, Float> concreteIndexBoosts;
84+
protected final Map<String, AliasFilter> aliasFilter;
85+
protected final Map<String, Float> concreteIndexBoosts;
8686
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
8787
private final Object shardFailuresMutex = new Object();
8888
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
8989
private final AtomicInteger successfulOps;
90-
private final SearchTimeProvider timeProvider;
90+
protected final SearchTimeProvider timeProvider;
9191
private final SearchResponse.Clusters clusters;
9292

9393
protected final List<SearchShardIterator> shardsIts;
94-
private final SearchShardIterator[] shardIterators;
94+
protected final SearchShardIterator[] shardIterators;
9595
private final AtomicInteger outstandingShards;
9696
private final int maxConcurrentRequestsPerNode;
9797
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
@@ -231,10 +231,17 @@ protected final void run() {
231231
onPhaseDone();
232232
return;
233233
}
234+
if (shardsIts.isEmpty()) {
235+
return;
236+
}
234237
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
235238
for (int i = 0; i < shardIterators.length; i++) {
236239
shardIndexMap.put(shardIterators[i], i);
237240
}
241+
doRun(shardIndexMap);
242+
}
243+
244+
protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
238245
doCheckNoMissingShards(getName(), request, shardsIts);
239246
Version version = request.minCompatibleShardNode();
240247
if (version != null && Version.CURRENT.minimumCompatibilityVersion().equals(version) == false) {
@@ -275,7 +282,7 @@ private boolean checkMinimumVersion(List<SearchShardIterator> shardsIts) {
275282
return true;
276283
}
277284

278-
private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
285+
protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
279286
if (throttleConcurrentRequests) {
280287
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
281288
shard.getNodeId(),
@@ -315,7 +322,7 @@ public void onFailure(Exception e) {
315322
executePhaseOnShard(shardIt, connection, shardListener);
316323
}
317324

318-
private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
325+
protected final void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
319326
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias());
320327
onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
321328
}
@@ -422,7 +429,7 @@ private ShardSearchFailure[] buildShardFailures() {
422429
return failures;
423430
}
424431

425-
private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
432+
protected final void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
426433
// we always add the shard failure for a specific shard instance
427434
// we do make sure to clean it on a successful response from a shard
428435
onShardFailure(shardIndex, shard, e);

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ public void onFailure(Exception e) {
344344
}
345345
}
346346

347-
private record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}
347+
public record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}
348348

349349
private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<SearchShardIterator>> entry) {
350350
final SearchShardIterator first = entry.getValue().get(0);

0 commit comments

Comments
 (0)