41
41
import org .elasticsearch .common .lucene .Lucene ;
42
42
import org .elasticsearch .common .settings .Settings ;
43
43
import org .elasticsearch .common .unit .TimeValue ;
44
+ import org .elasticsearch .common .util .Callback ;
44
45
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
45
46
import org .elasticsearch .index .IndexService ;
46
47
import org .elasticsearch .index .IndexShardAlreadyExistsException ;
47
- import org .elasticsearch .index .engine .Engine ;
48
48
import org .elasticsearch .index .mapper .DocumentMapper ;
49
49
import org .elasticsearch .index .mapper .MapperService ;
50
50
import org .elasticsearch .index .settings .IndexSettingsService ;
@@ -98,7 +98,7 @@ static class FailedShard {
98
98
}
99
99
100
100
private final Object mutex = new Object ();
101
- private final FailedEngineHandler failedEngineHandler = new FailedEngineHandler ();
101
+ private final FailedShardHandler failedShardHandler = new FailedShardHandler ();
102
102
103
103
private final boolean sendRefreshMapping ;
104
104
@@ -381,7 +381,7 @@ private void applyMappings(ClusterChangedEvent event) {
381
381
// so this failure typically means wrong node level configuration or something similar
382
382
for (IndexShard indexShard : indexService ) {
383
383
ShardRouting shardRouting = indexShard .routingEntry ();
384
- failAndRemoveShard (shardRouting , indexService , true , "failed to update mappings" , t );
384
+ failAndRemoveShard (shardRouting , indexService . indexUUID (), indexService , true , "failed to update mappings" , t );
385
385
}
386
386
}
387
387
}
@@ -637,11 +637,11 @@ private void applyInitializingShard(final ClusterState state, final IndexMetaDat
637
637
}
638
638
IndexShard indexShard = indexService .createShard (shardId , shardRouting );
639
639
indexShard .updateRoutingEntry (shardRouting , state .blocks ().disableStatePersistence () == false );
640
- indexShard .addFailedEngineListener ( failedEngineHandler );
640
+ indexShard .addShardFailureCallback ( failedShardHandler );
641
641
} catch (IndexShardAlreadyExistsException e ) {
642
642
// ignore this, the method call can happen several times
643
643
} catch (Throwable e ) {
644
- failAndRemoveShard (shardRouting , indexService , true , "failed to create shard" , e );
644
+ failAndRemoveShard (shardRouting , indexService . indexUUID (), indexService , true , "failed to create shard" , e );
645
645
return ;
646
646
}
647
647
}
@@ -768,7 +768,7 @@ public void onRecoveryFailure(RecoveryState state, RecoveryFailedException e, bo
768
768
769
769
// Handles a failed recovery for the given shard: while holding mutex, delegates to
// failAndRemoveShard with the message "failed recovery" and the supplied failure.
// sendShardFailure is passed through unchanged to failAndRemoveShard.
// NOTE(review): this span is a diff hunk -- the "-" line is the previous call and the "+" line is
// its replacement, which additionally passes indexService.indexUUID() as a separate argument.
private void handleRecoveryFailure (IndexService indexService , ShardRouting shardRouting , boolean sendShardFailure , Throwable failure ) {
770
770
synchronized (mutex ) {
771
- failAndRemoveShard (shardRouting , indexService , sendShardFailure , "failed recovery" , failure );
771
+ failAndRemoveShard (shardRouting , indexService . indexUUID (), indexService , sendShardFailure , "failed recovery" , failure );
772
772
}
773
773
}
774
774
@@ -802,8 +802,10 @@ private void deleteIndex(String index, String reason) {
802
802
803
803
}
804
804
805
- private void failAndRemoveShard (ShardRouting shardRouting , IndexService indexService , boolean sendShardFailure , String message , @ Nullable Throwable failure ) {
806
- if (indexService .hasShard (shardRouting .getId ())) {
805
+ private void failAndRemoveShard (ShardRouting shardRouting , String indexUUID , @ Nullable IndexService indexService , boolean sendShardFailure , String message , @ Nullable Throwable failure ) {
806
+ if (indexService != null && indexService .hasShard (shardRouting .getId ())) {
807
+ // if the indexService is null we can't remove the shard, that's fine since we might have a failure
808
+ // when the index is removed and then we already removed the index service for that shard...
807
809
try {
808
810
indexService .removeShard (shardRouting .getId (), message );
809
811
} catch (ShardNotFoundException e ) {
@@ -813,7 +815,7 @@ private void failAndRemoveShard(ShardRouting shardRouting, IndexService indexSer
813
815
}
814
816
}
815
817
if (sendShardFailure ) {
816
- sendFailShard (shardRouting , indexService . indexUUID () , message , failure );
818
+ sendFailShard (shardRouting , indexUUID , message , failure );
817
819
}
818
820
}
819
821
@@ -827,29 +829,14 @@ private void sendFailShard(ShardRouting shardRouting, String indexUUID, String m
827
829
}
828
830
}
829
831
830
- private class FailedEngineHandler implements Engine . FailedEngineListener {
832
+ private class FailedShardHandler implements Callback < IndexShard . ShardFailure > {
831
833
@ Override
832
- public void onFailedEngine (final ShardId shardId , final String reason , final @ Nullable Throwable failure ) {
833
- ShardRouting shardRouting = null ;
834
- final IndexService indexService = indicesService .indexService (shardId .index ().name ());
835
- if (indexService != null ) {
836
- IndexShard indexShard = indexService .getShardOrNull (shardId .id ());
837
- if (indexShard != null ) {
838
- shardRouting = indexShard .routingEntry ();
839
- }
840
- }
841
- if (shardRouting == null ) {
842
- logger .warn ("[{}][{}] engine failed, but can't find index shard. failure reason: [{}]" , failure ,
843
- shardId .index ().name (), shardId .id (), reason );
844
- return ;
845
- }
846
- final ShardRouting fShardRouting = shardRouting ;
847
- threadPool .generic ().execute (new Runnable () {
848
- @ Override
849
- public void run () {
850
- synchronized (mutex ) {
851
- failAndRemoveShard (fShardRouting , indexService , true , "engine failure, reason [" + reason + "]" , failure );
852
- }
834
+ public void handle (final IndexShard .ShardFailure shardFailure ) {
835
+ final IndexService indexService = indicesService .indexService (shardFailure .routing .shardId ().index ().name ());
836
+ final ShardRouting shardRouting = shardFailure .routing ;
837
+ threadPool .generic ().execute (() -> {
838
+ synchronized (mutex ) {
839
+ failAndRemoveShard (shardRouting , shardFailure .indexUUID , indexService , true , "shard failure, reason [" + shardFailure .reason + "]" , shardFailure .cause );
853
840
}
854
841
});
855
842
}
0 commit comments