Skip to content

Refactor ShardFailure listener infrastructure #14206

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Oct 21, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 5 additions & 20 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public abstract class Engine implements Closeable {
protected final EngineConfig engineConfig;
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
protected final FailedEngineListener failedEngineListener;
protected final EventListener eventListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
Expand All @@ -89,7 +89,7 @@ protected Engine(EngineConfig engineConfig) {
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings(), engineConfig.getShardId());
this.failedEngineListener = engineConfig.getFailedEngineListener();
this.eventListener = engineConfig.getEventListener();
this.deletionPolicy = engineConfig.getDeletionPolicy();
}

Expand Down Expand Up @@ -535,7 +535,7 @@ public void failEngine(String reason, @Nullable Throwable failure) {
logger.warn("Couldn't mark store corrupted", e);
}
}
failedEngineListener.onFailedEngine(shardId, reason, failure);
eventListener.onFailedEngine(reason, failure);
}
} catch (Throwable t) {
// don't bubble up these exceptions up
Expand All @@ -560,19 +560,9 @@ protected boolean maybeFailEngine(String source, Throwable t) {
return false;
}

/** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */
protected Throwable wrapIfClosed(Throwable t) {
if (isClosed.get()) {
if (t != failedEngine && failedEngine != null) {
t.addSuppressed(failedEngine);
}
return new EngineClosedException(shardId, t);
}
return t;
}

public interface FailedEngineListener {
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
public interface EventListener {
default void onFailedEngine(String reason, @Nullable Throwable t) {}
}

public static class Searcher implements Releasable {
Expand Down Expand Up @@ -991,11 +981,6 @@ public void close() throws IOException {
}
}

/**
* Returns <code>true</code> the internal writer has any uncommitted changes. Otherwise <code>false</code>
*/
public abstract boolean hasUncommittedChanges();

public static class CommitId implements Writeable {

private final byte[] id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public final class EngineConfig {
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
private final Engine.FailedEngineListener failedEngineListener;
private final Engine.EventListener eventListener;
private final boolean forceNewTranslog;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
Expand Down Expand Up @@ -117,7 +117,7 @@ public final class EngineConfig {
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
this.shardId = shardId;
this.indexSettings = indexSettings;
Expand All @@ -131,7 +131,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
this.analyzer = analyzer;
this.similarity = similarity;
this.codecService = codecService;
this.failedEngineListener = failedEngineListener;
this.eventListener = eventListener;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
Expand Down Expand Up @@ -310,8 +310,8 @@ public MergeSchedulerConfig getMergeSchedulerConfig() {
/**
* Returns a listener that should be called on engine failure
*/
public Engine.FailedEngineListener getFailedEngineListener() {
return failedEngineListener;
public Engine.EventListener getEventListener() {
return eventListener;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -846,11 +846,6 @@ protected final void closeNoLock(String reason) {
}
}

@Override
public boolean hasUncommittedChanges() {
return indexWriter.hasUncommittedChanges();
}

@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,6 @@ public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
}



@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
Expand All @@ -223,11 +221,6 @@ protected void closeNoLock(String reason) {
}
}

@Override
public boolean hasUncommittedChanges() {
return false;
}

@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
Expand Down
41 changes: 30 additions & 11 deletions core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
Expand All @@ -51,6 +50,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
Expand All @@ -76,7 +76,6 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
Expand Down Expand Up @@ -167,7 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();

private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
private final ShardEventListener shardEventListener = new ShardEventListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
Expand Down Expand Up @@ -979,8 +978,8 @@ private void startScheduledTasksIfNeeded() {

public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";

public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
this.failedEngineListener.delegates.add(failedEngineListener);
public void addShardFailureCallback(Callback<ShardFailure> onShardFailure) {
this.shardEventListener.delegates.add(onShardFailure);
}

/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
Expand Down Expand Up @@ -1369,15 +1368,16 @@ protected Engine getEngineOrNull() {
return this.currentEngineReference.get();
}

class ShardEngineFailListener implements Engine.FailedEngineListener {
private final CopyOnWriteArrayList<Engine.FailedEngineListener> delegates = new CopyOnWriteArrayList<>();
class ShardEventListener implements Engine.EventListener {
private final CopyOnWriteArrayList<Callback<ShardFailure>> delegates = new CopyOnWriteArrayList<>();

// called by the current engine
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
for (Engine.FailedEngineListener listener : delegates) {
public void onFailedEngine(String reason, @Nullable Throwable failure) {
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure, getIndexUUID());
for (Callback<ShardFailure> listener : delegates) {
try {
listener.onFailedEngine(shardId, reason, failure);
listener.handle(shardFailure);
} catch (Exception e) {
logger.warn("exception while notifying engine failure", e);
}
Expand Down Expand Up @@ -1457,7 +1457,7 @@ protected void operationProcessed() {
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
}

private static class IndexShardOperationCounter extends AbstractRefCounted {
Expand Down Expand Up @@ -1571,4 +1571,23 @@ public void onAfter() {
return false;
}

/**
* Simple struct encapsulating a shard failure
* @see IndexShard#addShardFailureCallback(Callback)
*/
public static final class ShardFailure {
public final ShardRouting routing;
public final String reason;
@Nullable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks.

public final Throwable cause;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we mark this as nullable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this for? It doesn't buy me anything except bloat.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use @Nullable everywhere (including in onFailedEngine) to indicate that something can be null and people should watch out for it. I personally like it, but if we end up not liking it, we should just kill @Nullable and all its usages. If we want to remove it, maybe open an issue and see what people think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should have requirements like this when writing new code... I use getFoo everywhere when I add it, even if we are not consistent; at least I am not adding any new unneeded annotations.

public final String indexUUID;

public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause, String indexUUID) {
this.routing = routing;
this.reason = reason;
this.cause = cause;
this.indexUUID = indexUUID;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,10 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettingsService;
Expand Down Expand Up @@ -98,7 +98,7 @@ static class FailedShard {
}

private final Object mutex = new Object();
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
private final FailedShardHandler failedShardHandler = new FailedShardHandler();

private final boolean sendRefreshMapping;

Expand Down Expand Up @@ -381,7 +381,7 @@ private void applyMappings(ClusterChangedEvent event) {
// so this failure typically means wrong node level configuration or something similar
for (IndexShard indexShard : indexService) {
ShardRouting shardRouting = indexShard.routingEntry();
failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to update mappings", t);
}
}
}
Expand Down Expand Up @@ -637,11 +637,11 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat
}
IndexShard indexShard = indexService.createShard(shardId, shardRouting);
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
indexShard.addFailedEngineListener(failedEngineHandler);
indexShard.addShardFailureCallback(failedShardHandler);
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Throwable e) {
failAndRemoveShard(shardRouting, indexService, true, "failed to create shard", e);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to create shard", e);
return;
}
}
Expand Down Expand Up @@ -768,7 +768,7 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo

private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
synchronized (mutex) {
failAndRemoveShard(shardRouting, indexService, sendShardFailure, "failed recovery", failure);
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, sendShardFailure, "failed recovery", failure);
}
}

Expand Down Expand Up @@ -802,8 +802,10 @@ private void deleteIndex(String index, String reason) {

}

private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService.hasShard(shardRouting.getId())) {
private void failAndRemoveShard(ShardRouting shardRouting, String indexUUID, @Nullable IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
if (indexService != null && indexService.hasShard(shardRouting.getId())) {
// if the indexService is null we can't remove the shard, that's fine since we might have a failure
// when the index is remove and then we already removed the index service for that shard...
try {
indexService.removeShard(shardRouting.getId(), message);
} catch (ShardNotFoundException e) {
Expand All @@ -813,7 +815,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexSer
}
}
if (sendShardFailure) {
sendFailShard(shardRouting, indexService.indexUUID(), message, failure);
sendFailShard(shardRouting, indexUUID, message, failure);
}
}

Expand All @@ -827,29 +829,14 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m
}
}

private class FailedEngineHandler implements Engine.FailedEngineListener {
private class FailedShardHandler implements Callback<IndexShard.ShardFailure> {
@Override
public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) {
ShardRouting shardRouting = null;
final IndexService indexService = indicesService.indexService(shardId.index().name());
if (indexService != null) {
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
if (indexShard != null) {
shardRouting = indexShard.routingEntry();
}
}
if (shardRouting == null) {
logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]", failure,
shardId.index().name(), shardId.id(), reason);
return;
}
final ShardRouting fShardRouting = shardRouting;
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
synchronized (mutex) {
failAndRemoveShard(fShardRouting, indexService, true, "engine failure, reason [" + reason + "]", failure);
}
public void handle(final IndexShard.ShardFailure shardFailure) {
final IndexService indexService = indicesService.indexService(shardFailure.routing.shardId().index().name());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We used to protect against null here -> I think we should still do this? I'm thinking about failures that happen during index deletion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Look again: we only checked for null to get the ShardRouting... not needed here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll run into an NPE in the failAndRemoveShard code:

    private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
        if (indexService.hasShard(shardRouting.getId())) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get why this should be null. I mean, if it's null it's an error condition and we just throw an NPE? How can we fail and have no IndexService for it? Something is messed up here and wrong. I don't think we should check for null here... the lenient code before seems wrong?

final ShardRouting shardRouting = shardFailure.routing;
threadPool.generic().execute(() -> {
synchronized (mutex) {
failAndRemoveShard(shardRouting, shardFailure.indexUUID, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ public EngineConfig config(Settings indexSettings, Store store, Path translogPat

EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.EventListener() {
@Override
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}
}, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
Expand Down Expand Up @@ -1950,7 +1950,7 @@ public void testRecoverFromForeignTranslog() throws IOException {

EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getEventListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);

try {
Expand Down
Loading