Skip to content

Commit 38dc926

Browse files
jimczidnhatn
andauthored
Ensure validation of the reader context is executed first (#61831)
This change makes sure that reader context is validated (`SearchOperationListener#validateReaderContext) before any other operation and that it is correctly recycled or removed at the end of the operation. This commit also fixes a race condition bug that would allocate the security reader for scrolls more than once. Relates #61446 Co-authored-by: Nhat Nguyen <[email protected]>
1 parent 3df0810 commit 38dc926

File tree

10 files changed

+145
-129
lines changed

10 files changed

+145
-129
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -546,24 +546,27 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause)
546546
}
547547

548548
/**
549-
* This method should be called if a search phase failed to ensure all relevant search contexts and resources are released.
550-
* this method will also notify the listener and sends back a failure to the user.
549+
* This method should be called if a search phase failed to ensure all relevant reader contexts are released.
550+
* This method will also notify the listener and sends back a failure to the user.
551551
*
552552
* @param exception the exception explaining or causing the phase failure
553553
*/
554554
private void raisePhaseFailure(SearchPhaseExecutionException exception) {
555-
results.getSuccessfulResults().forEach((entry) -> {
556-
if (entry.getContextId() != null) {
557-
try {
558-
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
559-
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
560-
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
561-
} catch (Exception inner) {
562-
inner.addSuppressed(exception);
563-
logger.trace("failed to release context", inner);
555+
// we don't release persistent readers (point in time).
556+
if (request.pointInTimeBuilder() == null) {
557+
results.getSuccessfulResults().forEach((entry) -> {
558+
if (entry.getContextId() != null) {
559+
try {
560+
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
561+
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
562+
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
563+
} catch (Exception inner) {
564+
inner.addSuppressed(exception);
565+
logger.trace("failed to release context", inner);
566+
}
564567
}
565-
}
566-
});
568+
});
569+
}
567570
listener.onFailure(exception);
568571
}
569572

server/src/main/java/org/elasticsearch/action/search/DfsQueryPhase.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,13 @@ public void onFailure(Exception exception) {
9898
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
9999
counter.onFailure(shardIndex, searchShardTarget, exception);
100100
} finally {
101-
// the query might not have been executed at all (for example because thread pool rejected
102-
// execution) and the search context that was created in dfs phase might not be released.
103-
// release it again to be in the safe side
104-
context.sendReleaseSearchContext(
105-
querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
101+
if (context.getRequest().pointInTimeBuilder() == null) {
102+
// the query might not have been executed at all (for example because thread pool rejected
103+
// execution) and the search context that was created in dfs phase might not be released.
104+
// release it again to be in the safe side
105+
context.sendReleaseSearchContext(
106+
querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
107+
}
106108
}
107109
}
108110
});

server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -206,11 +206,11 @@ public void onFailure(Exception e) {
206206
* Releases shard targets that are not used in the docsIdsToLoad.
207207
*/
208208
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
209-
// we only release search context that we did not fetch from if we are not scrolling
210-
// and if it has at lease one hit that didn't make it to the global topDocs
211-
if (context.getRequest().scroll() == null &&
212-
context.getRequest().pointInTimeBuilder() == null &&
213-
queryResult.hasSearchContext()) {
209+
// we only release search context that we did not fetch from, if we are not scrolling
210+
// or using a PIT and if it has at least one hit that didn't make it to the global topDocs
211+
if (queryResult.hasSearchContext()
212+
&& context.getRequest().scroll() == null
213+
&& context.getRequest().pointInTimeBuilder() == null) {
214214
try {
215215
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
216216
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());

server/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ default void onFreeScrollContext(ReaderContext readerContext) {}
113113
* @param readerContext The reader context used by this request.
114114
* @param transportRequest the request that is going to use the search context
115115
*/
116-
default void validateSearchContext(ReaderContext readerContext, TransportRequest transportRequest) {}
116+
default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {}
117117

118118
/**
119119
* A Composite listener that multiplexes calls to each of the listeners methods.
@@ -238,11 +238,11 @@ public void onFreeScrollContext(ReaderContext readerContext) {
238238
}
239239

240240
@Override
241-
public void validateSearchContext(ReaderContext readerContext, TransportRequest request) {
241+
public void validateReaderContext(ReaderContext readerContext, TransportRequest request) {
242242
Exception exception = null;
243243
for (SearchOperationListener listener : listeners) {
244244
try {
245-
listener.validateSearchContext(readerContext, request);
245+
listener.validateReaderContext(readerContext, request);
246246
} catch (Exception e) {
247247
exception = ExceptionsHelper.useOrSuppress(exception, e);
248248
}

0 commit comments

Comments
 (0)