Skip to content

Commit 7fa51b3

Browse files
committed
Let search phases override max concurrent requests
If the query coordinating node is also a data node that holds all the shards for a search request, we can end up recursing through the can match phase (because we send a local request and on response in the listener move to the next shard and do this again, without ever having returned from previous shards). This recursion can lead to stack overflow for even a reasonable number of indices (daily indices over a sixty days with five shards per day is enough to trigger the stack overflow). Moreover, all this execution would be happening on a network thread (the thread that initially received the query). With this commit, we allow search phases to override max concurrent requests. This allows the can match phase to avoid recursing through the shards towards a stack overflow. Relates #26484
1 parent 9188c9c commit 7fa51b3

9 files changed

+83
-18
lines changed

core/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ protected AbstractSearchAsyncAction(String name, Logger logger, SearchTransportS
7676
Executor executor, SearchRequest request,
7777
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
7878
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
79-
SearchTask task, SearchPhaseResults<Result> resultConsumer) {
80-
super(name, request, shardsIts, logger);
79+
SearchTask task, SearchPhaseResults<Result> resultConsumer, int maxConcurrentShardRequests) {
80+
super(name, request, shardsIts, logger, maxConcurrentShardRequests);
8181
this.timeProvider = timeProvider;
8282
this.logger = logger;
8383
this.searchTransportService = searchTransportService;

core/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,6 @@
2626
import org.elasticsearch.search.internal.AliasFilter;
2727
import org.elasticsearch.transport.Transport;
2828

29-
import java.util.ArrayList;
30-
import java.util.Collections;
31-
import java.util.Iterator;
32-
import java.util.List;
3329
import java.util.Map;
3430
import java.util.concurrent.Executor;
3531
import java.util.function.BiFunction;
@@ -55,9 +51,12 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
5551
ActionListener<SearchResponse> listener, GroupShardsIterator<SearchShardIterator> shardsIts,
5652
TransportSearchAction.SearchTimeProvider timeProvider, long clusterStateVersion,
5753
SearchTask task, Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory) {
54+
/*
55+
* We set max concurrent shard requests to the number of shards to otherwise avoid deep recursing that would occur if the local node
56+
* is the coordinating node for the query, holds all the shards for the request, and there are a lot of shards.
57+
*/
5858
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request,
59-
listener,
60-
shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()));
59+
listener, shardsIts, timeProvider, clusterStateVersion, task, new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size());
6160
this.phaseFactory = phaseFactory;
6261
this.shardsIts = shardsIts;
6362
}

core/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
5252
private final AtomicInteger shardExecutionIndex = new AtomicInteger(0);
5353
private final int maxConcurrentShardRequests;
5454

55-
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger) {
55+
InitialSearchPhase(String name, SearchRequest request, GroupShardsIterator<SearchShardIterator> shardsIts, Logger logger,
56+
int maxConcurrentShardRequests) {
5657
super(name);
5758
this.request = request;
5859
this.shardsIts = shardsIts;
@@ -62,7 +63,7 @@ abstract class InitialSearchPhase<FirstResult extends SearchPhaseResult> extends
6263
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
6364
// we process hence we add one for the non active partition here.
6465
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
65-
maxConcurrentShardRequests = Math.min(request.getMaxConcurrentShardRequests(), shardsIts.size());
66+
this.maxConcurrentShardRequests = Math.min(maxConcurrentShardRequests, shardsIts.size());
6667
}
6768

6869
private void onShardFailure(final int shardIndex, @Nullable ShardRouting shard, @Nullable String nodeId,

core/src/main/java/org/elasticsearch/action/search/SearchDfsQueryThenFetchAsyncAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
4242
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
4343
final long clusterStateVersion, final SearchTask task) {
4444
super("dfs", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
45-
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()));
45+
shardsIts, timeProvider, clusterStateVersion, task, new ArraySearchPhaseResults<>(shardsIts.size()),
46+
request.getMaxConcurrentShardRequests());
4647
this.searchPhaseController = searchPhaseController;
4748
}
4849

core/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@ final class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<Se
4242
final GroupShardsIterator<SearchShardIterator> shardsIts, final TransportSearchAction.SearchTimeProvider timeProvider,
4343
long clusterStateVersion, SearchTask task) {
4444
super("query", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, executor, request, listener,
45-
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()));
45+
shardsIts, timeProvider, clusterStateVersion, task, searchPhaseController.newSearchPhaseResults(request, shardsIts.size()),
46+
request.getMaxConcurrentShardRequests());
4647
this.searchPhaseController = searchPhaseController;
4748
}
4849

core/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@
4747
import org.elasticsearch.tasks.Task;
4848
import org.elasticsearch.threadpool.ThreadPool;
4949
import org.elasticsearch.transport.RemoteClusterService;
50+
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
5051
import org.elasticsearch.transport.Transport;
5152
import org.elasticsearch.transport.TransportActionProxy;
52-
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
5353
import org.elasticsearch.transport.TransportChannel;
5454
import org.elasticsearch.transport.TransportRequest;
5555
import org.elasticsearch.transport.TransportRequestOptions;

core/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,12 @@ private AbstractSearchAsyncAction<SearchPhaseResult> createAction(
6060
System::nanoTime);
6161
}
6262

63+
final SearchRequest request = new SearchRequest();
6364
return new AbstractSearchAsyncAction<SearchPhaseResult>("test", null, null, null,
6465
Collections.singletonMap("foo", new AliasFilter(new MatchAllQueryBuilder())), Collections.singletonMap("foo", 2.0f), null,
65-
new SearchRequest(), null, new GroupShardsIterator<>(Collections.singletonList(
66+
request, null, new GroupShardsIterator<>(Collections.singletonList(
6667
new SearchShardIterator(null, null, Collections.emptyList(), null))), timeProvider, 0, null,
67-
new InitialSearchPhase.ArraySearchPhaseResults<>(10)) {
68+
new InitialSearchPhase.ArraySearchPhaseResults<>(10), request.getMaxConcurrentShardRequests()) {
6869
@Override
6970
protected SearchPhase getNextPhase(final SearchPhaseResults<SearchPhaseResult> results, final SearchPhaseContext context) {
7071
return null;

core/src/test/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhaseTests.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
import java.util.concurrent.CountDownLatch;
4343
import java.util.concurrent.atomic.AtomicReference;
4444

45+
import static org.elasticsearch.cluster.metadata.TemplateUpgradeServiceTests.buildNewFakeTransportAddress;
46+
4547
public class CanMatchPreFilterSearchPhaseTests extends ESTestCase {
4648

4749

@@ -172,4 +174,61 @@ public void run() throws IOException {
172174
assertEquals(shard1, !result.get().get(0).skip());
173175
assertFalse(result.get().get(1).skip()); // never skip the failure
174176
}
177+
178+
/*
179+
* In cases that a query coordinating node held all the shards for a query, the can match phase would recurse and end in stack overflow
180+
* when subjected to max concurrent search requests. This test is a test for that situation.
181+
*/
182+
public void testLotsOfShards() throws InterruptedException {
183+
final TransportSearchAction.SearchTimeProvider timeProvider =
184+
new TransportSearchAction.SearchTimeProvider(0, System.nanoTime(), System::nanoTime);
185+
186+
final Map<String, Transport.Connection> lookup = new ConcurrentHashMap<>();
187+
final DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
188+
final DiscoveryNode replicaNode = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT);
189+
lookup.put("node1", new SearchAsyncActionTests.MockConnection(primaryNode));
190+
lookup.put("node2", new SearchAsyncActionTests.MockConnection(replicaNode));
191+
192+
final SearchTransportService searchTransportService =
193+
new SearchTransportService(Settings.builder().put("search.remote.connect", false).build(), null) {
194+
@Override
195+
public void sendCanMatch(
196+
Transport.Connection connection,
197+
ShardSearchTransportRequest request,
198+
SearchTask task,
199+
ActionListener<CanMatchResponse> listener) {
200+
listener.onResponse(new CanMatchResponse(randomBoolean()));
201+
}
202+
};
203+
204+
final AtomicReference<GroupShardsIterator<SearchShardIterator>> result = new AtomicReference<>();
205+
final CountDownLatch latch = new CountDownLatch(1);
206+
final OriginalIndices originalIndices = new OriginalIndices(new String[]{"idx"}, IndicesOptions.strictExpandOpenAndForbidClosed());
207+
final GroupShardsIterator<SearchShardIterator> shardsIter =
208+
SearchAsyncActionTests.getShardsIter("idx", originalIndices, 2048, randomBoolean(), primaryNode, replicaNode);
209+
final CanMatchPreFilterSearchPhase canMatchPhase = new CanMatchPreFilterSearchPhase(
210+
logger,
211+
searchTransportService,
212+
(clusterAlias, node) -> lookup.get(node),
213+
Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY)),
214+
Collections.emptyMap(),
215+
EsExecutors.newDirectExecutorService(),
216+
new SearchRequest(),
217+
null,
218+
shardsIter,
219+
timeProvider,
220+
0,
221+
null,
222+
(iter) -> new SearchPhase("test") {
223+
@Override
224+
public void run() throws IOException {
225+
result.set(iter);
226+
latch.countDown();
227+
}});
228+
229+
canMatchPhase.start();
230+
latch.await();
231+
232+
}
233+
175234
}

core/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ public void onFailure(Exception e) {
111111
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
112112
0,
113113
null,
114-
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
114+
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
115+
request.getMaxConcurrentShardRequests()) {
115116

116117
@Override
117118
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
@@ -200,7 +201,8 @@ public void onFailure(Exception e) {
200201
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
201202
0,
202203
null,
203-
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
204+
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
205+
request.getMaxConcurrentShardRequests()) {
204206

205207
@Override
206208
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
@@ -301,7 +303,8 @@ public void sendFreeContext(Transport.Connection connection, long contextId, Ori
301303
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
302304
0,
303305
null,
304-
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size())) {
306+
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
307+
request.getMaxConcurrentShardRequests()) {
305308
TestSearchResponse response = new TestSearchResponse();
306309

307310
@Override

0 commit comments

Comments
 (0)