Skip to content

Commit 4ec3a6d

Browse files
authored
Ensure either success or failure path for SearchOperationListener is called (#37467)
Today we have several implementations of executing SearchOperationListener in SearchService. While all of them seem to be safe at least on, the one that executes scroll searches can cause illegal execution of SearchOperationListener that can then in-turn trigger assertions in ShardSearchStats. This change adds a SearchOperationListenerExecutor that uses try-with blocks to ensure listeners are called in a safe way. Relates to #37185
1 parent 100537f commit 4ec3a6d

File tree

1 file changed

+95
-76
lines changed

1 file changed

+95
-76
lines changed

server/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 95 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.lucene.search.FieldDoc;
2525
import org.apache.lucene.search.TopDocs;
2626
import org.elasticsearch.ElasticsearchException;
27-
import org.elasticsearch.ExceptionsHelper;
2827
import org.elasticsearch.action.ActionListener;
2928
import org.elasticsearch.action.OriginalIndices;
3029
import org.elasticsearch.action.search.SearchTask;
@@ -329,7 +328,7 @@ private DfsSearchResult executeDfsPhase(ShardSearchRequest request, SearchTask t
329328
} catch (Exception e) {
330329
logger.trace("Dfs phase failed", e);
331330
processFailure(context, e);
332-
throw ExceptionsHelper.convertToRuntime(e);
331+
throw e;
333332
} finally {
334333
cleanContext(context);
335334
}
@@ -380,29 +379,24 @@ protected void doRun() {
380379
});
381380
}
382381

383-
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws IOException {
382+
private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTask task) throws Exception {
384383
final SearchContext context = createAndPutContext(request);
385-
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
386384
context.incRef();
387-
boolean queryPhaseSuccess = false;
388385
try {
389386
context.setTask(task);
390-
operationListener.onPreQueryPhase(context);
391-
long time = System.nanoTime();
392-
contextProcessing(context);
393-
394-
loadOrExecuteQueryPhase(request, context);
395-
396-
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
397-
freeContext(context.id());
398-
} else {
399-
contextProcessedSuccessfully(context);
387+
final long afterQueryTime;
388+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
389+
contextProcessing(context);
390+
loadOrExecuteQueryPhase(request, context);
391+
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
392+
freeContext(context.id());
393+
} else {
394+
contextProcessedSuccessfully(context);
395+
}
396+
afterQueryTime = executor.success();
400397
}
401-
final long afterQueryTime = System.nanoTime();
402-
queryPhaseSuccess = true;
403-
operationListener.onQueryPhase(context, afterQueryTime - time);
404398
if (request.numberOfShards() == 1) {
405-
return executeFetchPhase(context, operationListener, afterQueryTime);
399+
return executeFetchPhase(context, afterQueryTime);
406400
}
407401
return context.queryResult();
408402
} catch (Exception e) {
@@ -411,56 +405,44 @@ private SearchPhaseResult executeQueryPhase(ShardSearchRequest request, SearchTa
411405
e = (e.getCause() == null || e.getCause() instanceof Exception) ?
412406
(Exception) e.getCause() : new ElasticsearchException(e.getCause());
413407
}
414-
if (!queryPhaseSuccess) {
415-
operationListener.onFailedQueryPhase(context);
416-
}
417408
logger.trace("Query phase failed", e);
418409
processFailure(context, e);
419-
throw ExceptionsHelper.convertToRuntime(e);
410+
throw e;
420411
} finally {
421412
cleanContext(context);
422413
}
423414
}
424415

425-
private QueryFetchSearchResult executeFetchPhase(SearchContext context, SearchOperationListener operationListener,
426-
long afterQueryTime) {
427-
operationListener.onPreFetchPhase(context);
428-
try {
416+
private QueryFetchSearchResult executeFetchPhase(SearchContext context, long afterQueryTime) {
417+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, afterQueryTime)){
429418
shortcutDocIdsToLoad(context);
430419
fetchPhase.execute(context);
431420
if (fetchPhaseShouldFreeContext(context)) {
432421
freeContext(context.id());
433422
} else {
434423
contextProcessedSuccessfully(context);
435424
}
436-
} catch (Exception e) {
437-
operationListener.onFailedFetchPhase(context);
438-
throw ExceptionsHelper.convertToRuntime(e);
425+
executor.success();
439426
}
440-
operationListener.onFetchPhase(context, System.nanoTime() - afterQueryTime);
441427
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
442428
}
443429

444430
public void executeQueryPhase(InternalScrollSearchRequest request, SearchTask task, ActionListener<ScrollQuerySearchResult> listener) {
445431
runAsync(request.id(), () -> {
446432
final SearchContext context = findContext(request.id(), request);
447-
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
448433
context.incRef();
449-
try {
434+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
450435
context.setTask(task);
451-
operationListener.onPreQueryPhase(context);
452-
long time = System.nanoTime();
453436
contextProcessing(context);
454437
processScroll(request, context);
455438
queryPhase.execute(context);
456439
contextProcessedSuccessfully(context);
457-
operationListener.onQueryPhase(context, System.nanoTime() - time);
440+
executor.success();
458441
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
459442
} catch (Exception e) {
460-
operationListener.onFailedQueryPhase(context);
461443
logger.trace("Query phase failed", e);
462444
processFailure(context, e);
463-
throw ExceptionsHelper.convertToRuntime(e);
445+
throw e;
464446
} finally {
465447
cleanContext(context);
466448
}
@@ -471,29 +453,23 @@ public void executeQueryPhase(QuerySearchRequest request, SearchTask task, Actio
471453
runAsync(request.id(), () -> {
472454
final SearchContext context = findContext(request.id(), request);
473455
context.setTask(task);
474-
IndexShard indexShard = context.indexShard();
475-
SearchOperationListener operationListener = indexShard.getSearchOperationListener();
476456
context.incRef();
477-
try {
457+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)) {
478458
contextProcessing(context);
479459
context.searcher().setAggregatedDfs(request.dfs());
480-
481-
operationListener.onPreQueryPhase(context);
482-
long time = System.nanoTime();
483460
queryPhase.execute(context);
484461
if (context.queryResult().hasSearchContext() == false && context.scrollContext() == null) {
485462
// no hits, we can release the context since there will be no fetch phase
486463
freeContext(context.id());
487464
} else {
488465
contextProcessedSuccessfully(context);
489466
}
490-
operationListener.onQueryPhase(context, System.nanoTime() - time);
467+
executor.success();
491468
return context.queryResult();
492469
} catch (Exception e) {
493-
operationListener.onFailedQueryPhase(context);
494470
logger.trace("Query phase failed", e);
495471
processFailure(context, e);
496-
throw ExceptionsHelper.convertToRuntime(e);
472+
throw e;
497473
} finally {
498474
cleanContext(context);
499475
}
@@ -527,28 +503,19 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
527503
ActionListener<ScrollQueryFetchSearchResult> listener) {
528504
runAsync(request.id(), () -> {
529505
final SearchContext context = findContext(request.id(), request);
506+
context.setTask(task);
530507
context.incRef();
531-
try {
532-
context.setTask(task);
508+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context)){
533509
contextProcessing(context);
534-
SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
535510
processScroll(request, context);
536-
operationListener.onPreQueryPhase(context);
537-
final long time = System.nanoTime();
538-
try {
539-
queryPhase.execute(context);
540-
} catch (Exception e) {
541-
operationListener.onFailedQueryPhase(context);
542-
throw ExceptionsHelper.convertToRuntime(e);
543-
}
544-
long afterQueryTime = System.nanoTime();
545-
operationListener.onQueryPhase(context, afterQueryTime - time);
546-
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, operationListener, afterQueryTime);
511+
queryPhase.execute(context);
512+
final long afterQueryTime = executor.success();
513+
QueryFetchSearchResult fetchSearchResult = executeFetchPhase(context, afterQueryTime);
547514
return new ScrollQueryFetchSearchResult(fetchSearchResult, context.shardTarget());
548515
} catch (Exception e) {
549516
logger.trace("Fetch phase failed", e);
550517
processFailure(context, e);
551-
throw ExceptionsHelper.convertToRuntime(e);
518+
throw e;
552519
} finally {
553520
cleanContext(context);
554521
}
@@ -558,7 +525,6 @@ public void executeFetchPhase(InternalScrollSearchRequest request, SearchTask ta
558525
public void executeFetchPhase(ShardFetchRequest request, SearchTask task, ActionListener<FetchSearchResult> listener) {
559526
runAsync(request.id(), () -> {
560527
final SearchContext context = findContext(request.id(), request);
561-
final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
562528
context.incRef();
563529
try {
564530
context.setTask(task);
@@ -567,21 +533,20 @@ public void executeFetchPhase(ShardFetchRequest request, SearchTask task, Action
567533
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
568534
}
569535
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
570-
operationListener.onPreFetchPhase(context);
571-
long time = System.nanoTime();
572-
fetchPhase.execute(context);
573-
if (fetchPhaseShouldFreeContext(context)) {
574-
freeContext(request.id());
575-
} else {
576-
contextProcessedSuccessfully(context);
536+
try (SearchOperationListenerExecutor executor = new SearchOperationListenerExecutor(context, true, System.nanoTime())) {
537+
fetchPhase.execute(context);
538+
if (fetchPhaseShouldFreeContext(context)) {
539+
freeContext(request.id());
540+
} else {
541+
contextProcessedSuccessfully(context);
542+
}
543+
executor.success();
577544
}
578-
operationListener.onFetchPhase(context, System.nanoTime() - time);
579545
return context.fetchResult();
580546
} catch (Exception e) {
581-
operationListener.onFailedFetchPhase(context);
582547
logger.trace("Fetch phase failed", e);
583548
processFailure(context, e);
584-
throw ExceptionsHelper.convertToRuntime(e);
549+
throw e;
585550
} finally {
586551
cleanContext(context);
587552
}
@@ -661,7 +626,7 @@ final SearchContext createContext(ShardSearchRequest request) throws IOException
661626
context.lowLevelCancellation(lowLevelCancellation);
662627
} catch (Exception e) {
663628
context.close();
664-
throw ExceptionsHelper.convertToRuntime(e);
629+
throw e;
665630
}
666631

667632
return context;
@@ -733,7 +698,7 @@ public void freeAllScrollContexts() {
733698
}
734699
}
735700

736-
private void contextScrollKeepAlive(SearchContext context, long keepAlive) throws IOException {
701+
private void contextScrollKeepAlive(SearchContext context, long keepAlive) {
737702
if (keepAlive > maxKeepAlive) {
738703
throw new IllegalArgumentException(
739704
"Keep alive for scroll (" + TimeValue.timeValueMillis(keepAlive) + ") is too large. " +
@@ -991,7 +956,7 @@ private void shortcutDocIdsToLoad(SearchContext context) {
991956
context.docIdsToLoad(docIdsToLoad, 0, docIdsToLoad.length);
992957
}
993958

994-
private void processScroll(InternalScrollSearchRequest request, SearchContext context) throws IOException {
959+
private void processScroll(InternalScrollSearchRequest request, SearchContext context) {
995960
// process scroll
996961
context.from(context.from() + context.size());
997962
context.scrollContext().scroll = request.scroll();
@@ -1147,4 +1112,58 @@ public boolean canMatch() {
11471112
return canMatch;
11481113
}
11491114
}
1115+
1116+
/**
1117+
* This helper class ensures we only execute either the success or the failure path for {@link SearchOperationListener}.
1118+
* This is crucial for some implementations like {@link org.elasticsearch.index.search.stats.ShardSearchStats}.
1119+
*/
1120+
private static final class SearchOperationListenerExecutor implements AutoCloseable {
1121+
private final SearchOperationListener listener;
1122+
private final SearchContext context;
1123+
private final long time;
1124+
private final boolean fetch;
1125+
private long afterQueryTime = -1;
1126+
private boolean closed = false;
1127+
1128+
SearchOperationListenerExecutor(SearchContext context) {
1129+
this(context, false, System.nanoTime());
1130+
}
1131+
1132+
SearchOperationListenerExecutor(SearchContext context, boolean fetch, long startTime) {
1133+
this.listener = context.indexShard().getSearchOperationListener();
1134+
this.context = context;
1135+
time = startTime;
1136+
this.fetch = fetch;
1137+
if (fetch) {
1138+
listener.onPreFetchPhase(context);
1139+
} else {
1140+
listener.onPreQueryPhase(context);
1141+
}
1142+
}
1143+
1144+
long success() {
1145+
return afterQueryTime = System.nanoTime();
1146+
}
1147+
1148+
@Override
1149+
public void close() {
1150+
assert closed == false : "already closed - while technically ok double closing is a likely a bug in this case";
1151+
if (closed == false) {
1152+
closed = true;
1153+
if (afterQueryTime != -1) {
1154+
if (fetch) {
1155+
listener.onFetchPhase(context, afterQueryTime - time);
1156+
} else {
1157+
listener.onQueryPhase(context, afterQueryTime - time);
1158+
}
1159+
} else {
1160+
if (fetch) {
1161+
listener.onFailedFetchPhase(context);
1162+
} else {
1163+
listener.onFailedQueryPhase(context);
1164+
}
1165+
}
1166+
}
1167+
}
1168+
}
11501169
}

0 commit comments

Comments
 (0)