Skip to content

Commit 03322ea

Browse files
authored
SearchRequest#allowPartialSearchResults does not handle successful retries (#43095)
When set to false, the allowPartialSearchResults option does not check whether the shard failures have been reset to null. The atomic array that is used to record shard failures is filled with a null value if a successful request on a shard happens after a failure on a shard of another replica. In this case the atomic array is not empty but contains only null values, so this shouldn't be considered a failure, since all shards are successful (some replicas have failed but the retries on another replica succeeded). This change fixes this bug by checking the content of the atomic array and failing the request only if allowPartialSearchResults is set to false and at least one shard failure is not null. Closes #40743
1 parent 4937471 commit 03322ea

File tree

3 files changed

+120
-18
lines changed

3 files changed

+120
-18
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -140,24 +140,29 @@ public final void executeNextPhase(SearchPhase currentPhase, SearchPhase nextPha
140140
} else {
141141
Boolean allowPartialResults = request.allowPartialSearchResults();
142142
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
143-
if (allowPartialResults == false && shardFailures.get() != null ){
144-
if (logger.isDebugEnabled()) {
145-
final ShardOperationFailedException[] shardSearchFailures = ExceptionsHelper.groupBy(buildShardFailures());
146-
Throwable cause = shardSearchFailures.length == 0 ? null :
147-
ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
148-
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
149-
shardSearchFailures.length, getName()), cause);
150-
}
151-
onPhaseFailure(currentPhase, "Partial shards failure", null);
152-
} else {
153-
if (logger.isTraceEnabled()) {
154-
final String resultsFrom = results.getSuccessfulResults()
155-
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
156-
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
157-
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
143+
if (allowPartialResults == false && shardFailures.get() != null) {
144+
// check if there are actual failures in the atomic array since
145+
// successful retries can reset the failures to null
146+
ShardOperationFailedException[] shardSearchFailures = buildShardFailures();
147+
if (shardSearchFailures.length > 0) {
148+
if (logger.isDebugEnabled()) {
149+
int numShardFailures = shardSearchFailures.length;
150+
shardSearchFailures = ExceptionsHelper.groupBy(shardSearchFailures);
151+
Throwable cause = ElasticsearchException.guessRootCauses(shardSearchFailures[0].getCause())[0];
152+
logger.debug(() -> new ParameterizedMessage("{} shards failed for phase: [{}]",
153+
numShardFailures, getName()), cause);
154+
}
155+
onPhaseFailure(currentPhase, "Partial shards failure", null);
156+
return;
158157
}
159-
executePhase(nextPhase);
160158
}
159+
if (logger.isTraceEnabled()) {
160+
final String resultsFrom = results.getSuccessfulResults()
161+
.map(r -> r.getSearchShardTarget().toString()).collect(Collectors.joining(","));
162+
logger.trace("[{}] Moving to next phase: [{}], based on results from: {} (cluster state version: {})",
163+
currentPhase.getName(), nextPhase.getName(), resultsFrom, clusterStateVersion);
164+
}
165+
executePhase(nextPhase);
161166
}
162167
}
163168

server/src/main/java/org/elasticsearch/action/search/InitialSearchPhase.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,13 +139,13 @@ public final void run() {
139139
for (int index = 0; index < shardsIts.size(); index++) {
140140
final SearchShardIterator shardRoutings = shardsIts.get(index);
141141
if (shardRoutings.size() == 0) {
142-
if(missingShards.length() >0 ){
142+
if(missingShards.length() > 0){
143143
missingShards.append(", ");
144144
}
145145
missingShards.append(shardRoutings.shardId());
146146
}
147147
}
148-
if (missingShards.length() >0) {
148+
if (missingShards.length() > 0) {
149149
//Status red - shard is missing all copies and would produce partial results for an index search
150150
final String msg = "Search rejected due to missing shards ["+ missingShards +
151151
"]. Consider using `allow_partial_search_results` setting to bypass this error.";

server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656

5757
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentMap;
5858
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.newConcurrentSet;
59+
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
5960

6061
public class SearchAsyncActionTests extends ESTestCase {
6162

@@ -371,6 +372,102 @@ protected void executeNext(Runnable runnable, Thread originalThread) {
371372
executor.shutdown();
372373
}
373374

375+
public void testAllowPartialResults() throws InterruptedException {
376+
SearchRequest request = new SearchRequest();
377+
request.allowPartialSearchResults(false);
378+
int numConcurrent = randomIntBetween(1, 5);
379+
request.setMaxConcurrentShardRequests(numConcurrent);
380+
int numShards = randomIntBetween(5, 10);
381+
AtomicBoolean searchPhaseDidRun = new AtomicBoolean(false);
382+
ActionListener<SearchResponse> responseListener = ActionListener.wrap(response -> {},
383+
(e) -> { throw new AssertionError("unexpected", e);} );
384+
DiscoveryNode primaryNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
385+
// for the sake of this test we place the replica on the same node. ie. this is not a mistake since we limit per node now
386+
DiscoveryNode replicaNode = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT);
387+
388+
AtomicInteger contextIdGenerator = new AtomicInteger(0);
389+
GroupShardsIterator<SearchShardIterator> shardsIter = getShardsIter("idx",
390+
new OriginalIndices(new String[]{"idx"}, SearchRequest.DEFAULT_INDICES_OPTIONS),
391+
numShards, true, primaryNode, replicaNode);
392+
int numShardAttempts = 0;
393+
for (SearchShardIterator it : shardsIter) {
394+
numShardAttempts += it.remaining();
395+
}
396+
CountDownLatch latch = new CountDownLatch(numShardAttempts);
397+
398+
SearchTransportService transportService = new SearchTransportService(null, null);
399+
Map<String, Transport.Connection> lookup = new HashMap<>();
400+
Map<ShardId, Boolean> seenShard = new ConcurrentHashMap<>();
401+
lookup.put(primaryNode.getId(), new MockConnection(primaryNode));
402+
lookup.put(replicaNode.getId(), new MockConnection(replicaNode));
403+
Map<String, AliasFilter> aliasFilters = Collections.singletonMap("_na_", new AliasFilter(null, Strings.EMPTY_ARRAY));
404+
AtomicInteger numRequests = new AtomicInteger(0);
405+
AtomicInteger numFailReplicas = new AtomicInteger(0);
406+
AbstractSearchAsyncAction<TestSearchPhaseResult> asyncAction =
407+
new AbstractSearchAsyncAction<>(
408+
"test",
409+
logger,
410+
transportService,
411+
(cluster, node) -> {
412+
assert cluster == null : "cluster was not null: " + cluster;
413+
return lookup.get(node); },
414+
aliasFilters,
415+
Collections.emptyMap(),
416+
Collections.emptyMap(),
417+
null,
418+
request,
419+
responseListener,
420+
shardsIter,
421+
new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0),
422+
0,
423+
null,
424+
new InitialSearchPhase.ArraySearchPhaseResults<>(shardsIter.size()),
425+
request.getMaxConcurrentShardRequests(),
426+
SearchResponse.Clusters.EMPTY) {
427+
428+
@Override
429+
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
430+
SearchActionListener<TestSearchPhaseResult> listener) {
431+
seenShard.computeIfAbsent(shard.shardId(), (i) -> {
432+
numRequests.incrementAndGet(); // only count this once per shard copy
433+
return Boolean.TRUE;
434+
});
435+
new Thread(() -> {
436+
Transport.Connection connection = getConnection(null, shard.currentNodeId());
437+
TestSearchPhaseResult testSearchPhaseResult = new TestSearchPhaseResult(contextIdGenerator.incrementAndGet(),
438+
connection.getNode());
439+
if (shardIt.remaining() > 0) {
440+
numFailReplicas.incrementAndGet();
441+
listener.onFailure(new RuntimeException());
442+
} else {
443+
listener.onResponse(testSearchPhaseResult);
444+
}
445+
}).start();
446+
}
447+
448+
@Override
449+
protected SearchPhase getNextPhase(SearchPhaseResults<TestSearchPhaseResult> results, SearchPhaseContext context) {
450+
return new SearchPhase("test") {
451+
@Override
452+
public void run() {
453+
assertTrue(searchPhaseDidRun.compareAndSet(false, true));
454+
}
455+
};
456+
}
457+
458+
@Override
459+
protected void executeNext(Runnable runnable, Thread originalThread) {
460+
super.executeNext(runnable, originalThread);
461+
latch.countDown();
462+
}
463+
};
464+
asyncAction.start();
465+
latch.await();
466+
assertTrue(searchPhaseDidRun.get());
467+
assertEquals(numShards, numRequests.get());
468+
assertThat(numFailReplicas.get(), greaterThanOrEqualTo(1));
469+
}
470+
374471
static GroupShardsIterator<SearchShardIterator> getShardsIter(String index, OriginalIndices originalIndices, int numShards,
375472
boolean doReplicas, DiscoveryNode primaryNode, DiscoveryNode replicaNode) {
376473
ArrayList<SearchShardIterator> list = new ArrayList<>();

0 commit comments

Comments
 (0)