Skip to content

Commit 30aae50

Browse files
committed
Time Memory Leak: Search requests don't eagerly clean the search context, closes #153.
1 parent ceb0138 commit 30aae50

File tree

7 files changed

+39
-12
lines changed

7 files changed

+39
-12
lines changed

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ private class AsyncAction extends BaseAsyncAction<DfsSearchResult> {
6969

7070
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
7171

72+
private volatile Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad;
7273

7374
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
7475
super(request, listener);
@@ -169,9 +170,9 @@ private void executeFetchPhase() {
169170
private void innerExecuteFetchPhase() {
170171
sortedShardList = searchPhaseController.sortDocs(queryResults.values());
171172
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
173+
this.docIdsToLoad = docIdsToLoad;
172174

173175
if (docIdsToLoad.isEmpty()) {
174-
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
175176
finishHim();
176177
}
177178

@@ -219,8 +220,6 @@ private void innerExecuteFetchPhase() {
219220
}
220221
}
221222
}
222-
223-
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
224223
}
225224

226225
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
@@ -259,6 +258,7 @@ private void innerFinishHim() {
259258
if (request.scroll() != null) {
260259
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), dfsResults);
261260
}
261+
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
262262
searchCache.releaseDfsResults(dfsResults);
263263
searchCache.releaseQueryResults(queryResults);
264264
searchCache.releaseFetchResults(fetchResults);

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryAndFetchAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import static org.elasticsearch.action.search.type.TransportSearchHelper.*;
4343

4444
/**
45-
* @author kimchy (Shay Banon)
45+
* @author kimchy (shay.banon)
4646
*/
4747
public class TransportSearchQueryAndFetchAction extends TransportSearchTypeAction {
4848

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ private class AsyncAction extends BaseAsyncAction<QuerySearchResult> {
6363

6464
private final Map<SearchShardTarget, FetchSearchResult> fetchResults = searchCache.obtainFetchResults();
6565

66+
private volatile Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad;
6667

6768
private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listener) {
6869
super(request, listener);
@@ -83,9 +84,9 @@ private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listen
8384
@Override protected void moveToSecondPhase() {
8485
sortedShardList = searchPhaseController.sortDocs(queryResults.values());
8586
final Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad = searchPhaseController.docIdsToLoad(sortedShardList);
87+
this.docIdsToLoad = docIdsToLoad;
8688

8789
if (docIdsToLoad.isEmpty()) {
88-
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
8990
finishHim();
9091
}
9192

@@ -134,8 +135,6 @@ private AsyncAction(SearchRequest request, ActionListener<SearchResponse> listen
134135
}
135136
}
136137
}
137-
138-
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
139138
}
140139

141140
private void executeFetch(final AtomicInteger counter, FetchSearchRequest fetchSearchRequest, DiscoveryNode node) {
@@ -174,6 +173,7 @@ private void innerFinishHim() {
174173
if (request.scroll() != null) {
175174
scrollId = TransportSearchHelper.buildScrollId(request.searchType(), queryResults.values());
176175
}
176+
releaseIrrelevantSearchContexts(queryResults, docIdsToLoad);
177177
searchCache.releaseQueryResults(queryResults);
178178
searchCache.releaseFetchResults(fetchResults);
179179
invokeListener(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps, successulOps.get(), buildShardFailures()));

modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,9 @@ protected ShardSearchFailure[] buildShardFailures() {
279279
*/
280280
protected void releaseIrrelevantSearchContexts(Map<SearchShardTarget, QuerySearchResultProvider> queryResults,
281281
Map<SearchShardTarget, ExtTIntArrayList> docIdsToLoad) {
282+
if (docIdsToLoad == null) {
283+
return;
284+
}
282285
for (Map.Entry<SearchShardTarget, QuerySearchResultProvider> entry : queryResults.entrySet()) {
283286
if (!docIdsToLoad.containsKey(entry.getKey())) {
284287
DiscoveryNode node = nodes.get(entry.getKey().nodeId());

modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,16 @@ protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings
7575

7676
private class IndexReaderCleaner implements Runnable {
7777
@Override public void run() {
78+
int totalCount = cache.size();
79+
int cleaned = 0;
7880
for (Iterator<IndexReader> readerIt = cache.keySet().iterator(); readerIt.hasNext();) {
7981
IndexReader reader = readerIt.next();
8082
if (reader.getRefCount() <= 0) {
8183
readerIt.remove();
84+
cleaned++;
8285
}
8386
}
87+
logger.trace("Cleaned [{}] out of estimated total [{}]", cleaned, totalCount);
8488
}
8589
}
8690

modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -225,8 +225,8 @@ public QueryFetchSearchResult executeFetchPhase(QuerySearchRequest request) thro
225225
queryPhase.execute(context);
226226
shortcutDocIdsToLoad(context);
227227
fetchPhase.execute(context);
228-
if (context.scroll() != null) {
229-
activeContexts.put(context.id(), context);
228+
if (context.scroll() == null) {
229+
freeContext(request.id());
230230
}
231231
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
232232
} catch (RuntimeException e) {

modules/elasticsearch/src/main/java/org/elasticsearch/search/action/SearchServiceTransportAction.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,25 +31,45 @@
3131
import org.elasticsearch.search.query.QuerySearchRequest;
3232
import org.elasticsearch.search.query.QuerySearchResult;
3333
import org.elasticsearch.transport.*;
34+
import org.elasticsearch.util.component.AbstractComponent;
3435
import org.elasticsearch.util.guice.inject.Inject;
3536
import org.elasticsearch.util.io.stream.LongStreamable;
3637
import org.elasticsearch.util.io.stream.VoidStreamable;
38+
import org.elasticsearch.util.logging.ESLogger;
39+
import org.elasticsearch.util.settings.Settings;
3740

3841
/**
3942
* An encapsulation of {@link org.elasticsearch.search.SearchService} operations exposed through
4043
* transport.
4144
*
4245
* @author kimchy (Shay Banon)
4346
*/
44-
public class SearchServiceTransportAction {
47+
public class SearchServiceTransportAction extends AbstractComponent {
48+
49+
static final class FreeContextResponseHandler extends VoidTransportResponseHandler {
50+
51+
private final ESLogger logger;
52+
53+
FreeContextResponseHandler(ESLogger logger) {
54+
super(false);
55+
this.logger = logger;
56+
}
57+
58+
@Override public void handleException(RemoteTransportException exp) {
59+
logger.warn("Failed to send release search context", exp);
60+
}
61+
}
4562

4663
private final TransportService transportService;
4764

4865
private final ClusterService clusterService;
4966

5067
private final SearchService searchService;
5168

52-
@Inject public SearchServiceTransportAction(TransportService transportService, ClusterService clusterService, SearchService searchService) {
69+
private final FreeContextResponseHandler freeContextResponseHandler = new FreeContextResponseHandler(logger);
70+
71+
@Inject public SearchServiceTransportAction(Settings settings, TransportService transportService, ClusterService clusterService, SearchService searchService) {
72+
super(settings);
5373
this.transportService = transportService;
5474
this.clusterService = clusterService;
5575
this.searchService = searchService;
@@ -69,7 +89,7 @@ public void sendFreeContext(DiscoveryNode node, final long contextId) {
6989
if (clusterService.state().nodes().localNodeId().equals(node.id())) {
7090
searchService.freeContext(contextId);
7191
} else {
72-
transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), VoidTransportResponseHandler.INSTANCE_NOSPAWN);
92+
transportService.sendRequest(node, SearchFreeContextTransportHandler.ACTION, new LongStreamable(contextId), freeContextResponseHandler);
7393
}
7494
}
7595

0 commit comments

Comments
 (0)