Skip to content

Commit 575250e

Browse files
committed
handle reduce phase failures when searching (raise exception to listener)
1 parent 3d9b310 commit 575250e

6 files changed

+100
-18
lines changed
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to Elastic Search and Shay Banon under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. Elastic Search licenses this
6+
* file to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.action.search;
21+
22+
import org.elasticsearch.ElasticSearchException;
23+
24+
/**
25+
* @author kimchy (shay.banon)
26+
*/
27+
public class ReduceSearchPhaseException extends ElasticSearchException {
28+
29+
public ReduceSearchPhaseException(String phaseName, String msg) {
30+
super("Failed to reduce [" + phaseName + "] " + msg);
31+
}
32+
33+
public ReduceSearchPhaseException(String phaseName, String msg, Throwable cause) {
34+
super("Failed to reduce [" + phaseName + "]" + msg, cause);
35+
}
36+
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121

2222
import com.google.inject.Inject;
2323
import org.elasticsearch.action.ActionListener;
24-
import org.elasticsearch.action.search.SearchOperationThreading;
25-
import org.elasticsearch.action.search.SearchRequest;
26-
import org.elasticsearch.action.search.SearchResponse;
27-
import org.elasticsearch.action.search.ShardSearchFailure;
24+
import org.elasticsearch.action.search.*;
2825
import org.elasticsearch.cluster.ClusterService;
2926
import org.elasticsearch.cluster.node.Node;
3027
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -73,6 +70,10 @@ private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listen
7370
super(request, listener);
7471
}
7572

73+
@Override protected String firstPhaseName() {
74+
return "dfs";
75+
}
76+
7677
@Override protected void sendExecuteFirstPhase(Node node, InternalSearchRequest request, SearchServiceListener<DfsSearchResult> listener) {
7778
searchService.sendExecuteDfs(node, request, listener);
7879
}
@@ -154,6 +155,14 @@ private void executeSecondPhase(final AtomicInteger counter, Node node, QuerySea
154155
}
155156

156157
private void finishHim() {
158+
try {
159+
innerFinishHim();
160+
} catch (Exception e) {
161+
listener.onFailure(new ReduceSearchPhaseException("query_fetch", "", e));
162+
}
163+
}
164+
165+
private void innerFinishHim() {
157166
sortedShardList = searchPhaseController.sortDocs(queryFetchResults.values());
158167
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryFetchResults, queryFetchResults);
159168
String scrollIdX = null;

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121

2222
import com.google.inject.Inject;
2323
import org.elasticsearch.action.ActionListener;
24-
import org.elasticsearch.action.search.SearchOperationThreading;
25-
import org.elasticsearch.action.search.SearchRequest;
26-
import org.elasticsearch.action.search.SearchResponse;
27-
import org.elasticsearch.action.search.ShardSearchFailure;
24+
import org.elasticsearch.action.search.*;
2825
import org.elasticsearch.cluster.ClusterService;
2926
import org.elasticsearch.cluster.node.Node;
3027
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -77,6 +74,10 @@ private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listen
7774
super(request, listener);
7875
}
7976

77+
@Override protected String firstPhaseName() {
78+
return "dfs";
79+
}
80+
8081
@Override protected void sendExecuteFirstPhase(Node node, InternalSearchRequest request, SearchServiceListener<DfsSearchResult> listener) {
8182
searchService.sendExecuteDfs(node, request, listener);
8283
}
@@ -160,6 +161,14 @@ private void executeQuery(final AtomicInteger counter, QuerySearchRequest queryS
160161
}
161162

162163
private void executeFetchPhase() {
164+
try {
165+
innerExecuteFetchPhase();
166+
} catch (Exception e) {
167+
listener.onFailure(new ReduceSearchPhaseException("query", "", e));
168+
}
169+
}
170+
171+
private void innerExecuteFetchPhase() {
163172
sortedShardList = searchPhaseController.sortDocs(queryResults.values());
164173
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
165174

@@ -236,6 +245,14 @@ private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchS
236245
}
237246

238247
private void finishHim() {
248+
try {
249+
innerFinishHim();
250+
} catch (Exception e) {
251+
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e));
252+
}
253+
}
254+
255+
private void innerFinishHim() {
239256
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
240257
String scrollIdX = null;
241258
if (request.scroll() != null) {

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listen
6464
super(request, listener);
6565
}
6666

67+
@Override protected String firstPhaseName() {
68+
return "query_fetch";
69+
}
70+
6771
@Override protected void sendExecuteFirstPhase(Node node, InternalSearchRequest request, SearchServiceListener<QueryFetchSearchResult> listener) {
6872
searchService.sendExecuteFetch(node, request, listener);
6973
}

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,7 @@
2121

2222
import com.google.inject.Inject;
2323
import org.elasticsearch.action.ActionListener;
24-
import org.elasticsearch.action.search.SearchOperationThreading;
25-
import org.elasticsearch.action.search.SearchRequest;
26-
import org.elasticsearch.action.search.SearchResponse;
27-
import org.elasticsearch.action.search.ShardSearchFailure;
24+
import org.elasticsearch.action.search.*;
2825
import org.elasticsearch.cluster.ClusterService;
2926
import org.elasticsearch.cluster.node.Node;
3027
import org.elasticsearch.cluster.routing.ShardRouting;
@@ -71,6 +68,10 @@ private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listen
7168
super(request, listener);
7269
}
7370

71+
@Override protected String firstPhaseName() {
72+
return "query";
73+
}
74+
7475
@Override protected void sendExecuteFirstPhase(Node node, InternalSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
7576
searchService.sendExecuteQuery(node, request, listener);
7677
}
@@ -157,6 +158,14 @@ private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchS
157158
}
158159

159160
private void finishHim() {
161+
try {
162+
innerFinishHim();
163+
} catch (Exception e) {
164+
listener.onFailure(new ReduceSearchPhaseException("fetch", "", e));
165+
}
166+
}
167+
168+
private void innerFinishHim() {
160169
InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, queryResults, fetchResults);
161170
String scrollId = null;
162171
if (request.scroll() != null) {

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
package org.elasticsearch.action.search.type;
2121

2222
import org.elasticsearch.action.ActionListener;
23-
import org.elasticsearch.action.search.SearchOperationThreading;
24-
import org.elasticsearch.action.search.SearchRequest;
25-
import org.elasticsearch.action.search.SearchResponse;
26-
import org.elasticsearch.action.search.ShardSearchFailure;
23+
import org.elasticsearch.action.search.*;
2724
import org.elasticsearch.action.support.BaseAction;
2825
import org.elasticsearch.cluster.ClusterService;
2926
import org.elasticsearch.cluster.ClusterState;
@@ -194,7 +191,11 @@ private void onFirstPhaseResult(ShardRouting shard, FirstResult result, Iterator
194191
}
195192
if (successulOps.incrementAndGet() == expectedSuccessfulOps ||
196193
totalOps.incrementAndGet() == expectedTotalOps) {
197-
moveToSecondPhase();
194+
try {
195+
moveToSecondPhase();
196+
} catch (Exception e) {
197+
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e));
198+
}
198199
}
199200
}
200201

@@ -207,7 +208,11 @@ private void onFirstPhaseResult(ShardRouting shard, final Iterator<ShardRouting>
207208
if (totalOps.incrementAndGet() == expectedTotalOps) {
208209
// no more shards, add a failure
209210
shardFailures.add(new ShardSearchFailure(t));
210-
moveToSecondPhase();
211+
try {
212+
moveToSecondPhase();
213+
} catch (Exception e) {
214+
listener.onFailure(new ReduceSearchPhaseException(firstPhaseName(), "", e));
215+
}
211216
} else {
212217
if (shardIt.hasNext()) {
213218
performFirstPhase(shardIt);
@@ -230,5 +235,7 @@ protected ShardSearchFailure[] buildShardFailures() {
230235
protected abstract void processFirstPhaseResult(ShardRouting shard, FirstResult result);
231236

232237
protected abstract void moveToSecondPhase();
238+
239+
protected abstract String firstPhaseName();
233240
}
234241
}

0 commit comments

Comments
 (0)