Skip to content

Commit 6c4b031

Browse files
Fix concurrent search and index delete (#42621)
Changed order of listener invocation so that we notify before registering search context and notify after unregistering same. This ensures that count up/down like what we do in ShardSearchStats works. Otherwise, we risk notifying onFreeScrollContext before notifying onNewScrollContext (same for onFreeContext/onNewContext, but we currently have no assertions failing in those). Closes #28053
1 parent d20a187 commit 6c4b031

File tree

2 files changed

+48
-12
lines changed

2 files changed

+48
-12
lines changed

server/src/main/java/org/elasticsearch/search/SearchService.java

+31-10
Original file line numberDiff line numberDiff line change
@@ -548,19 +548,35 @@ final SearchContext createAndPutContext(ShardSearchRequest request) throws IOExc
548548
}
549549

550550
SearchContext context = createContext(request);
551+
onNewContext(context);
551552
boolean success = false;
552553
try {
553554
putContext(context);
554-
if (request.scroll() != null) {
555+
success = true;
556+
return context;
557+
} finally {
558+
if (success == false) {
559+
freeContext(context.id());
560+
}
561+
}
562+
}
563+
564+
private void onNewContext(SearchContext context) {
565+
boolean success = false;
566+
try {
567+
if (context.scrollContext() != null) {
555568
openScrollContexts.incrementAndGet();
556569
context.indexShard().getSearchOperationListener().onNewScrollContext(context);
557570
}
558571
context.indexShard().getSearchOperationListener().onNewContext(context);
559572
success = true;
560-
return context;
561573
} finally {
562-
if (!success) {
563-
freeContext(context.id());
574+
// currently, the concrete listener is CompositeListener, which swallows exceptions, but here we anyway try to do the
575+
// right thing by closing and notifying onFreeXXX in case one of the listeners fails with an exception in the future.
576+
if (success == false) {
577+
try (SearchContext dummy = context) {
578+
onFreeContext(context);
579+
}
564580
}
565581
}
566582
}
@@ -648,18 +664,23 @@ private void freeAllContextForIndex(Index index) {
648664
public boolean freeContext(long id) {
649665
try (SearchContext context = removeContext(id)) {
650666
if (context != null) {
651-
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
652-
context.indexShard().getSearchOperationListener().onFreeContext(context);
653-
if (context.scrollContext() != null) {
654-
openScrollContexts.decrementAndGet();
655-
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
656-
}
667+
onFreeContext(context);
657668
return true;
658669
}
659670
return false;
660671
}
661672
}
662673

674+
private void onFreeContext(SearchContext context) {
675+
assert context.refCount() > 0 : " refCount must be > 0: " + context.refCount();
676+
assert activeContexts.containsKey(context.id()) == false;
677+
context.indexShard().getSearchOperationListener().onFreeContext(context);
678+
if (context.scrollContext() != null) {
679+
openScrollContexts.decrementAndGet();
680+
context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
681+
}
682+
}
683+
663684
public void freeAllScrollContexts() {
664685
for (SearchContext searchContext : activeContexts.values()) {
665686
if (searchContext.scrollContext() != null) {

server/src/test/java/org/elasticsearch/search/SearchServiceTests.java

+17-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.elasticsearch.search;
2020

2121
import com.carrotsearch.hppc.IntArrayList;
22-
2322
import org.apache.lucene.search.Query;
2423
import org.apache.lucene.store.AlreadyClosedException;
2524
import org.elasticsearch.ElasticsearchException;
@@ -51,6 +50,7 @@
5150
import org.elasticsearch.index.query.QueryRewriteContext;
5251
import org.elasticsearch.index.query.QueryShardContext;
5352
import org.elasticsearch.index.query.TermQueryBuilder;
53+
import org.elasticsearch.index.search.stats.SearchStats;
5454
import org.elasticsearch.index.shard.IndexShard;
5555
import org.elasticsearch.index.shard.SearchOperationListener;
5656
import org.elasticsearch.index.shard.ShardId;
@@ -226,6 +226,7 @@ public void testSearchWhileIndexDeleted() throws InterruptedException {
226226
AtomicBoolean running = new AtomicBoolean(true);
227227
CountDownLatch startGun = new CountDownLatch(1);
228228
Semaphore semaphore = new Semaphore(Integer.MAX_VALUE);
229+
229230
final Thread thread = new Thread() {
230231
@Override
231232
public void run() {
@@ -261,12 +262,16 @@ public void onFailure(Exception e) {
261262
try {
262263
final int rounds = scaledRandomIntBetween(100, 10000);
263264
SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true);
265+
SearchRequest scrollSearchRequest = new SearchRequest().allowPartialSearchResults(true)
266+
.scroll(new Scroll(TimeValue.timeValueMinutes(1)));
264267
for (int i = 0; i < rounds; i++) {
265268
try {
266269
try {
267270
PlainActionFuture<SearchPhaseResult> result = new PlainActionFuture<>();
271+
final boolean useScroll = randomBoolean();
268272
service.executeQueryPhase(
269-
new ShardSearchLocalRequest(searchRequest, indexShard.shardId(), 1,
273+
new ShardSearchLocalRequest(useScroll ? scrollSearchRequest : searchRequest,
274+
indexShard.shardId(), 1,
270275
new AliasFilter(null, Strings.EMPTY_ARRAY), 1.0f, -1, null, null),
271276
new SearchTask(123L, "", "", "", null, Collections.emptyMap()), result);
272277
SearchPhaseResult searchPhaseResult = result.get();
@@ -276,6 +281,9 @@ public void onFailure(Exception e) {
276281
PlainActionFuture<FetchSearchResult> listener = new PlainActionFuture<>();
277282
service.executeFetchPhase(req, new SearchTask(123L, "", "", "", null, Collections.emptyMap()), listener);
278283
listener.get();
284+
if (useScroll) {
285+
service.freeContext(searchPhaseResult.getRequestId());
286+
}
279287
} catch (ExecutionException ex) {
280288
assertThat(ex.getCause(), instanceOf(RuntimeException.class));
281289
throw ((RuntimeException)ex.getCause());
@@ -293,6 +301,13 @@ public void onFailure(Exception e) {
293301
thread.join();
294302
semaphore.acquire(Integer.MAX_VALUE);
295303
}
304+
305+
assertEquals(0, service.getActiveContexts());
306+
307+
SearchStats.Stats totalStats = indexShard.searchStats().getTotal();
308+
assertEquals(0, totalStats.getQueryCurrent());
309+
assertEquals(0, totalStats.getScrollCurrent());
310+
assertEquals(0, totalStats.getFetchCurrent());
296311
}
297312

298313
public void testTimeout() throws IOException {

0 commit comments

Comments
 (0)