
Introduce batched query execution and data-node side reduce #121885


Merged: 101 commits, Mar 29, 2025
Commits:
8f4a650
Introduce batched query execution and data-node side reduce
original-brownbear Feb 6, 2025
adcad03
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 6, 2025
1082dd8
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 6, 2025
62f7c4f
fix test
original-brownbear Feb 6, 2025
643c4a2
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 6, 2025
a79c3b3
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 7, 2025
ce1d136
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 7, 2025
df69ec3
shorter
original-brownbear Feb 7, 2025
65f9b36
[CI] Auto commit changes from spotless
elasticsearchmachine Feb 7, 2025
db88f8a
make ccs work, bring back skip
original-brownbear Feb 7, 2025
c659106
Merge remote-tracking branch 'origin/batched-exec-short' into batched…
original-brownbear Feb 7, 2025
72f8845
fixes
original-brownbear Feb 8, 2025
249c078
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 8, 2025
b3a85b1
fixes
original-brownbear Feb 9, 2025
d9bea59
fixes
original-brownbear Feb 9, 2025
07bc5e8
fixes
original-brownbear Feb 9, 2025
b942e4e
bwc correctly
original-brownbear Feb 9, 2025
7064ade
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 10, 2025
8a7edfa
mute
original-brownbear Feb 10, 2025
d1b5249
drier
original-brownbear Feb 10, 2025
8a1b272
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 10, 2025
61bc61c
dry
original-brownbear Feb 10, 2025
616ef31
revert noise
original-brownbear Feb 10, 2025
cddd698
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 10, 2025
f429eef
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 11, 2025
cec0c0f
escpae hatch
original-brownbear Feb 11, 2025
3ce802a
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 11, 2025
7ce6d07
fix more tests
original-brownbear Feb 11, 2025
b8cd5f3
fix another test
original-brownbear Feb 11, 2025
478d394
some more cleanup
original-brownbear Feb 11, 2025
a9a71f3
some more cleanup
original-brownbear Feb 12, 2025
9b66ba1
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 12, 2025
7393d76
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 12, 2025
cd37c5b
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 12, 2025
e990b24
no batching in term error tests
original-brownbear Feb 12, 2025
44cd922
comments and such
original-brownbear Feb 12, 2025
ad7213d
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 12, 2025
b06d960
Update docs/changelog/121885.yaml
original-brownbear Feb 12, 2025
272eafd
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 13, 2025
e4c9fd7
Merge remote-tracking branch 'origin/batched-exec-short' into batched…
original-brownbear Feb 13, 2025
5a21508
cleaner
original-brownbear Feb 13, 2025
dc06496
much smaller request
original-brownbear Feb 13, 2025
5ad9121
avoid pointless fork
original-brownbear Feb 13, 2025
9347f3c
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 13, 2025
8e992ac
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 14, 2025
b02c281
do not send all index names with search request
original-brownbear Feb 14, 2025
cc5d494
no need for shards list after partial
original-brownbear Feb 14, 2025
078ccfb
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 16, 2025
a16508f
fix bug
original-brownbear Feb 16, 2025
b48d64d
save condition
original-brownbear Feb 16, 2025
70a7b94
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 18, 2025
987a5b3
fix compile'
original-brownbear Feb 24, 2025
41b4e4a
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 26, 2025
95e7257
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 27, 2025
e409dae
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 27, 2025
e47fcf3
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Feb 28, 2025
6127e44
CR comments
original-brownbear Mar 4, 2025
bc9c454
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 4, 2025
c3f225d
dry failure handling
original-brownbear Mar 4, 2025
10628b7
savings
original-brownbear Mar 4, 2025
2bf75ec
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 7, 2025
06f9eec
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 12, 2025
f503fa7
CR comments
original-brownbear Mar 12, 2025
c796d16
adjust
original-brownbear Mar 12, 2025
b149807
comment and rename
original-brownbear Mar 12, 2025
7dd906e
rename + docs
original-brownbear Mar 12, 2025
f63f8b2
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 13, 2025
2f5fc4f
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 16, 2025
cfa8f09
fix compile
original-brownbear Mar 16, 2025
e19a01a
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 20, 2025
93ec42c
CR comments
original-brownbear Mar 20, 2025
6bcd31d
fix comment
original-brownbear Mar 20, 2025
883e3b9
morar docs
original-brownbear Mar 20, 2025
b99f6ce
package private
original-brownbear Mar 20, 2025
fcbed71
package private
original-brownbear Mar 20, 2025
2028217
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 20, 2025
f89d2e9
reword todo
original-brownbear Mar 20, 2025
447f9fd
package private
original-brownbear Mar 20, 2025
2ecacf2
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 20, 2025
75b16d6
remove dead code
original-brownbear Mar 20, 2025
35ca606
comment
original-brownbear Mar 20, 2025
a060542
assert
original-brownbear Mar 20, 2025
b4e3fa6
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 20, 2025
6a88cfc
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 25, 2025
7294a60
CR: comments
original-brownbear Mar 25, 2025
4f516f0
cleanup
original-brownbear Mar 25, 2025
196bc9d
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 26, 2025
1f69c72
cb is back
original-brownbear Mar 27, 2025
88efd18
CR: comments
original-brownbear Mar 27, 2025
d927c4b
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 27, 2025
0d41bd9
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 27, 2025
1dcaab0
comment
original-brownbear Mar 27, 2025
4530039
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 27, 2025
f6f02ad
not on serverless by default
original-brownbear Mar 27, 2025
4874dfd
Merge remote-tracking branch 'elastic/main' into batched-exec-short
original-brownbear Mar 28, 2025
c5871f7
add feature flag
original-brownbear Mar 28, 2025
d8997b5
Merge branch 'main' into batched-exec-short
original-brownbear Mar 28, 2025
ffb8d26
Merge branch 'main' into batched-exec-short
original-brownbear Mar 28, 2025
e11828c
Merge branch 'main' into batched-exec-short
original-brownbear Mar 28, 2025
771ca68
Merge branch 'main' into batched-exec-short
original-brownbear Mar 28, 2025
fe8aaa8
Merge branch 'main' into batched-exec-short
original-brownbear Mar 28, 2025
5 changes: 5 additions & 0 deletions docs/changelog/121885.yaml
@@ -0,0 +1,5 @@
pr: 121885
summary: Introduce batched query execution and data-node side reduce
area: Search
type: enhancement
issues: []
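The changelog entry names the core idea: rather than each shard streaming its query-phase result to the coordinating node individually, shards that live on the same data node are queried as a batch and partially reduced on that node, so the coordinator merges one partial result per node instead of one result per shard. A toy sketch of that flow (plain Python, not Elasticsearch code; every name in it is hypothetical):

```python
# Toy illustration of a data-node side reduce: each node combines the
# results of its local shards before anything is sent to the coordinator.
from collections import Counter


def shard_results(shards):
    # Each shard produces term counts, a stand-in for per-shard agg results.
    return [Counter(docs) for docs in shards]


def node_side_reduce(per_shard):
    # Data-node side reduce: fold local shard results into one partial.
    total = Counter()
    for c in per_shard:
        total += c
    return total


def coordinator_reduce(partials):
    # The coordinating node only merges one partial per data node.
    final = Counter()
    for p in partials:
        final += p
    return final


# Two data nodes, each holding two shards of the same index.
node_a = [["abc", "abc", "xyz"], ["abc"]]
node_b = [["xyz"], ["xyz", "abc"]]

partials = [node_side_reduce(shard_results(n)) for n in (node_a, node_b)]
final = coordinator_reduce(partials)

# The coordinator saw 2 partial results instead of 4 per-shard results,
# and the merged counts match what an unbatched reduce would produce.
assert len(partials) == 2
print(final)
```

Wire format, memory accounting, and failure handling aside, this is why batching cuts both the number of transport round-trips and the amount of reduce work the coordinating node has to do on large clusters.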
@@ -14,12 +14,15 @@
import org.elasticsearch.action.search.MultiSearchRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Request;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.ErrorTraceHelper;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xcontent.XContentType;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
@@ -40,6 +43,13 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Before
public void setupMessageListener() {
hasStackTrace = ErrorTraceHelper.setupErrorTraceListener(internalCluster());
// TODO: make this test work with batched query execution by enhancing ErrorTraceHelper.setupErrorTraceListener
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

@After
public void resetSettings() {
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

private void setupIndexWithDocs() {
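As the test setup above shows, the new behavior is gated by a dynamic cluster setting that tests switch off through `SearchService.BATCHED_QUERY_PHASE`. Outside of tests, the equivalent toggle would be a cluster settings update; this is a sketch that assumes the setting key resolves to `search.batched_query_phase`:

```json
PUT _cluster/settings
{
  "persistent": {
    "search.batched_query_phase": false
  }
}
```

Setting the value back to `null` restores the default, mirroring the `putNull` call in the `@After` hook.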
@@ -1,4 +1,7 @@
setup:
- skip:
awaits_fix: "TODO fix this test, the response with batched execution is not deterministic enough for the available matchers"

- do:
indices.create:
index: test_1
@@ -48,7 +51,6 @@ setup:
batched_reduce_size: 2
body: { "size" : 0, "aggs" : { "str_terms" : { "terms" : { "field" : "str" } } } }

- match: { num_reduce_phases: 4 }
- match: { hits.total: 3 }
- length: { aggregations.str_terms.buckets: 2 }
- match: { aggregations.str_terms.buckets.0.key: "abc" }
@@ -562,11 +562,8 @@ public void testSearchQueryThenFetch() throws Exception {
);

clearInterceptedActions();
assertIndicesSubset(
Arrays.asList(searchRequest.indices()),
SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME
);
assertIndicesSubset(Arrays.asList(searchRequest.indices()), true, SearchTransportService.QUERY_ACTION_NAME);
assertIndicesSubset(Arrays.asList(searchRequest.indices()), SearchTransportService.FETCH_ID_ACTION_NAME);
}

public void testSearchDfsQueryThenFetch() throws Exception {
@@ -619,10 +616,6 @@ private static void assertIndicesSubset(List<String> indices, String... actions)
assertIndicesSubset(indices, false, actions);
}

private static void assertIndicesSubsetOptionalRequests(List<String> indices, String... actions) {
assertIndicesSubset(indices, true, actions);
}

private static void assertIndicesSubset(List<String> indices, boolean optional, String... actions) {
// indices returned by each bulk shard request need to be a subset of the original indices
for (String action : actions) {
@@ -41,6 +41,7 @@
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.tasks.RemovedTaskListener;
import org.elasticsearch.tasks.Task;
@@ -352,6 +353,8 @@ public void testTransportBulkTasks() {
}

public void testSearchTaskDescriptions() {
// TODO: enhance this test to also check the tasks created by batched query execution
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
registerTaskManagerListeners(TransportSearchAction.TYPE.name()); // main task
registerTaskManagerListeners(TransportSearchAction.TYPE.name() + "[*]"); // shard task
createIndex("test");
@@ -398,7 +401,7 @@ public void testSearchTaskDescriptions() {
// assert that all task descriptions have non-zero length
assertThat(taskInfo.description().length(), greaterThan(0));
}

updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

public void testSearchTaskHeaderLimit() {
@@ -40,6 +40,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.AbstractAggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationExecutionContext;
@@ -446,6 +447,7 @@ public void testSearchIdle() throws Exception {
}

public void testCircuitBreakerReduceFail() throws Exception {
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
int numShards = randomIntBetween(1, 10);
indexSomeDocs("test", numShards, numShards * 3);

@@ -519,7 +521,9 @@ public void onFailure(Exception exc) {
}
assertBusy(() -> assertThat(requestBreakerUsed(), equalTo(0L)));
} finally {
updateClusterSettings(Settings.builder().putNull("indices.breaker.request.limit"));
updateClusterSettings(
Settings.builder().putNull("indices.breaker.request.limit").putNull(SearchService.BATCHED_QUERY_PHASE.getKey())
);
}
}

@@ -23,6 +23,7 @@
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.search.TransportSearchScrollAction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
@@ -239,6 +240,8 @@ public void testCancelMultiSearch() throws Exception {
}

public void testCancelFailedSearchWhenPartialResultDisallowed() throws Exception {
// TODO: make this test compatible with batched execution, currently the exceptions are slightly different with batched
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
// Have at least two nodes so that we have parallel execution of two request guaranteed even if max concurrent requests per node
// are limited to 1
internalCluster().ensureAtLeastNumDataNodes(2);
@@ -13,12 +13,15 @@
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.aggregations.Aggregator.SubAggCollectionMode;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorFactory.ExecutionMode;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
@@ -50,6 +53,18 @@ public static String randomExecutionHint() {

private static int numRoutingValues;

@Before
public void disableBatchedExecution() {
// TODO: it's practically impossible to get a 100% deterministic test with batched execution unfortunately, adjust this test to
// still do something useful with batched execution (i.e. use somewhat relaxed assertions)
updateClusterSettings(Settings.builder().put(SearchService.BATCHED_QUERY_PHASE.getKey(), false));
}

@After
public void resetSettings() {
updateClusterSettings(Settings.builder().putNull(SearchService.BATCHED_QUERY_PHASE.getKey()));
}

@Override
public void setupSuiteScopeCluster() throws Exception {
assertAcked(indicesAdmin().prepareCreate("idx").setMapping(STRING_FIELD_NAME, "type=keyword").get());
@@ -208,6 +208,7 @@ static TransportVersion def(int id) {
public static final TransportVersion PROJECT_ID_IN_SNAPSHOT = def(9_040_0_00);
public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);

/*
* STOP! READ THIS FIRST! No, really,
@@ -64,33 +64,33 @@
* distributed frequencies
*/
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase {
private static final float DEFAULT_INDEX_BOOST = 1.0f;
protected static final float DEFAULT_INDEX_BOOST = 1.0f;
private final Logger logger;
private final NamedWriteableRegistry namedWriteableRegistry;
private final SearchTransportService searchTransportService;
protected final SearchTransportService searchTransportService;
private final Executor executor;
private final ActionListener<SearchResponse> listener;
private final SearchRequest request;
protected final SearchRequest request;

/**
* Used by subclasses to resolve node ids to DiscoveryNodes.
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
protected final SearchTask task;
protected final SearchPhaseResults<Result> results;
private final long clusterStateVersion;
private final TransportVersion minTransportVersion;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
protected final Map<String, AliasFilter> aliasFilter;
protected final Map<String, Float> concreteIndexBoosts;
private final SetOnce<AtomicArray<ShardSearchFailure>> shardFailures = new SetOnce<>();
private final Object shardFailuresMutex = new Object();
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
private final AtomicInteger successfulOps;
private final SearchTimeProvider timeProvider;
protected final SearchTimeProvider timeProvider;
private final SearchResponse.Clusters clusters;

protected final List<SearchShardIterator> shardsIts;
private final SearchShardIterator[] shardIterators;
protected final SearchShardIterator[] shardIterators;
private final AtomicInteger outstandingShards;
private final int maxConcurrentRequestsPerNode;
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
@@ -230,10 +230,17 @@ protected final void run() {
onPhaseDone();
return;
}
if (shardsIts.isEmpty()) {
return;
}
final Map<SearchShardIterator, Integer> shardIndexMap = Maps.newHashMapWithExpectedSize(shardIterators.length);
for (int i = 0; i < shardIterators.length; i++) {
shardIndexMap.put(shardIterators[i], i);
}
doRun(shardIndexMap);
}

protected void doRun(Map<SearchShardIterator, Integer> shardIndexMap) {
doCheckNoMissingShards(getName(), request, shardsIts);
for (int i = 0; i < shardsIts.size(); i++) {
final SearchShardIterator shardRoutings = shardsIts.get(i);
@@ -249,7 +256,7 @@
}
}

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
protected final void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
if (throttleConcurrentRequests) {
var pendingExecutions = pendingExecutionsPerNode.computeIfAbsent(
shard.getNodeId(),
@@ -289,7 +296,7 @@ public void onFailure(Exception e) {
executePhaseOnShard(shardIt, connection, shardListener);
}

private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
protected final void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(), shardIt.getClusterAlias());
onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
@@ -396,7 +403,7 @@ private ShardSearchFailure[] buildShardFailures() {
return failures;
}

private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
protected final void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
@@ -343,7 +343,7 @@ public void onFailure(Exception e) {
}
}

private record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}
public record SendingTarget(@Nullable String clusterAlias, @Nullable String nodeId) {}

private CanMatchNodeRequest createCanMatchRequest(Map.Entry<SendingTarget, List<SearchShardIterator>> entry) {
final SearchShardIterator first = entry.getValue().get(0);