102
102
import org .elasticsearch .index .mapper .SourceToParse ;
103
103
import org .elasticsearch .index .mapper .Uid ;
104
104
import org .elasticsearch .index .mapper .VersionFieldMapper ;
105
+ import org .elasticsearch .index .seqno .RetentionLeaseSyncer ;
105
106
import org .elasticsearch .index .seqno .RetentionLeases ;
106
107
import org .elasticsearch .index .seqno .SeqNoStats ;
107
108
import org .elasticsearch .index .seqno .SequenceNumbers ;
@@ -1055,8 +1056,8 @@ public void testGlobalCheckpointSync() throws IOException {
1055
1056
final IndexMetaData .Builder indexMetadata =
1056
1057
IndexMetaData .builder (shardRouting .getIndexName ()).settings (settings ).primaryTerm (0 , 1 );
1057
1058
final AtomicBoolean synced = new AtomicBoolean ();
1058
- final IndexShard primaryShard =
1059
- newShard ( shardRouting , indexMetadata .build (), null , new InternalEngineFactory (), () -> synced .set (true ));
1059
+ final IndexShard primaryShard = newShard (
1060
+ shardRouting , indexMetadata .build (), null , new InternalEngineFactory (), () -> synced .set (true ), RetentionLeaseSyncer . EMPTY );
1060
1061
// add a replica
1061
1062
recoverShardFromStore (primaryShard );
1062
1063
final IndexShard replicaShard = newShard (shardId , false );
@@ -1471,9 +1472,8 @@ public String[] listAll() throws IOException {
1471
1472
};
1472
1473
1473
1474
try (Store store = createStore (shardId , new IndexSettings (metaData , Settings .EMPTY ), directory )) {
1474
- IndexShard shard = newShard (shardRouting , shardPath , metaData , i -> store ,
1475
- null , new InternalEngineFactory (), () -> {
1476
- }, EMPTY_EVENT_LISTENER );
1475
+ IndexShard shard = newShard (shardRouting , shardPath , metaData , i -> store , null , new InternalEngineFactory (),
1476
+ () -> { }, RetentionLeaseSyncer .EMPTY , EMPTY_EVENT_LISTENER );
1477
1477
AtomicBoolean failureCallbackTriggered = new AtomicBoolean (false );
1478
1478
shard .addShardFailureCallback ((ig )->failureCallbackTriggered .set (true ));
1479
1479
@@ -2131,6 +2131,7 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
2131
2131
null ,
2132
2132
shard .getEngineFactory (),
2133
2133
shard .getGlobalCheckpointSyncer (),
2134
+ shard .getRetentionLeaseSyncer (),
2134
2135
EMPTY_EVENT_LISTENER );
2135
2136
DiscoveryNode localNode = new DiscoveryNode ("foo" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
2136
2137
newShard .markAsRecovering ("store" , new RecoveryState (newShard .routingEntry (), localNode , null ));
@@ -2250,6 +2251,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
2250
2251
wrapper ,
2251
2252
new InternalEngineFactory (),
2252
2253
() -> {},
2254
+ RetentionLeaseSyncer .EMPTY ,
2253
2255
EMPTY_EVENT_LISTENER );
2254
2256
2255
2257
recoverShardFromStore (newShard );
@@ -2403,6 +2405,7 @@ public IndexSearcher wrap(IndexSearcher searcher) throws EngineException {
2403
2405
wrapper ,
2404
2406
new InternalEngineFactory (),
2405
2407
() -> {},
2408
+ RetentionLeaseSyncer .EMPTY ,
2406
2409
EMPTY_EVENT_LISTENER );
2407
2410
2408
2411
recoverShardFromStore (newShard );
@@ -2946,9 +2949,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
2946
2949
.put (IndexSettings .INDEX_CHECK_ON_STARTUP .getKey (), randomFrom ("true" , "checksum" )))
2947
2950
.build ();
2948
2951
2949
- IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData ,
2950
- null , null , indexShard .engineFactory ,
2951
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
2952
+ IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData , null , null , indexShard .engineFactory ,
2953
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
2952
2954
2953
2955
final IndexShardRecoveryException indexShardRecoveryException =
2954
2956
expectThrows (IndexShardRecoveryException .class , () -> newStartedShard (p -> corruptedShard , true ));
@@ -2991,9 +2993,8 @@ public void testShardDoesNotStartIfCorruptedMarkerIsPresent() throws Exception {
2991
2993
}
2992
2994
2993
2995
// try to start shard on corrupted files
2994
- final IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData ,
2995
- null , null , indexShard .engineFactory ,
2996
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
2996
+ final IndexShard corruptedShard = newShard (shardRouting , shardPath , indexMetaData , null , null , indexShard .engineFactory ,
2997
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
2997
2998
2998
2999
final IndexShardRecoveryException exception1 = expectThrows (IndexShardRecoveryException .class ,
2999
3000
() -> newStartedShard (p -> corruptedShard , true ));
@@ -3014,9 +3015,8 @@ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IO
3014
3015
assertThat ("store has to be marked as corrupted" , corruptedMarkerCount .get (), equalTo (1 ));
3015
3016
3016
3017
// try to start another time shard on corrupted files
3017
- final IndexShard corruptedShard2 = newShard (shardRouting , shardPath , indexMetaData ,
3018
- null , null , indexShard .engineFactory ,
3019
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
3018
+ final IndexShard corruptedShard2 = newShard (shardRouting , shardPath , indexMetaData , null , null , indexShard .engineFactory ,
3019
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
3020
3020
3021
3021
final IndexShardRecoveryException exception2 = expectThrows (IndexShardRecoveryException .class ,
3022
3022
() -> newStartedShard (p -> corruptedShard2 , true ));
@@ -3054,9 +3054,8 @@ public void testReadSnapshotAndCheckIndexConcurrently() throws Exception {
3054
3054
.put (indexShard .indexSettings .getSettings ())
3055
3055
.put (IndexSettings .INDEX_CHECK_ON_STARTUP .getKey (), randomFrom ("false" , "true" , "checksum" )))
3056
3056
.build ();
3057
- final IndexShard newShard = newShard (shardRouting , indexShard .shardPath (), indexMetaData ,
3058
- null , null , indexShard .engineFactory ,
3059
- indexShard .getGlobalCheckpointSyncer (), EMPTY_EVENT_LISTENER );
3057
+ final IndexShard newShard = newShard (shardRouting , indexShard .shardPath (), indexMetaData , null , null , indexShard .engineFactory ,
3058
+ indexShard .getGlobalCheckpointSyncer (), indexShard .getRetentionLeaseSyncer (), EMPTY_EVENT_LISTENER );
3060
3059
3061
3060
Store .MetadataSnapshot storeFileMetaDatas = newShard .snapshotStoreMetadata ();
3062
3061
assertTrue ("at least 2 files, commit and data: " + storeFileMetaDatas .toString (), storeFileMetaDatas .size () > 1 );
@@ -3436,15 +3435,14 @@ public void testFlushOnInactive() throws Exception {
3436
3435
ShardPath shardPath = new ShardPath (false , nodePath .resolve (shardId ), nodePath .resolve (shardId ), shardId );
3437
3436
AtomicBoolean markedInactive = new AtomicBoolean ();
3438
3437
AtomicReference <IndexShard > primaryRef = new AtomicReference <>();
3439
- IndexShard primary = newShard (shardRouting , shardPath , metaData , null , null ,
3440
- new InternalEngineFactory (), () -> {
3441
- }, new IndexEventListener () {
3442
- @ Override
3443
- public void onShardInactive (IndexShard indexShard ) {
3444
- markedInactive .set (true );
3445
- primaryRef .get ().flush (new FlushRequest ());
3446
- }
3447
- });
3438
+ IndexShard primary = newShard (shardRouting , shardPath , metaData , null , null , new InternalEngineFactory (), () -> { },
3439
+ RetentionLeaseSyncer .EMPTY , new IndexEventListener () {
3440
+ @ Override
3441
+ public void onShardInactive (IndexShard indexShard ) {
3442
+ markedInactive .set (true );
3443
+ primaryRef .get ().flush (new FlushRequest ());
3444
+ }
3445
+ });
3448
3446
primaryRef .set (primary );
3449
3447
recoverShardFromStore (primary );
3450
3448
for (int i = 0 ; i < 3 ; i ++) {
0 commit comments