Skip to content

Commit 2783432

Browse files
authored
Ensure that we don't call listener twice when detecting a partial failure in _search (elastic#47694)
This change fixes a bug that can occur when a shard failure is detected while we build the search response and accept partial failures in set to false. In this case we currently call onFailure on the provided listener but also continue the search as if the failure didn't occur. This can lead to a listener called twice, once with onFailure and once with onSuccess which is forbidden by design.
1 parent a8a7477 commit 2783432

File tree

3 files changed

+17
-14
lines changed

3 files changed

+17
-14
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

+12-9
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,7 @@ private void executePhase(SearchPhase phase) {
352352
}
353353
}
354354

355-
private ShardSearchFailure[] buildShardFailures() {
355+
ShardSearchFailure[] buildShardFailures() {
356356
AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
357357
if (shardFailures == null) {
358358
return ShardSearchFailure.EMPTY_ARRAY;
@@ -510,20 +510,23 @@ public final SearchRequest getRequest() {
510510
return request;
511511
}
512512

513-
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
514-
ShardSearchFailure[] failures = buildShardFailures();
515-
Boolean allowPartialResults = request.allowPartialSearchResults();
516-
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
517-
if (allowPartialResults == false && failures.length > 0){
518-
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
519-
}
513+
protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse,
514+
String scrollId,
515+
ShardSearchFailure[] failures) {
520516
return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
521517
skippedOps.get(), buildTookInMillis(), failures, clusters);
522518
}
523519

524520
@Override
525521
public void sendSearchResponse(InternalSearchResponse internalSearchResponse, String scrollId) {
526-
listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId));
522+
ShardSearchFailure[] failures = buildShardFailures();
523+
Boolean allowPartialResults = request.allowPartialSearchResults();
524+
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
525+
if (allowPartialResults == false && failures.length > 0){
526+
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
527+
} else {
528+
listener.onResponse(buildSearchResponse(internalSearchResponse, scrollId, failures));
529+
}
527530
}
528531

529532
@Override

server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public void testBuildSearchResponse() {
163163
new ArraySearchPhaseResults<>(10), null, false, new AtomicLong());
164164
String scrollId = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
165165
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
166-
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId);
166+
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId, action.buildShardFailures());
167167
assertEquals(scrollId, searchResponse.getScrollId());
168168
assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations());
169169
assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest());
@@ -179,15 +179,15 @@ public void testBuildSearchResponseAllowPartialFailures() {
179179
new IllegalArgumentException());
180180
String scrollId = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10);
181181
InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty();
182-
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId);
182+
SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, scrollId, action.buildShardFailures());
183183
assertEquals(scrollId, searchResponse.getScrollId());
184184
assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations());
185185
assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest());
186186
assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile());
187187
assertSame(searchResponse.getHits(), internalSearchResponse.hits());
188188
}
189189

190-
public void testBuildSearchResponseDisallowPartialFailures() {
190+
public void testSendSearchResponseDisallowPartialFailures() {
191191
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false);
192192
AtomicReference<Exception> exception = new AtomicReference<>();
193193
ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
@@ -203,7 +203,7 @@ public void testBuildSearchResponseDisallowPartialFailures() {
203203
action.onShardFailure(i, new SearchShardTarget(failureNodeId, failureShardId, failureClusterAlias, OriginalIndices.NONE),
204204
new IllegalArgumentException());
205205
}
206-
action.buildSearchResponse(InternalSearchResponse.empty(), randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10));
206+
action.sendSearchResponse(InternalSearchResponse.empty(), randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10));
207207
assertThat(exception.get(), instanceOf(SearchPhaseExecutionException.class));
208208
SearchPhaseExecutionException searchPhaseExecutionException = (SearchPhaseExecutionException)exception.get();
209209
assertEquals(0, searchPhaseExecutionException.getSuppressed().length);

server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ protected void executeNext(Runnable runnable, Thread originalThread) {
148148
asyncAction.start();
149149
latch.await();
150150
assertTrue(searchPhaseDidRun.get());
151-
SearchResponse searchResponse = asyncAction.buildSearchResponse(null, null);
151+
SearchResponse searchResponse = asyncAction.buildSearchResponse(null, null, asyncAction.buildShardFailures());
152152
assertEquals(shardsIter.size() - numSkipped, numRequests.get());
153153
assertEquals(0, searchResponse.getFailedShards());
154154
assertEquals(numSkipped, searchResponse.getSkippedShards());

0 commit comments

Comments
 (0)