101
101
import org .elasticsearch .index .mapper .SourceToParse ;
102
102
import org .elasticsearch .index .mapper .Uid ;
103
103
import org .elasticsearch .index .mapper .VersionFieldMapper ;
104
+ import org .elasticsearch .index .seqno .RetentionLeaseSyncer ;
104
105
import org .elasticsearch .index .seqno .RetentionLeases ;
105
106
import org .elasticsearch .index .seqno .SeqNoStats ;
106
107
import org .elasticsearch .index .seqno .SequenceNumbers ;
@@ -1046,8 +1047,8 @@ public void testGlobalCheckpointSync() throws IOException {
1046
1047
final IndexMetaData .Builder indexMetadata =
1047
1048
IndexMetaData .builder (shardRouting .getIndexName ()).settings (settings ).primaryTerm (0 , 1 );
1048
1049
final AtomicBoolean synced = new AtomicBoolean ();
1049
- final IndexShard primaryShard =
1050
- newShard ( shardRouting , indexMetadata .build (), null , new InternalEngineFactory (), () -> synced .set (true ));
1050
+ final IndexShard primaryShard = newShard (
1051
+ shardRouting , indexMetadata .build (), null , new InternalEngineFactory (), () -> synced .set (true ), RetentionLeaseSyncer . EMPTY );
1051
1052
// add a replica
1052
1053
recoverShardFromStore (primaryShard );
1053
1054
final IndexShard replicaShard = newShard (shardId , false );
@@ -1462,9 +1463,8 @@ public String[] listAll() throws IOException {
1462
1463
};
1463
1464
1464
1465
try (Store store = createStore (shardId , new IndexSettings (metaData , Settings .EMPTY ), directory )) {
1465
- IndexShard shard = newShard (shardRouting , shardPath , metaData , i -> store ,
1466
- null , new InternalEngineFactory (), () -> {
1467
- }, EMPTY_EVENT_LISTENER );
1466
+ IndexShard shard = newShard (shardRouting , shardPath , metaData , i -> store , null , new InternalEngineFactory (),
1467
+ () -> { }, RetentionLeaseSyncer .EMPTY , EMPTY_EVENT_LISTENER );
1468
1468
AtomicBoolean failureCallbackTriggered = new AtomicBoolean (false );
1469
1469
shard .addShardFailureCallback ((ig )->failureCallbackTriggered .set (true ));
1470
1470
@@ -2122,6 +2122,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
2122
2122
null ,
2123
2123
shard .getEngineFactory (),
2124
2124
shard .getGlobalCheckpointSyncer (),
2125
+ shard .getRetentionLeaseSyncer (),
2125
2126
EMPTY_EVENT_LISTENER );
2126
2127
DiscoveryNode localNode = new DiscoveryNode ("foo" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
2127
2128
newShard .markAsRecovering ("store" , new RecoveryState (newShard .routingEntry (), localNode , null ));
@@ -2242,6 +2243,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
2242
2243
wrapper ,
2243
2244
new InternalEngineFactory (),
2244
2245
() -> {},
2246
+ RetentionLeaseSyncer .EMPTY ,
2245
2247
EMPTY_EVENT_LISTENER );
2246
2248
2247
2249
recoverShardFromStore (newShard );
@@ -2396,6 +2398,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
2396
2398
wrapper ,
2397
2399
new InternalEngineFactory (),
2398
2400
() -> {},
2401
+ RetentionLeaseSyncer .EMPTY ,
2399
2402
EMPTY_EVENT_LISTENER );
2400
2403
2401
2404
recoverShardFromStore (newShard );
@@ -2962,9 +2965,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
2962
2965
.put (IndexSettings .INDEX_CHECK_ON_STARTUP .getKey (), randomFrom ("true" , "checksum" )))
2963
2966
.build ();
2964
2967
2965
- IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData ,
2966
- null , null , indexShard .engineFactory ,
2967
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
2968
+ IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData , null , null , indexShard .engineFactory ,
2969
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
2968
2970
2969
2971
final IndexShardRecoveryException indexShardRecoveryException =
2970
2972
expectThrows (IndexShardRecoveryException .class , () -> newStartedShard (p -> corruptedShard , true ));
@@ -3007,9 +3009,8 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
3007
3009
}
3008
3010
3009
3011
// try to start shard on corrupted files
3010
- final IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData ,
3011
- null , null , indexShard .engineFactory ,
3012
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
3012
+ final IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData , null , null , indexShard .engineFactory ,
3013
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
3013
3014
3014
3015
final IndexShardRecoveryException exception1 = expectThrows (IndexShardRecoveryException .class ,
3015
3016
() -> newStartedShard (p -> corruptedShard , true ));
@@ -3030,9 +3031,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
3030
3031
assertThat ("store has to be marked as corrupted" , corruptedMarkerCount .get (), equalTo (1 ));
3031
3032
3032
3033
// try to start another time shard on corrupted files
3033
- final IndexShard corruptedShard2 = newShard (shardRouting , shardPath , indexMetaData ,
3034
- null , null , indexShard .engineFactory ,
3035
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
3034
+ final IndexShard corruptedShard2 = newShard (shardRouting , shardPath , indexMetaData , null , null , indexShard .engineFactory ,
3035
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
3036
3036
3037
3037
final IndexShardRecoveryException exception2 = expectThrows (IndexShardRecoveryException .class ,
3038
3038
() -> newStartedShard (p -> corruptedShard2 , true ));
@@ -3070,9 +3070,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
3070
3070
.put (indexShard .indexSettings .getSettings ())
3071
3071
.put (IndexSettings .INDEX_CHECK_ON_STARTUP .getKey (), randomFrom ("false" , "true" , "checksum" )))
3072
3072
.build ();
3073
- final IndexShard newShard = newShard (shardRouting , indexShard .shardPath (), indexMetaData ,
3074
- null , null , indexShard .engineFactory ,
3075
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
3073
+ final IndexShard newShard = newShard (shardRouting , indexShard .shardPath (), indexMetaData , null , null , indexShard .engineFactory ,
3074
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
3076
3075
3077
3076
Store .MetadataSnapshot storeFileMetaDatas = newShard .snapshotStoreMetadata ();
3078
3077
assertTrue ("at least 2 files, commit and data: " + storeFileMetaDatas .toString (), storeFileMetaDatas .size () > 1 );
@@ -3482,15 +3481,14 @@ public void testFlushOnInactive() throws Exception {
3482
3481
ShardPath shardPath = new ShardPath (false , nodePath .resolve (shardId ), nodePath .resolve (shardId ), shardId );
3483
3482
AtomicBoolean markedInactive = new AtomicBoolean ();
3484
3483
AtomicReference <IndexShard > primaryRef = new AtomicReference <>();
3485
- IndexShard primary = newShard (shardRouting , shardPath , metaData , null , null ,
3486
- new InternalEngineFactory (), () -> {
3487
- }, new IndexEventListener () {
3488
- @ Override
3489
- public void onShardInactive (IndexShard indexShard ) {
3490
- markedInactive .set (true );
3491
- primaryRef .get ().flush (new FlushRequest ());
3492
- }
3493
- });
3484
+ IndexShard primary = newShard (shardRouting , shardPath , metaData , null , null , new InternalEngineFactory (), () -> { },
3485
+ RetentionLeaseSyncer .EMPTY , new IndexEventListener () {
3486
+ @ Override
3487
+ public void onShardInactive (IndexShard indexShard ) {
3488
+ markedInactive .set (true );
3489
+ primaryRef .get ().flush (new FlushRequest ());
3490
+ }
3491
+ });
3494
3492
primaryRef .set (primary );
3495
3493
recoverShardFromStore (primary );
3496
3494
for (int i = 0 ; i < 3 ; i ++) {
0 commit comments