Skip to content

Commit 1e5ba7b

Browse files
authored
Handle failures with no explicit cause in async search (#58319)
This commit fixes an AOOBE in the handling of fatal failures in _async_search. If the underlying cause is not found, this change uses the root failure. Closes #58311
1 parent 8164489 commit 1e5ba7b

File tree

5 files changed

+157
-2
lines changed

5 files changed

+157
-2
lines changed

x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

+18
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66

77
package org.elasticsearch.xpack.search;
88

9+
import org.apache.lucene.store.AlreadyClosedException;
910
import org.elasticsearch.ExceptionsHelper;
1011
import org.elasticsearch.action.index.IndexRequestBuilder;
1112
import org.elasticsearch.common.settings.Settings;
1213
import org.elasticsearch.common.unit.TimeValue;
14+
import org.elasticsearch.rest.RestStatus;
1315
import org.elasticsearch.search.aggregations.AggregationBuilders;
1416
import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
1517
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
@@ -398,4 +400,20 @@ public void testRemoveAsyncIndex() throws Exception {
398400
ensureTaskNotRunning(newResp.getId());
399401
ensureTaskRemoval(newResp.getId());
400402
}
403+
404+
public void testSearchPhaseFailureNoCause() throws Exception {
405+
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
406+
request.setKeepOnCompletion(true);
407+
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
408+
request.getSearchRequest().allowPartialSearchResults(false);
409+
request.getSearchRequest()
410+
// AlreadyClosedException are ignored by the coordinating node
411+
.source(new SearchSourceBuilder().query(new ThrowingQueryBuilder(randomLong(), new AlreadyClosedException("boom"), 0)));
412+
AsyncSearchResponse response = submitAsyncSearch(request);
413+
assertFalse(response.isRunning());
414+
assertTrue(response.isPartial());
415+
assertThat(response.status(), equalTo(RestStatus.INTERNAL_SERVER_ERROR));
416+
assertNotNull(response.getFailure());
417+
ensureTaskNotRunning(response.getId());
418+
}
401419
}

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

+11-1
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,17 @@ synchronized void updateWithFailure(Exception exc) {
130130
//note that when search fails, we may have gotten partial results before the failure. In that case async
131131
// search will return an error plus the last partial results that were collected.
132132
this.isPartial = true;
133-
this.failure = ElasticsearchException.guessRootCauses(exc)[0];
133+
ElasticsearchException[] rootCauses = ElasticsearchException.guessRootCauses(exc);
134+
if (rootCauses == null || rootCauses.length == 0) {
135+
this.failure = new ElasticsearchException(exc.getMessage(), exc) {
136+
@Override
137+
protected String getExceptionName() {
138+
return getExceptionName(getCause());
139+
}
140+
};
141+
} else {
142+
this.failure = rootCauses[0];
143+
}
134144
this.frozen = true;
135145
}
136146

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,12 @@ public SearchTestPlugin() {}
6464

6565
@Override
6666
public List<QuerySpec<?>> getQueries() {
67-
return Collections.singletonList(new QuerySpec<>(BlockingQueryBuilder.NAME, in -> new BlockingQueryBuilder(in),
67+
return Arrays.asList(
68+
new QuerySpec<>(BlockingQueryBuilder.NAME, in -> new BlockingQueryBuilder(in),
69+
p -> {
70+
throw new IllegalStateException("not implemented");
71+
}),
72+
new QuerySpec<>(ThrowingQueryBuilder.NAME, in -> new ThrowingQueryBuilder(in),
6873
p -> {
6974
throw new IllegalStateException("not implemented");
7075
}));

x-pack/plugin/async-search/src/test/java/org/elasticsearch/xpack/search/AsyncSearchTaskTests.java

+21
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.apache.lucene.search.TotalHits;
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.action.OriginalIndices;
11+
import org.elasticsearch.action.search.SearchPhaseExecutionException;
1112
import org.elasticsearch.action.search.SearchResponse;
1213
import org.elasticsearch.action.search.SearchShard;
1314
import org.elasticsearch.action.search.ShardSearchFailure;
@@ -216,6 +217,26 @@ public void testFatalFailureDuringFetch() throws InterruptedException {
216217
assertCompletionListeners(task, totalShards, totalShards, numSkippedShards, numShards, true);
217218
}
218219

220+
public void testFatalFailureWithNoCause() throws InterruptedException {
221+
AsyncSearchTask task = createAsyncSearchTask();
222+
AsyncSearchTask.Listener listener = task.getSearchProgressActionListener();
223+
int numShards = randomIntBetween(0, 10);
224+
List<SearchShard> shards = new ArrayList<>();
225+
for (int i = 0; i < numShards; i++) {
226+
shards.add(new SearchShard(null, new ShardId("0", "0", 1)));
227+
}
228+
List<SearchShard> skippedShards = new ArrayList<>();
229+
int numSkippedShards = randomIntBetween(0, 10);
230+
for (int i = 0; i < numSkippedShards; i++) {
231+
skippedShards.add(new SearchShard(null, new ShardId("0", "0", 1)));
232+
}
233+
int totalShards = numShards + numSkippedShards;
234+
task.getSearchProgressActionListener().onListShards(shards, skippedShards, SearchResponse.Clusters.EMPTY, false);
235+
236+
listener.onFailure(new SearchPhaseExecutionException("fetch", "boum", ShardSearchFailure.EMPTY_ARRAY));
237+
assertCompletionListeners(task, totalShards, 0, numSkippedShards, 0, true);
238+
}
239+
219240
private static SearchResponse newSearchResponse(int totalShards, int successfulShards, int skippedShards,
220241
ShardSearchFailure... failures) {
221242
InternalSearchResponse response = new InternalSearchResponse(SearchHits.empty(),
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.search;
7+
8+
import org.apache.lucene.search.IndexSearcher;
9+
import org.apache.lucene.search.Query;
10+
import org.apache.lucene.search.ScoreMode;
11+
import org.apache.lucene.search.Weight;
12+
import org.elasticsearch.common.io.stream.StreamInput;
13+
import org.elasticsearch.common.io.stream.StreamOutput;
14+
import org.elasticsearch.common.lucene.search.Queries;
15+
import org.elasticsearch.common.xcontent.XContentBuilder;
16+
import org.elasticsearch.index.query.AbstractQueryBuilder;
17+
import org.elasticsearch.index.query.QueryShardContext;
18+
19+
import java.io.IOException;
20+
21+
class ThrowingQueryBuilder extends AbstractQueryBuilder<ThrowingQueryBuilder> {
22+
public static final String NAME = "throw";
23+
24+
private final long randomUID;
25+
private final RuntimeException failure;
26+
private final int shardId;
27+
28+
/**
29+
* Creates a {@link ThrowingQueryBuilder} with the provided <code>randomUID</code>.
30+
*/
31+
ThrowingQueryBuilder(long randomUID, RuntimeException failure, int shardId) {
32+
super();
33+
this.randomUID = randomUID;
34+
this.failure = failure;
35+
this.shardId = shardId;
36+
}
37+
38+
ThrowingQueryBuilder(StreamInput in) throws IOException {
39+
super(in);
40+
this.randomUID = in.readLong();
41+
this.failure = in.readException();
42+
this.shardId = in.readVInt();
43+
}
44+
45+
@Override
46+
protected void doWriteTo(StreamOutput out) throws IOException {
47+
out.writeLong(randomUID);
48+
out.writeException(failure);
49+
out.writeVInt(shardId);
50+
}
51+
52+
@Override
53+
protected void doXContent(XContentBuilder builder, Params params) throws IOException {
54+
builder.startObject(NAME);
55+
builder.endObject();
56+
}
57+
58+
@Override
59+
protected Query doToQuery(QueryShardContext context) {
60+
final Query delegate = Queries.newMatchAllQuery();
61+
return new Query() {
62+
@Override
63+
public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float boost) throws IOException {
64+
if (context.getShardId() == shardId) {
65+
throw failure;
66+
}
67+
return delegate.createWeight(searcher, scoreMode, boost);
68+
}
69+
70+
@Override
71+
public String toString(String field) {
72+
return delegate.toString(field);
73+
}
74+
75+
@Override
76+
public boolean equals(Object obj) {
77+
return false;
78+
}
79+
80+
@Override
81+
public int hashCode() {
82+
return 0;
83+
}
84+
};
85+
}
86+
87+
@Override
88+
protected boolean doEquals(ThrowingQueryBuilder other) {
89+
return false;
90+
}
91+
92+
@Override
93+
protected int doHashCode() {
94+
return 0;
95+
}
96+
97+
@Override
98+
public String getWriteableName() {
99+
return NAME;
100+
}
101+
}

0 commit comments

Comments
 (0)