Skip to content

Commit 88ed45c

Browse files
authored
Node level can match action (elastic#78765) (elastic#79344)
Changes can-match from a shard-level to a node-level action, which helps avoid an explosion of shard-level can-match subrequests in clusters with many shards, which can cause stability issues. Also introduces a new search_coordination thread pool to handle the sending and handling of node-level can-match requests.
1 parent bf3fcbb commit 88ed45c

File tree

20 files changed

+1172
-432
lines changed

20 files changed

+1172
-432
lines changed

docs/reference/modules/threadpool.asciidoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ There are several thread pools, but the important ones include:
2323
Thread pool type is `fixed_auto_queue_size` with a size of `1`, and initial
2424
queue_size of `100`.
2525

26+
`search_coordination`::
27+
For lightweight search-related coordination operations. Thread pool type is
28+
`fixed` with a size of `min(5, (`<<node.processors,
29+
`# of allocated processors`>>`) / 2)`, and queue_size of `1000`.
30+
2631
`get`::
2732
For get operations. Thread pool type is `fixed`
2833
with a size of <<node.processors, `# of allocated processors`>>,

qa/ccs-rolling-upgrade-remote-cluster/src/test/java/org/elasticsearch/upgrades/SearchStatesIT.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ static int indexDocs(RestHighLevelClient client, String index, int numDocs) thro
8989
return numDocs;
9090
}
9191

92-
void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs) {
92+
void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int remoteNumDocs, Integer preFilterShardSize) {
9393
try (RestHighLevelClient localClient = newLocalClient(LOGGER)) {
9494
Request request = new Request("POST", "/_search");
9595
final int expectedDocs;
@@ -103,6 +103,12 @@ void verifySearch(String localIndex, int localNumDocs, String remoteIndex, int r
103103
if (UPGRADE_FROM_VERSION.onOrAfter(Version.V_7_0_0)) {
104104
request.addParameter("ccs_minimize_roundtrips", Boolean.toString(randomBoolean()));
105105
}
106+
if (preFilterShardSize == null && randomBoolean()) {
107+
preFilterShardSize = randomIntBetween(1, 100);
108+
}
109+
if (preFilterShardSize != null) {
110+
request.addParameter("pre_filter_shard_size", Integer.toString(preFilterShardSize));
111+
}
106112
int size = between(1, 100);
107113
request.setJsonEntity("{\"sort\": \"f\", \"size\": " + size + "}");
108114
Response response = localClient.getLowLevelClient().performRequest(request);
@@ -142,7 +148,32 @@ public void testBWCSearchStates() throws Exception {
142148
configureRemoteClusters(remoteNodes, CLUSTER_ALIAS, UPGRADE_FROM_VERSION, LOGGER);
143149
int iterations = between(1, 20);
144150
for (int i = 0; i < iterations; i++) {
145-
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs);
151+
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, null);
152+
}
153+
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
154+
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);
155+
}
156+
}
157+
158+
public void testCanMatch() throws Exception {
159+
String localIndex = "test_can_match_local_index";
160+
String remoteIndex = "test_can_match_remote_index";
161+
try (RestHighLevelClient localClient = newLocalClient(LOGGER);
162+
RestHighLevelClient remoteClient = newRemoteClient()) {
163+
localClient.indices().create(new CreateIndexRequest(localIndex)
164+
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
165+
RequestOptions.DEFAULT);
166+
int localNumDocs = indexDocs(localClient, localIndex, between(10, 100));
167+
168+
remoteClient.indices().create(new CreateIndexRequest(remoteIndex)
169+
.settings(Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, between(5, 20))),
170+
RequestOptions.DEFAULT);
171+
int remoteNumDocs = indexDocs(remoteClient, remoteIndex, between(10, 100));
172+
173+
configureRemoteClusters(getNodes(remoteClient.getLowLevelClient()), CLUSTER_ALIAS, UPGRADE_FROM_VERSION, LOGGER);
174+
int iterations = between(1, 10);
175+
for (int i = 0; i < iterations; i++) {
176+
verifySearch(localIndex, localNumDocs, CLUSTER_ALIAS + ":" + remoteIndex, remoteNumDocs, between(1, 10));
146177
}
147178
localClient.indices().delete(new DeleteIndexRequest(localIndex), RequestOptions.DEFAULT);
148179
remoteClient.indices().delete(new DeleteIndexRequest(remoteIndex), RequestOptions.DEFAULT);

qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,11 @@ private static void assumeMultiClusterSetup() {
724724
private static SearchRequest initSearchRequest() {
725725
List<String> indices = Arrays.asList(INDEX_NAME, "my_remote_cluster:" + INDEX_NAME);
726726
Collections.shuffle(indices, random());
727-
return new SearchRequest(indices.toArray(new String[0]));
727+
final SearchRequest request = new SearchRequest(indices.toArray(new String[0]));
728+
if (randomBoolean()) {
729+
request.setPreFilterShardSize(between(1, 20));
730+
}
731+
return request;
728732
}
729733

730734
private static void duelSearch(SearchRequest searchRequest, Consumer<SearchResponse> responseChecker) throws Exception {

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,10 @@
2323
import org.elasticsearch.action.support.TransportActions;
2424
import org.elasticsearch.cluster.ClusterState;
2525
import org.elasticsearch.cluster.routing.GroupShardsIterator;
26-
import org.elasticsearch.core.Releasable;
27-
import org.elasticsearch.core.Releasables;
2826
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2927
import org.elasticsearch.common.util.concurrent.AtomicArray;
30-
import org.elasticsearch.core.TimeValue;
31-
import org.elasticsearch.index.seqno.SequenceNumbers;
28+
import org.elasticsearch.core.Releasable;
29+
import org.elasticsearch.core.Releasables;
3230
import org.elasticsearch.index.shard.ShardId;
3331
import org.elasticsearch.search.SearchContextMissingException;
3432
import org.elasticsearch.search.SearchPhaseResult;
@@ -65,7 +63,6 @@
6563
*/
6664
abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> extends SearchPhase implements SearchPhaseContext {
6765
private static final float DEFAULT_INDEX_BOOST = 1.0f;
68-
private static final long[] EMPTY_LONG_ARRAY = new long[0];
6966
private final Logger logger;
7067
private final SearchTransportService searchTransportService;
7168
private final Executor executor;
@@ -736,21 +733,9 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
736733
AliasFilter filter = aliasFilter.get(shardIt.shardId().getIndex().getUUID());
737734
assert filter != null;
738735
float indexBoost = concreteIndexBoosts.getOrDefault(shardIt.shardId().getIndex().getUUID(), DEFAULT_INDEX_BOOST);
739-
final Map<String, long[]> indexToWaitForCheckpoints = request.getWaitForCheckpoints();
740-
final TimeValue waitForCheckpointsTimeout = request.getWaitForCheckpointsTimeout();
741-
final long[] waitForCheckpoints = indexToWaitForCheckpoints.getOrDefault(shardIt.shardId().getIndex().getName(), EMPTY_LONG_ARRAY);
742-
743-
long waitForCheckpoint;
744-
if (waitForCheckpoints.length == 0) {
745-
waitForCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
746-
} else {
747-
assert waitForCheckpoints.length > shardIndex;
748-
waitForCheckpoint = waitForCheckpoints[shardIndex];
749-
}
750736
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request,
751737
shardIt.shardId(), shardIndex, getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(),
752-
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive(), waitForCheckpoint,
753-
waitForCheckpointsTimeout);
738+
shardIt.getClusterAlias(), shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
754739
// if we already received a search result we can inform the shard that it
755740
// can return a null response if the request rewrites to match none rather
756741
// than creating an empty response in the search thread pool.
Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0 and the Server Side Public License, v 1; you may not use this file except
5+
* in compliance with, at your election, the Elastic License 2.0 or the Server
6+
* Side Public License, v 1.
7+
*/
8+
9+
package org.elasticsearch.action.search;
10+
11+
import org.elasticsearch.action.IndicesRequest;
12+
import org.elasticsearch.action.OriginalIndices;
13+
import org.elasticsearch.action.support.IndicesOptions;
14+
import org.elasticsearch.common.io.stream.StreamInput;
15+
import org.elasticsearch.common.io.stream.StreamOutput;
16+
import org.elasticsearch.common.io.stream.Writeable;
17+
import org.elasticsearch.core.Nullable;
18+
import org.elasticsearch.core.TimeValue;
19+
import org.elasticsearch.index.shard.ShardId;
20+
import org.elasticsearch.search.Scroll;
21+
import org.elasticsearch.search.builder.SearchSourceBuilder;
22+
import org.elasticsearch.search.internal.AliasFilter;
23+
import org.elasticsearch.search.internal.ShardSearchContextId;
24+
import org.elasticsearch.search.internal.ShardSearchRequest;
25+
import org.elasticsearch.tasks.Task;
26+
import org.elasticsearch.tasks.TaskId;
27+
import org.elasticsearch.transport.TransportRequest;
28+
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.List;
33+
import java.util.Map;
34+
import java.util.stream.Collectors;
35+
36+
/**
37+
* Node-level request used during can-match phase
38+
*/
39+
public class CanMatchNodeRequest extends TransportRequest implements IndicesRequest {
40+
41+
private final SearchSourceBuilder source;
42+
private final List<Shard> shards;
43+
private final SearchType searchType;
44+
private final String[] types;
45+
private final Boolean requestCache;
46+
private final boolean allowPartialSearchResults;
47+
private final Scroll scroll;
48+
private final int numberOfShards;
49+
private final long nowInMillis;
50+
@Nullable
51+
private final String clusterAlias;
52+
private final String[] indices;
53+
private final IndicesOptions indicesOptions;
54+
private final TimeValue waitForCheckpointsTimeout;
55+
56+
public static class Shard implements Writeable {
57+
private final String[] indices;
58+
private final ShardId shardId;
59+
private final int shardRequestIndex;
60+
private final AliasFilter aliasFilter;
61+
private final float indexBoost;
62+
private final ShardSearchContextId readerId;
63+
private final TimeValue keepAlive;
64+
private final long waitForCheckpoint;
65+
66+
public Shard(String[] indices,
67+
ShardId shardId,
68+
int shardRequestIndex,
69+
AliasFilter aliasFilter,
70+
float indexBoost,
71+
ShardSearchContextId readerId,
72+
TimeValue keepAlive,
73+
long waitForCheckpoint) {
74+
this.indices = indices;
75+
this.shardId = shardId;
76+
this.shardRequestIndex = shardRequestIndex;
77+
this.aliasFilter = aliasFilter;
78+
this.indexBoost = indexBoost;
79+
this.readerId = readerId;
80+
this.keepAlive = keepAlive;
81+
this.waitForCheckpoint = waitForCheckpoint;
82+
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
83+
}
84+
85+
public Shard(StreamInput in) throws IOException {
86+
indices = in.readStringArray();
87+
shardId = new ShardId(in);
88+
shardRequestIndex = in.readVInt();
89+
aliasFilter = new AliasFilter(in);
90+
indexBoost = in.readFloat();
91+
readerId = in.readOptionalWriteable(ShardSearchContextId::new);
92+
keepAlive = in.readOptionalTimeValue();
93+
waitForCheckpoint = in.readLong();
94+
assert keepAlive == null || readerId != null : "readerId: " + readerId + " keepAlive: " + keepAlive;
95+
}
96+
97+
@Override
98+
public void writeTo(StreamOutput out) throws IOException {
99+
out.writeStringArray(indices);
100+
shardId.writeTo(out);
101+
out.writeVInt(shardRequestIndex);
102+
aliasFilter.writeTo(out);
103+
out.writeFloat(indexBoost);
104+
out.writeOptionalWriteable(readerId);
105+
out.writeOptionalTimeValue(keepAlive);
106+
out.writeLong(waitForCheckpoint);
107+
}
108+
109+
public int getShardRequestIndex() {
110+
return shardRequestIndex;
111+
}
112+
113+
public String[] getOriginalIndices() {
114+
return indices;
115+
}
116+
117+
public ShardId shardId() {
118+
return shardId;
119+
}
120+
}
121+
122+
public CanMatchNodeRequest(
123+
SearchRequest searchRequest,
124+
IndicesOptions indicesOptions,
125+
List<Shard> shards,
126+
int numberOfShards,
127+
long nowInMillis,
128+
@Nullable String clusterAlias
129+
) {
130+
this.source = searchRequest.source();
131+
this.indicesOptions = indicesOptions;
132+
this.shards = new ArrayList<>(shards);
133+
this.searchType = searchRequest.searchType();
134+
this.types = searchRequest.types();
135+
this.requestCache = searchRequest.requestCache();
136+
// If allowPartialSearchResults is unset (ie null), the cluster-level default should have been substituted
137+
// at this stage. Any NPEs in the above are therefore an error in request preparation logic.
138+
assert searchRequest.allowPartialSearchResults() != null;
139+
this.allowPartialSearchResults = searchRequest.allowPartialSearchResults();
140+
this.scroll = searchRequest.scroll();
141+
this.numberOfShards = numberOfShards;
142+
this.nowInMillis = nowInMillis;
143+
this.clusterAlias = clusterAlias;
144+
this.waitForCheckpointsTimeout = searchRequest.getWaitForCheckpointsTimeout();
145+
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
146+
.toArray(String[]::new);
147+
}
148+
149+
public CanMatchNodeRequest(StreamInput in) throws IOException {
150+
super(in);
151+
source = in.readOptionalWriteable(SearchSourceBuilder::new);
152+
indicesOptions = IndicesOptions.readIndicesOptions(in);
153+
searchType = SearchType.fromId(in.readByte());
154+
types = in.readStringArray();
155+
scroll = in.readOptionalWriteable(Scroll::new);
156+
requestCache = in.readOptionalBoolean();
157+
allowPartialSearchResults = in.readBoolean();
158+
numberOfShards = in.readVInt();
159+
nowInMillis = in.readVLong();
160+
clusterAlias = in.readOptionalString();
161+
waitForCheckpointsTimeout = in.readTimeValue();
162+
shards = in.readList(Shard::new);
163+
indices = shards.stream().map(Shard::getOriginalIndices).flatMap(Arrays::stream).distinct()
164+
.toArray(String[]::new);
165+
}
166+
167+
@Override
168+
public void writeTo(StreamOutput out) throws IOException {
169+
super.writeTo(out);
170+
out.writeOptionalWriteable(source);
171+
indicesOptions.writeIndicesOptions(out);
172+
out.writeByte(searchType.id());
173+
out.writeStringArray(types);
174+
out.writeOptionalWriteable(scroll);
175+
out.writeOptionalBoolean(requestCache);
176+
out.writeBoolean(allowPartialSearchResults);
177+
out.writeVInt(numberOfShards);
178+
out.writeVLong(nowInMillis);
179+
out.writeOptionalString(clusterAlias);
180+
out.writeTimeValue(waitForCheckpointsTimeout);
181+
out.writeList(shards);
182+
}
183+
184+
public List<Shard> getShardLevelRequests() {
185+
return shards;
186+
}
187+
188+
public List<ShardSearchRequest> createShardSearchRequests() {
189+
return shards.stream().map(this::createShardSearchRequest).collect(Collectors.toList());
190+
}
191+
192+
public ShardSearchRequest createShardSearchRequest(Shard r) {
193+
ShardSearchRequest shardSearchRequest = new ShardSearchRequest(
194+
new OriginalIndices(r.indices, indicesOptions), r.shardId, r.shardRequestIndex, numberOfShards, searchType,
195+
source, types, requestCache, r.aliasFilter, r.indexBoost, allowPartialSearchResults, scroll,
196+
nowInMillis, clusterAlias, r.readerId, r.keepAlive, r.waitForCheckpoint, waitForCheckpointsTimeout
197+
);
198+
shardSearchRequest.setParentTask(getParentTask());
199+
return shardSearchRequest;
200+
}
201+
202+
@Override
203+
public String[] indices() {
204+
return indices;
205+
}
206+
207+
@Override
208+
public IndicesOptions indicesOptions() {
209+
return indicesOptions;
210+
}
211+
212+
@Override
213+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
214+
return new SearchShardTask(id, type, action, getDescription(), parentTaskId, headers);
215+
}
216+
217+
@Override
218+
public String getDescription() {
219+
// Shard id is enough here, the request itself can be found by looking at the parent task description
220+
return "shardIds[" + shards.stream().map(slr -> slr.shardId).collect(Collectors.toList()) + "]";
221+
}
222+
223+
}

0 commit comments

Comments
 (0)