Skip to content

Commit 5f86ea1

Browse files
Fix concurrent search and index delete
Changed order of listener invocation so that we notify before registering search context and notify after unregistering same. This ensures that count up/down like what we do in ShardSearchStats works. Otherwise, we risk notifying onFreeScrollContext before notifying onNewScrollContext (same for onFreeContext/onNewContext, but we currently have no assertions failing in those).
1 parent 5f651f4 commit 5f86ea1

File tree

2 files changed

+17
-3
lines changed

2 files changed

+17
-3
lines changed

server/src/main/java/org/elasticsearch/search/SearchService.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
4545
import org.elasticsearch.core.internal.io.IOUtils;
4646
import org.elasticsearch.index.Index;
47+
import org.elasticsearch.index.IndexNotFoundException;
4748
import org.elasticsearch.index.IndexService;
4849
import org.elasticsearch.index.IndexSettings;
4950
import org.elasticsearch.index.engine.Engine;
@@ -550,12 +551,19 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
550551
SearchContext context = createContext(request);
551552
boolean success = false;
552553
try {
553-
putContext(context);
554554
if (request.scroll() != null) {
555555
openScrollContexts.incrementAndGet();
556556
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
557557
}
558558
context.indexShard().getSearchOperationListener().onNewContext(context);
559+
putContext(context);
560+
// ensure that if index is deleted concurrently, we free the context immediately, either here or in afterIndexRemoved
561+
try {
562+
indicesService.indexServiceSafe(request.shardId().getIndex());
563+
} catch (IndexNotFoundException e) {
564+
freeContext(context.id());
565+
throw e;
566+
}
559567
success = true;
560568
return context;
561569
} finally {

server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.elasticsearch.search;
2020

2121
import com.carrotsearch.hppc.IntArrayList;
22-
2322
import org.apache.lucene.search.Query;
2423
import org.apache.lucene.store.AlreadyClosedException;
2524
import org.elasticsearch.ElasticsearchException;
@@ -261,12 +260,16 @@ public void onFailure(Exception e) {
261260
try {
262261
final int rounds = scaledRandomIntBetween(100, 10000);
263262
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
263+
SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true)
264+
.scroll(new Scroll(TimeValue.timeValueMinutes(1)));
264265
for (int i = 0; i < rounds; i++) {
265266
try {
266267
try {
267268
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
269+
boolean useScroll = randomBoolean();
268270
service.executeQueryPhase(
269-
new ShardSearchLocalRequest(searchRequest, indexShard.shardId(), 1,
271+
new ShardSearchLocalRequest(useScroll ? searchRequest : scrollSearchRequest,
272+
indexShard.shardId(), 1,
270273
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
271274
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
272275
SearchPhaseResult searchPhaseResult = result.get();
@@ -276,6 +279,9 @@ public void onFailure(Exception e) {
276279
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
277280
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
278281
listener.get();
282+
if (useScroll) {
283+
service.freeContext(searchPhaseResult.requestId);
284+
}
279285
} catch (ExecutionException ex) {
280286
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
281287
throw ((RuntimeException)ex.getCause());

0 commit comments

Comments
 (0)