 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.routing.GroupShardsIterator;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -235,7 +234,9 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
          * we can continue (cf. InitialSearchPhase#maybeFork).
          */
         if (shard == null) {
-            fork(() -> onShardFailure(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
+            SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(),
+                shardIt.getClusterAlias(), shardIt.getOriginalIndices());
+            fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
         } else {
             final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
                 pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
@@ -386,14 +387,13 @@ ShardSearchFailure[] buildShardFailures() {
         return failures;
     }
 
-    private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
+    private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
         // we always add the shard failure for a specific shard instance
         // we do make sure to clean it on a successful response from a shard
         onShardFailure(shardIndex, shard, e);
         final SearchShardTarget nextShard = shardIt.nextOrNull();
         final boolean lastShard = nextShard == null;
-        logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]",
-            shard != null ? shard : shardIt.shardId(), request, lastShard), e);
+        logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]", shard, request, lastShard), e);
         if (lastShard) {
             if (request.allowPartialSearchResults() == false) {
                 if (requestCancelled.compareAndSet(false, true)) {
@@ -437,10 +437,15 @@ protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget
      * @param e the failure reason
      */
     @Override
-    public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) {
-        // we don't aggregate shard failures on non active shards and failures due to the internal cancellation,
+    public final void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Exception e) {
+        if (TransportActions.isShardNotAvailableException(e)) {
+            // Groups shard not available exceptions under a generic exception that returns a SERVICE_UNAVAILABLE(503)
+            // temporary error.
+            e = new NoShardAvailableActionException(shardTarget.getShardId(), e.getMessage());
+        }
+        // we don't aggregate shard failures due to the internal cancellation,
         // but do keep the header counts right
-        if (TransportActions.isShardNotAvailableException(e) == false && (requestCancelled.get() && isTaskCancelledException(e)) == false) {
+        if ((requestCancelled.get() && isTaskCancelledException(e)) == false) {
             AtomicArray<ShardSearchFailure> shardFailures = this.shardFailures.get();
             // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures)
             if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally
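
For context on the hunk above: any failure that TransportActions.isShardNotAvailableException recognizes is now re-wrapped in NoShardAvailableActionException, which, per the comment in the change, surfaces as a SERVICE_UNAVAILABLE (503) temporary error instead of whatever status the original exception carried. Below is a minimal sketch of that normalization in isolation; the ShardFailureNormalizer helper is illustrative only, not part of this change, and assumes the Elasticsearch server classes are on the classpath.

```java
import org.elasticsearch.action.NoShardAvailableActionException;
import org.elasticsearch.action.support.TransportActions;
import org.elasticsearch.index.shard.ShardId;

// Illustrative helper, not part of the change: shows how a "shard not available"
// failure is normalized before it is recorded as a shard failure.
final class ShardFailureNormalizer {

    private ShardFailureNormalizer() {}

    // Wraps any shard-not-available exception in NoShardAvailableActionException so the
    // caller sees a SERVICE_UNAVAILABLE (503) temporary error; other exceptions pass through.
    static Exception normalize(ShardId shardId, Exception e) {
        if (TransportActions.isShardNotAvailableException(e)) {
            return new NoShardAvailableActionException(shardId, e.getMessage());
        }
        return e;
    }
}
```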
@@ -545,7 +550,11 @@ public final SearchRequest getRequest() {
 
     protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures,
                                                        String scrollId, String searchContextId) {
-        return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(),
+        int numSuccess = successfulOps.get();
+        int numFailures = failures.length;
+        assert numSuccess + numFailures == getNumShards()
+            : "numSuccess(" + numSuccess + ") + numFailures(" + numFailures + ") != totalShards(" + getNumShards() + ")";
+        return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), numSuccess,
             skippedOps.get(), buildTookInMillis(), failures, clusters, searchContextId);
     }
 
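
The assertion added to buildSearchResponse encodes the shard accounting invariant: every targeted shard must be reported exactly once, so successful operations plus recorded failures have to equal the total shard count. The same bookkeeping is visible to callers through the SearchResponse getters; the checkShardAccounting helper below is a sketch for illustration, not part of this change.

```java
import org.elasticsearch.action.search.SearchResponse;

// Illustrative check mirroring the assertion in buildSearchResponse.
final class SearchResponseChecks {

    private SearchResponseChecks() {}

    static void checkShardAccounting(SearchResponse response) {
        int total = response.getTotalShards();
        int successful = response.getSuccessfulShards(); // includes skipped shards
        int failed = response.getFailedShards();
        if (successful + failed != total) {
            throw new IllegalStateException(
                "successful(" + successful + ") + failed(" + failed + ") != total(" + total + ")");
        }
    }
}
```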