Skip to content

Commit 3ba4dfa

Browse files
committed
Merge pull request #14206 from s1monw/refactor_shard_failure_listener
Refactor ShardFailure listener infrastructure
2 parents f4e9f69 + cba210c commit 3ba4dfa

File tree

8 files changed

+63
-84
lines changed

8 files changed

+63
-84
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

+5-20
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public abstract class Engine implements Closeable {
7272
protected final EngineConfig engineConfig;
7373
protected final Store store;
7474
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
75-
protected final FailedEngineListener failedEngineListener;
75+
protected final EventListener eventListener;
7676
protected final SnapshotDeletionPolicy deletionPolicy;
7777
protected final ReentrantLock failEngineLock = new ReentrantLock();
7878
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
@@ -89,7 +89,7 @@ protected Engine(EngineConfig engineConfig) {
8989
this.store = engineConfig.getStore();
9090
this.logger = Loggers.getLogger(Engine.class, // we use the engine class directly here to make sure all subclasses have the same logger name
9191
engineConfig.getIndexSettings(), engineConfig.getShardId());
92-
this.failedEngineListener = engineConfig.getFailedEngineListener();
92+
this.eventListener = engineConfig.getEventListener();
9393
this.deletionPolicy = engineConfig.getDeletionPolicy();
9494
}
9595

@@ -535,7 +535,7 @@ public void failEngine(String reason, @Nullable Throwable failure) {
535535
logger.warn("Couldn't mark store corrupted", e);
536536
}
537537
}
538-
failedEngineListener.onFailedEngine(shardId, reason, failure);
538+
eventListener.onFailedEngine(reason, failure);
539539
}
540540
} catch (Throwable t) {
541541
// don't bubble up these exceptions up
@@ -560,19 +560,9 @@ protected boolean maybeFailEngine(String source, Throwable t) {
560560
return false;
561561
}
562562

563-
/** Wrap a Throwable in an {@code EngineClosedException} if the engine is already closed */
564-
protected Throwable wrapIfClosed(Throwable t) {
565-
if (isClosed.get()) {
566-
if (t != failedEngine && failedEngine != null) {
567-
t.addSuppressed(failedEngine);
568-
}
569-
return new EngineClosedException(shardId, t);
570-
}
571-
return t;
572-
}
573563

574-
public interface FailedEngineListener {
575-
void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t);
564+
public interface EventListener {
565+
default void onFailedEngine(String reason, @Nullable Throwable t) {}
576566
}
577567

578568
public static class Searcher implements Releasable {
@@ -991,11 +981,6 @@ public void close() throws IOException {
991981
}
992982
}
993983

994-
/**
995-
* Returns <code>true</code> the internal writer has any uncommitted changes. Otherwise <code>false</code>
996-
*/
997-
public abstract boolean hasUncommittedChanges();
998-
999984
public static class CommitId implements Writeable {
1000985

1001986
private final byte[] id;

core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public final class EngineConfig {
6969
private final Analyzer analyzer;
7070
private final Similarity similarity;
7171
private final CodecService codecService;
72-
private final Engine.FailedEngineListener failedEngineListener;
72+
private final Engine.EventListener eventListener;
7373
private final boolean forceNewTranslog;
7474
private final QueryCache queryCache;
7575
private final QueryCachingPolicy queryCachingPolicy;
@@ -117,7 +117,7 @@ public final class EngineConfig {
117117
public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService indexingService,
118118
Settings indexSettings, IndicesWarmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
119119
MergePolicy mergePolicy, MergeSchedulerConfig mergeSchedulerConfig, Analyzer analyzer,
120-
Similarity similarity, CodecService codecService, Engine.FailedEngineListener failedEngineListener,
120+
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
121121
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy, TranslogConfig translogConfig) {
122122
this.shardId = shardId;
123123
this.indexSettings = indexSettings;
@@ -131,7 +131,7 @@ public EngineConfig(ShardId shardId, ThreadPool threadPool, ShardIndexingService
131131
this.analyzer = analyzer;
132132
this.similarity = similarity;
133133
this.codecService = codecService;
134-
this.failedEngineListener = failedEngineListener;
134+
this.eventListener = eventListener;
135135
this.compoundOnFlush = indexSettings.getAsBoolean(EngineConfig.INDEX_COMPOUND_ON_FLUSH, compoundOnFlush);
136136
codecName = indexSettings.get(EngineConfig.INDEX_CODEC_SETTING, EngineConfig.DEFAULT_CODEC_NAME);
137137
// We start up inactive and rely on IndexingMemoryController to give us our fair share once we start indexing:
@@ -310,8 +310,8 @@ public MergeSchedulerConfig getMergeSchedulerConfig() {
310310
/**
311311
* Returns a listener that should be called on engine failure
312312
*/
313-
public Engine.FailedEngineListener getFailedEngineListener() {
314-
return failedEngineListener;
313+
public Engine.EventListener getEventListener() {
314+
return eventListener;
315315
}
316316

317317
/**

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

-5
Original file line numberDiff line numberDiff line change
@@ -846,11 +846,6 @@ protected final void closeNoLock(String reason) {
846846
}
847847
}
848848

849-
@Override
850-
public boolean hasUncommittedChanges() {
851-
return indexWriter.hasUncommittedChanges();
852-
}
853-
854849
@Override
855850
protected SearcherManager getSearcherManager() {
856851
return searcherManager;

core/src/main/java/org/elasticsearch/index/engine/ShadowEngine.java

-7
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,6 @@ public IndexCommit snapshotIndex(boolean flushFirst) throws EngineException {
202202
throw new UnsupportedOperationException("Can not take snapshot from a shadow engine");
203203
}
204204

205-
206-
207205
@Override
208206
protected SearcherManager getSearcherManager() {
209207
return searcherManager;
@@ -223,11 +221,6 @@ protected void closeNoLock(String reason) {
223221
}
224222
}
225223

226-
@Override
227-
public boolean hasUncommittedChanges() {
228-
return false;
229-
}
230-
231224
@Override
232225
protected SegmentInfos getLastCommittedSegmentInfos() {
233226
return lastCommittedSegmentInfos;

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+30-11
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.common.Nullable;
4242
import org.elasticsearch.common.inject.Inject;
4343
import org.elasticsearch.common.io.stream.BytesStreamOutput;
44-
import org.elasticsearch.common.lease.Releasable;
4544
import org.elasticsearch.common.lease.Releasables;
4645
import org.elasticsearch.common.logging.ESLogger;
4746
import org.elasticsearch.common.logging.support.LoggerMessageFormat;
@@ -51,6 +50,7 @@
5150
import org.elasticsearch.common.unit.ByteSizeUnit;
5251
import org.elasticsearch.common.unit.ByteSizeValue;
5352
import org.elasticsearch.common.unit.TimeValue;
53+
import org.elasticsearch.common.util.Callback;
5454
import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
5555
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
5656
import org.elasticsearch.common.util.concurrent.FutureUtils;
@@ -76,7 +76,6 @@
7676
import org.elasticsearch.index.merge.MergeStats;
7777
import org.elasticsearch.index.percolator.PercolateStats;
7878
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
79-
import org.elasticsearch.index.query.IndexQueryParserService;
8079
import org.elasticsearch.index.recovery.RecoveryStats;
8180
import org.elasticsearch.index.refresh.RefreshStats;
8281
import org.elasticsearch.index.search.stats.SearchStats;
@@ -167,7 +166,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndexSett
167166
private final MeanMetric refreshMetric = new MeanMetric();
168167
private final MeanMetric flushMetric = new MeanMetric();
169168

170-
private final ShardEngineFailListener failedEngineListener = new ShardEngineFailListener();
169+
private final ShardEventListener shardEventListener = new ShardEventListener();
171170
private volatile boolean flushOnClose = true;
172171
private volatile int flushThresholdOperations;
173172
private volatile ByteSizeValue flushThresholdSize;
@@ -980,8 +979,8 @@ private void startScheduledTasksIfNeeded() {
980979

981980
public static final String INDEX_REFRESH_INTERVAL = "index.refresh_interval";
982981

983-
public void addFailedEngineListener(Engine.FailedEngineListener failedEngineListener) {
984-
this.failedEngineListener.delegates.add(failedEngineListener);
982+
public void addShardFailureCallback(Callback<ShardFailure> onShardFailure) {
983+
this.shardEventListener.delegates.add(onShardFailure);
985984
}
986985

987986
/** Change the indexing and translog buffer sizes. If {@code IndexWriter} is currently using more than
@@ -1370,15 +1369,16 @@ protected Engine getEngineOrNull() {
13701369
return this.currentEngineReference.get();
13711370
}
13721371

1373-
class ShardEngineFailListener implements Engine.FailedEngineListener {
1374-
private final CopyOnWriteArrayList<Engine.FailedEngineListener> delegates = new CopyOnWriteArrayList<>();
1372+
class ShardEventListener implements Engine.EventListener {
1373+
private final CopyOnWriteArrayList<Callback<ShardFailure>> delegates = new CopyOnWriteArrayList<>();
13751374

13761375
// called by the current engine
13771376
@Override
1378-
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable failure) {
1379-
for (Engine.FailedEngineListener listener : delegates) {
1377+
public void onFailedEngine(String reason, @Nullable Throwable failure) {
1378+
final ShardFailure shardFailure = new ShardFailure(shardRouting, reason, failure, getIndexUUID());
1379+
for (Callback<ShardFailure> listener : delegates) {
13801380
try {
1381-
listener.onFailedEngine(shardId, reason, failure);
1381+
listener.handle(shardFailure);
13821382
} catch (Exception e) {
13831383
logger.warn("exception while notifying engine failure", e);
13841384
}
@@ -1458,7 +1458,7 @@ protected void operationProcessed() {
14581458
};
14591459
return new EngineConfig(shardId,
14601460
threadPool, indexingService, indexSettings, warmer, store, deletionPolicy, mergePolicyConfig.getMergePolicy(), mergeSchedulerConfig,
1461-
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, failedEngineListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
1461+
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig);
14621462
}
14631463

14641464
private static class IndexShardOperationCounter extends AbstractRefCounted {
@@ -1572,4 +1572,23 @@ public void onAfter() {
15721572
return false;
15731573
}
15741574

1575+
/**
1576+
* Simple struct encapsulating a shard failure
1577+
* @see IndexShard#addShardFailureCallback(Callback)
1578+
*/
1579+
public static final class ShardFailure {
1580+
public final ShardRouting routing;
1581+
public final String reason;
1582+
@Nullable
1583+
public final Throwable cause;
1584+
public final String indexUUID;
1585+
1586+
public ShardFailure(ShardRouting routing, String reason, @Nullable Throwable cause, String indexUUID) {
1587+
this.routing = routing;
1588+
this.reason = reason;
1589+
this.cause = cause;
1590+
this.indexUUID = indexUUID;
1591+
}
1592+
}
1593+
15751594
}

core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

+18-31
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@
4141
import org.elasticsearch.common.lucene.Lucene;
4242
import org.elasticsearch.common.settings.Settings;
4343
import org.elasticsearch.common.unit.TimeValue;
44+
import org.elasticsearch.common.util.Callback;
4445
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
4546
import org.elasticsearch.index.IndexService;
4647
import org.elasticsearch.index.IndexShardAlreadyExistsException;
47-
import org.elasticsearch.index.engine.Engine;
4848
import org.elasticsearch.index.mapper.DocumentMapper;
4949
import org.elasticsearch.index.mapper.MapperService;
5050
import org.elasticsearch.index.settings.IndexSettingsService;
@@ -98,7 +98,7 @@ static class FailedShard {
9898
}
9999

100100
private final Object mutex = new Object();
101-
private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler();
101+
private final FailedShardHandler failedShardHandler = new FailedShardHandler();
102102

103103
private final boolean sendRefreshMapping;
104104

@@ -381,7 +381,7 @@ private void applyMappings(ClusterChangedEvent event) {
381381
// so this failure typically means wrong node level configuration or something similar
382382
for (IndexShard indexShard : indexService) {
383383
ShardRouting shardRouting = indexShard.routingEntry();
384-
failAndRemoveShard(shardRouting, indexService, true, "failed to update mappings", t);
384+
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to update mappings", t);
385385
}
386386
}
387387
}
@@ -637,11 +637,11 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat
637637
}
638638
IndexShard indexShard = indexService.createShard(shardId, shardRouting);
639639
indexShard.updateRoutingEntry(shardRouting, state.blocks().disableStatePersistence() == false);
640-
indexShard.addFailedEngineListener(failedEngineHandler);
640+
indexShard.addShardFailureCallback(failedShardHandler);
641641
} catch (IndexShardAlreadyExistsException e) {
642642
// ignore this, the method call can happen several times
643643
} catch (Throwable e) {
644-
failAndRemoveShard(shardRouting, indexService, true, "failed to create shard", e);
644+
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, true, "failed to create shard", e);
645645
return;
646646
}
647647
}
@@ -768,7 +768,7 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo
768768

769769
private void handleRecoveryFailure(IndexService indexService, ShardRouting shardRouting, boolean sendShardFailure, Throwable failure) {
770770
synchronized (mutex) {
771-
failAndRemoveShard(shardRouting, indexService, sendShardFailure, "failed recovery", failure);
771+
failAndRemoveShard(shardRouting, indexService.indexUUID(), indexService, sendShardFailure, "failed recovery", failure);
772772
}
773773
}
774774

@@ -802,8 +802,10 @@ private void deleteIndex(String index, String reason) {
802802

803803
}
804804

805-
private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
806-
if (indexService.hasShard(shardRouting.getId())) {
805+
private void failAndRemoveShard(ShardRouting shardRouting, String indexUUID, @Nullable IndexService indexService, boolean sendShardFailure, String message, @Nullable Throwable failure) {
806+
if (indexService != null && indexService.hasShard(shardRouting.getId())) {
807+
// if the indexService is null we can't remove the shard, that's fine since we might have a failure
808+
// when the index is remove and then we already removed the index service for that shard...
807809
try {
808810
indexService.removeShard(shardRouting.getId(), message);
809811
} catch (ShardNotFoundException e) {
@@ -813,7 +815,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexSer
813815
}
814816
}
815817
if (sendShardFailure) {
816-
sendFailShard(shardRouting, indexService.indexUUID(), message, failure);
818+
sendFailShard(shardRouting, indexUUID, message, failure);
817819
}
818820
}
819821

@@ -827,29 +829,14 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m
827829
}
828830
}
829831

830-
private class FailedEngineHandler implements Engine.FailedEngineListener {
832+
private class FailedShardHandler implements Callback<IndexShard.ShardFailure> {
831833
@Override
832-
public void onFailedEngine(final ShardId shardId, final String reason, final @Nullable Throwable failure) {
833-
ShardRouting shardRouting = null;
834-
final IndexService indexService = indicesService.indexService(shardId.index().name());
835-
if (indexService != null) {
836-
IndexShard indexShard = indexService.getShardOrNull(shardId.id());
837-
if (indexShard != null) {
838-
shardRouting = indexShard.routingEntry();
839-
}
840-
}
841-
if (shardRouting == null) {
842-
logger.warn("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]", failure,
843-
shardId.index().name(), shardId.id(), reason);
844-
return;
845-
}
846-
final ShardRouting fShardRouting = shardRouting;
847-
threadPool.generic().execute(new Runnable() {
848-
@Override
849-
public void run() {
850-
synchronized (mutex) {
851-
failAndRemoveShard(fShardRouting, indexService, true, "engine failure, reason [" + reason + "]", failure);
852-
}
834+
public void handle(final IndexShard.ShardFailure shardFailure) {
835+
final IndexService indexService = indicesService.indexService(shardFailure.routing.shardId().index().name());
836+
final ShardRouting shardRouting = shardFailure.routing;
837+
threadPool.generic().execute(() -> {
838+
synchronized (mutex) {
839+
failAndRemoveShard(shardRouting, shardFailure.indexUUID, indexService, true, "shard failure, reason [" + shardFailure.reason + "]", shardFailure.cause);
853840
}
854841
});
855842
}

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -269,9 +269,9 @@ public EngineConfig config(Settings indexSettings, Store store, Path translogPat
269269

270270
EngineConfig config = new EngineConfig(shardId, threadPool, new ShardIndexingService(shardId, indexSettings), indexSettings
271271
, null, store, createSnapshotDeletionPolicy(), mergePolicy, mergeSchedulerConfig,
272-
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.FailedEngineListener() {
272+
iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(shardId.index()), new Engine.EventListener() {
273273
@Override
274-
public void onFailedEngine(ShardId shardId, String reason, @Nullable Throwable t) {
274+
public void onFailedEngine(String reason, @Nullable Throwable t) {
275275
// we don't need to notify anybody in this test
276276
}
277277
}, new TranslogHandler(shardId.index().getName(), logger), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
@@ -1919,7 +1919,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
19191919

19201920
EngineConfig brokenConfig = new EngineConfig(shardId, threadPool, config.getIndexingService(), config.getIndexSettings()
19211921
, null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getMergeSchedulerConfig(),
1922-
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getFailedEngineListener()
1922+
config.getAnalyzer(), config.getSimilarity(), new CodecService(shardId.index()), config.getEventListener()
19231923
, config.getTranslogRecoveryPerformer(), IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig);
19241924

19251925
try {

0 commit comments

Comments
 (0)