Skip to content

Ensure searcher is release if wrapping fails #14107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 14, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
import org.elasticsearch.common.lucene.Lucene;
Expand Down Expand Up @@ -717,11 +719,20 @@ public void failShard(String reason, @Nullable Throwable e) {

public Engine.Searcher acquireSearcher(String source) {
readAllowed();
Engine engine = getEngine();
final Engine engine = getEngine();
final Engine.Searcher searcher = engine.acquireSearcher(source);
boolean success = false;
try {
return searcherWrapper == null ? engine.acquireSearcher(source) : searcherWrapper.wrap(engineConfig, engine.acquireSearcher(source));
final Engine.Searcher wrappedSearcher = searcherWrapper == null ? searcher : searcherWrapper.wrap(engineConfig, searcher);
assert wrappedSearcher != null;
success = true;
return wrappedSearcher;
} catch (IOException ex) {
throw new ElasticsearchException("failed to wrap searcher", ex);
} finally {
if (success == false) {
Releasables.close(success, searcher);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
Expand Down Expand Up @@ -911,10 +912,6 @@ public void testSearcherWrapperIsUsed() throws IOException {
search = searcher.searcher().search(new TermQuery(new Term("foobar", "bar")), 10);
assertEquals(search.totalHits, 1);
}

ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
IndexServicesProvider indexServices = indexService.getIndexServices();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
Expand All @@ -927,16 +924,8 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr
}
};

IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
try (Engine.Searcher searcher = newShard.acquireSearcher("test")) {
TopDocs search = searcher.searcher().search(new TermQuery(new Term("foo", "bar")), 10);
assertEquals(search.totalHits, 0);
Expand All @@ -948,7 +937,34 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr
assertNotNull(getResult.searcher()); // make sure get uses the wrapped reader
assertTrue(getResult.searcher().reader() instanceof FieldMaskingReader);
getResult.release();
} finally {
newShard.close("just do it", randomBoolean());
}
}

public void testSearcherWrapperWorksWithGlobaOrdinals() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
client().prepareIndex("test", "test", "1").setSource("{\"foobar\" : \"bar\"}").setRefresh(true).get();

IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
return new FieldMaskingReader("foo", reader);
}

@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};

IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
// test global ordinals are evicted
MappedFieldType foo = newShard.mapperService().indexName("foo");
IndexFieldData.Global ifd = shard.indexFieldDataService().getForField(foo);
Expand All @@ -970,7 +986,53 @@ public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) thr
} finally {
newShard.close("just do it", randomBoolean());
}
}

public void testSearchIsReleaseIfWrapperFails() throws IOException {
createIndex("test");
ensureGreen();
IndicesService indicesService = getInstanceFromNode(IndicesService.class);
IndexService indexService = indicesService.indexService("test");
IndexShard shard = indexService.getShardOrNull(0);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}").setRefresh(true).get();
IndexSearcherWrapper wrapper = new IndexSearcherWrapper() {
@Override
public DirectoryReader wrap(DirectoryReader reader) throws IOException {
throw new RuntimeException("boom");
}

@Override
public IndexSearcher wrap(EngineConfig engineConfig, IndexSearcher searcher) throws EngineException {
return searcher;
}
};

IndexShard newShard = reinitWithWrapper(indexService, shard, wrapper);
try {
newShard.acquireSearcher("test");
fail("exception expected");
} catch (RuntimeException ex) {
//
} finally {
newShard.close("just do it", randomBoolean());
}
// test will fail due to unclosed searchers if the searcher is not released
}

private final IndexShard reinitWithWrapper(IndexService indexService, IndexShard shard, IndexSearcherWrapper wrapper) throws IOException {
ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true);
IndexServicesProvider indexServices = indexService.getIndexServices();
IndexServicesProvider newProvider = new IndexServicesProvider(indexServices.getIndicesLifecycle(), indexServices.getThreadPool(), indexServices.getMapperService(), indexServices.getQueryParserService(), indexServices.getIndexCache(), indexServices.getIndicesQueryCache(), indexServices.getCodecService(), indexServices.getTermVectorsService(), indexServices.getIndexFieldDataService(), indexServices.getWarmer(), indexServices.getSimilarityService(), indexServices.getFactory(), indexServices.getBigArrays(), wrapper, indexServices.getIndexingMemoryController());
IndexShard newShard = new IndexShard(shard.shardId(), shard.indexSettings, shard.shardPath(), shard.store(), newProvider);
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
assertTrue(newShard.recoverFromStore(routing, localNode));
routing = new ShardRouting(routing);
ShardRoutingHelper.moveToStarted(routing);
newShard.updateRoutingEntry(routing, true);
return newShard;
}

private static class FieldMaskingReader extends FilterDirectoryReader {
Expand Down