diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 1431cbd4f9d17..2e970c2c3ec52 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -72,7 +72,7 @@ public abstract class Engine implements Closeable {
protected final EngineConfig engineConfig;
protected final Store store;
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
- protected final FailedEngineListener failedEngineListener;
+ protected final EventListener eventListener;
protected final SnapshotDeletionPolicy deletionPolicy;
protected final ReentrantLock failEngineLock = new ReentrantLock();
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
@@ -89,7 +89,7 @@ protected Engine(EngineConfig engineConfig) {
this.store = engineConfig.getStore();
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
engineConfig.getIndexSettings(), engineConfig.getShardId());
- this.failedEngineListener = engineConfig.getFailedEngineListener();
+ this.eventListener = engineConfig.getEventListener();
this.deletionPolicy = engineConfig.getDeletionPolicy();
}
@@ -535,7 +535,7 @@ public void failEngine(String reason, @Nullable Throwable failure) {
logger.warn("Couldn't mark store corrupted", e);
}
}
- failedEngineListener.onFailedEngine(shardId, reason, failure);
+ eventListener.onFailedEngine(reason, failure);
}
} catch (Throwable t) {
// don't bubble up these exceptions up
@@ -560,19 +560,9 @@ protected boolean maybeFailEngine(String source, Throwable t) {
return false;
}
- /** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */
- protected Throwable wrapIfClosed(Throwable t) {
- if (isClosed.get()) {
- if (t != failedEngine && failedEngine != null) {
- t.addSuppressed(failedEngine);
- }
- return new EngineClosedException(shardId, t);
- }
- return t;
- }
- public interface FailedEngineListener {
- void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
+ public interface EventListener {
+ default void onFailedEngine(String reason, @Nullable Throwable t) {}
}
public static class Searcher implements Releasable {
@@ -991,11 +981,6 @@ public void close() throws IOException {
}
}
- /**
- * Returns true
the internal writer has any uncommitted changes. Otherwise false
- */
- public abstract boolean hasUncommittedChanges();
-
public static class CommitId implements Writeable {
private final byte[] id;
diff --git a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
index 7ed3d3652117e..adc04cbee52be 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
@@ -69,7 +69,7 @@ public final class EngineConfig {
private final Analyzer analyzer;
private final Similarity similarity;
private final CodecService codecService;
- private final Engine.FailedEngineListener failedEngineListener;
+ private final Engine.EventListener eventListener;
private final boolean forceNewTranslog;
private final QueryCache queryCache;
private final QueryCachingPolicy queryCachingPolicy;
@@ -117,7 +117,7 @@ public final class EngineConfig {
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
- Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
+ Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
this.shardId = shardId;
this.indexSettings = indexSettings;
@@ -131,7 +131,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
this.analyzer = analyzer;
this.similarity = similarity;
this.codecService = codecService;
- this.failedEngineListener = failedEngineListener;
+ this.eventListener = eventListener;
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
@@ -310,8 +310,8 @@ public MergeSchedulerConfig getMergeSchedulerConfig() {
/**
* Returns a listener that should be called on engine failure
*/
- public Engine.FailedEngineListener getFailedEngineListener() {
- return failedEngineListener;
+ public Engine.EventListener getEventListener() {
+ return eventListener;
}
/**
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 75bcdfa552e9d..2bae6a9fba8e2 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -846,11 +846,6 @@ protected final void closeNoLock(String reason) {
}
}
- @Override
- public boolean hasUncommittedChanges() {
- return indexWriter.hasUncommittedChanges();
- }
-
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
diff --git a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
index 921f1167f4324..46677d3a55be1 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java
@@ -202,8 +202,6 @@ public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
}
-
-
@Override
protected SearcherManager getSearcherManager() {
return searcherManager;
@@ -223,11 +221,6 @@ protected void closeNoLock(String reason) {
}
}
- @Override
- public boolean hasUncommittedChanges() {
- return false;
- }
-
@Override
protected SegmentInfos getLastCommittedSegmentInfos() {
return lastCommittedSegmentInfos;
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index a6b86e0f59ba7..6923dbdee0f9b 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -41,7 +41,6 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
@@ -51,6 +50,7 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
@@ -76,7 +76,6 @@
import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.percolator.PercolateStats;
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
-import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.recovery.RecoveryStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
@@ -167,7 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
- private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
+ private final ShardEventListener shardEventListener = new ShardEventListener();
private volatile boolean flushOnClose = true;
private volatile int flushThresholdOperations;
private volatile ByteSizeValue flushThresholdSize;
@@ -979,8 +978,8 @@ private void startScheduledTasksIfNeeded() {
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
- public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
- this.failedEngineListener.delegates.add(failedEngineListener);
+ public void addShardFailureCallback(Callback onShardFailure) {
+ this.shardEventListener.delegates.add(onShardFailure);
}
/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
@@ -1369,15 +1368,16 @@ protected Engine getEngineOrNull() {
return this.currentEngineReference.get();
}
- class ShardEngineFailListener implements Engine.FailedEngineListener {
- private final CopyOnWriteArrayList delegates = new CopyOnWriteArrayList<>();
+ class ShardEventListener implements Engine.EventListener {
+ private final CopyOnWriteArrayList> delegates = new CopyOnWriteArrayList<>();
// called by the current engine
@Override
- public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
- for (Engine.FailedEngineListener listener : delegates) {
+ public void onFailedEngine(String reason, @Nullable Throwable failure) {
+ final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure, getIndexUUID());
+ for (Callback listener : delegates) {
try {
- listener.onFailedEngine(shardId, reason, failure);
+ listener.handle(shardFailure);
} catch (Exception e) {
logger.warn("exception while notifying engine failure", e);
}
@@ -1457,7 +1457,7 @@ protected void operationProcessed() {
};
return new EngineConfig(shardId,
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
- mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
+ mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
}
private static class IndexShardOperationCounter extends AbstractRefCounted {
@@ -1571,4 +1571,23 @@ public void onAfter() {
return false;
}
+ /**
+ * Simple struct encapsulating a shard failure
+ * @see IndexShard#addShardFailureCallback(Callback)
+ */
+ public static final class ShardFailure {
+ public final ShardRouting routing;
+ public final String reason;
+ @Nullable
+ public final Throwable cause;
+ public final String indexUUID;
+
+ public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause, String indexUUID) {
+ this.routing = routing;
+ this.reason = reason;
+ this.cause = cause;
+ this.indexUUID = indexUUID;
+ }
+ }
+
}
diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
index b4c6b27c57ebc..76c5fe26403f1 100644
--- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
+++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
@@ -41,10 +41,10 @@
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.Callback;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexShardAlreadyExistsException;
-import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.settings.IndexSettingsService;
@@ -98,7 +98,7 @@ static class FailedShard {
}
private final Object mutex = new Object();
- private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
+ private final FailedShardHandler failedShardHandler = new FailedShardHandler();
private final boolean sendRefreshMapping;
@@ -381,7 +381,7 @@ private void applyMappings(ClusterChangedEvent event) {
// so this failure typically means wrong node level configuration or something similar
for (IndexShard indexShard : indexService) {
ShardRouting shardRouting = indexShard.routingEntry();
- failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t);
+ failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to update mappings", t);
}
}
}
@@ -637,11 +637,11 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat
}
IndexShard indexShard = indexService.createShard(shardId, shardRouting);
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
- indexShard.addFailedEngineListener(failedEngineHandler);
+ indexShard.addShardFailureCallback(failedShardHandler);
} catch (IndexShardAlreadyExistsException e) {
// ignore this, the method call can happen several times
} catch (Throwable e) {
- failAndRemoveShard(shardRouting, indexService, true, "failed to create shard", e);
+ failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to create shard", e);
return;
}
}
@@ -768,7 +768,7 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
synchronized (mutex) {
- failAndRemoveShard(shardRouting, indexService, sendShardFailure, "failed recovery", failure);
+ failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, sendShardFailure, "failed recovery", failure);
}
}
@@ -802,8 +802,10 @@ private void deleteIndex(String index, String reason) {
}
- private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
- if (indexService.hasShard(shardRouting.getId())) {
+ private void failAndRemoveShard(ShardRouting shardRouting, String indexUUID, @Nullable IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
+ if (indexService != null && indexService.hasShard(shardRouting.getId())) {
+ // if the indexService is null we can't remove the shard, that's fine since we might have a failure
+ // when the index is remove and then we already removed the index service for that shard...
try {
indexService.removeShard(shardRouting.getId(), message);
} catch (ShardNotFoundException e) {
@@ -813,7 +815,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexSer
}
}
if (sendShardFailure) {
- sendFailShard(shardRouting, indexService.indexUUID(), message, failure);
+ sendFailShard(shardRouting, indexUUID, message, failure);
}
}
@@ -827,29 +829,14 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m
}
}
- private class FailedEngineHandler implements Engine.FailedEngineListener {
+ private class FailedShardHandler implements Callback {
@Override
- public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) {
- ShardRouting shardRouting = null;
- final IndexService indexService = indicesService.indexService(shardId.index().name());
- if (indexService != null) {
- IndexShard indexShard = indexService.getShardOrNull(shardId.id());
- if (indexShard != null) {
- shardRouting = indexShard.routingEntry();
- }
- }
- if (shardRouting == null) {
- logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]", failure,
- shardId.index().name(), shardId.id(), reason);
- return;
- }
- final ShardRouting fShardRouting = shardRouting;
- threadPool.generic().execute(new Runnable() {
- @Override
- public void run() {
- synchronized (mutex) {
- failAndRemoveShard(fShardRouting, indexService, true, "engine failure, reason [" + reason + "]", failure);
- }
+ public void handle(final IndexShard.ShardFailure shardFailure) {
+ final IndexService indexService = indicesService.indexService(shardFailure.routing.shardId().index().name());
+ final ShardRouting shardRouting = shardFailure.routing;
+ threadPool.generic().execute(() -> {
+ synchronized (mutex) {
+ failAndRemoveShard(shardRouting, shardFailure.indexUUID, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
}
});
}
diff --git a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
index 2a6150267a50f..4c0aab8dcea60 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
@@ -268,9 +268,9 @@ public EngineConfig config(Settings indexSettings, Store store, Path translogPat
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
- iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
+ iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.EventListener() {
@Override
- public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
+ public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}
}, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
@@ -1950,7 +1950,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
- config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
+ config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getEventListener()
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
try {
diff --git a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
index 7dadafb8a0bb0..7818a0dc1f30b 100644
--- a/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
+++ b/core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
@@ -212,9 +212,9 @@ public EngineConfig config(Settings indexSettings, Store store, Path translogPat
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, threadPool);
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
- iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.FailedEngineListener() {
+ iwc.getAnalyzer(), iwc.getSimilarity() , new CodecService(shardId.index()), new Engine.EventListener() {
@Override
- public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
+ public void onFailedEngine(String reason, @Nullable Throwable t) {
// we don't need to notify anybody in this test
}}, null, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
try {