22
22
import org .apache .logging .log4j .LogManager ;
23
23
import org .apache .logging .log4j .Logger ;
24
24
import org .apache .logging .log4j .message .ParameterizedMessage ;
25
- import org .apache .lucene .index .DirectoryReader ;
26
- import org .apache .lucene .index .IndexCommit ;
27
25
import org .apache .lucene .store .AlreadyClosedException ;
28
26
import org .apache .lucene .store .RateLimiter ;
29
27
import org .elasticsearch .ElasticsearchException ;
44
42
import org .elasticsearch .common .util .CancellableThreads ;
45
43
import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
46
44
import org .elasticsearch .index .IndexNotFoundException ;
47
- import org .elasticsearch .index .engine .CombinedDeletionPolicy ;
48
45
import org .elasticsearch .index .engine .RecoveryEngineException ;
49
46
import org .elasticsearch .index .mapper .MapperException ;
50
- import org .elasticsearch .index .seqno .SequenceNumbers ;
51
47
import org .elasticsearch .index .shard .IllegalIndexShardStateException ;
52
48
import org .elasticsearch .index .shard .IndexEventListener ;
53
49
import org .elasticsearch .index .shard .IndexShard ;
54
50
import org .elasticsearch .index .shard .ShardId ;
55
51
import org .elasticsearch .index .shard .ShardNotFoundException ;
56
52
import org .elasticsearch .index .store .Store ;
57
- import org .elasticsearch .index .translog .Translog ;
58
- import org .elasticsearch .index .translog .TranslogCorruptedException ;
59
53
import org .elasticsearch .indices .recovery .RecoveriesCollection .RecoveryRef ;
60
54
import org .elasticsearch .tasks .Task ;
61
55
import org .elasticsearch .threadpool .ThreadPool ;
68
62
import org .elasticsearch .transport .TransportService ;
69
63
70
64
import java .io .IOException ;
71
- import java .util .List ;
72
- import java .util .StringJoiner ;
73
65
import java .util .concurrent .atomic .AtomicLong ;
74
66
import java .util .function .Consumer ;
75
67
76
68
import static org .elasticsearch .common .unit .TimeValue .timeValueMillis ;
69
+ import static org .elasticsearch .index .seqno .SequenceNumbers .UNASSIGNED_SEQ_NO ;
77
70
78
71
/**
79
72
* The recovery target handles recoveries of peer shards of the shard+node to recover to.
@@ -178,9 +171,12 @@ private void doRecovery(final long recoveryId) {
178
171
cancellableThreads = recoveryTarget .cancellableThreads ();
179
172
try {
180
173
assert recoveryTarget .sourceNode () != null : "can not do a recovery without a source node" ;
181
- request = getStartRecoveryRequest (recoveryTarget );
182
174
logger .trace ("{} preparing shard for peer recovery" , recoveryTarget .shardId ());
183
175
recoveryTarget .indexShard ().prepareForIndexRecovery ();
176
+ final long startingSeqNo = recoveryTarget .indexShard ().recoverLocallyUpToGlobalCheckpoint ();
177
+ assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget .state ().getStage () == RecoveryState .Stage .TRANSLOG :
178
+ "unexpected recovery stage [" + recoveryTarget .state ().getStage () + "] starting seqno [ " + startingSeqNo + "]" ;
179
+ request = getStartRecoveryRequest (logger , clusterService .localNode (), recoveryTarget , startingSeqNo );
184
180
} catch (final Exception e ) {
185
181
// this will be logged as a warning later on...
186
182
logger .trace ("unexpected error while preparing shard for peer recovery, failing recovery" , e );
@@ -319,7 +315,7 @@ public RecoveryResponse read(StreamInput in) throws IOException {
319
315
* @param recoveryTarget the target of the recovery
320
316
* @return a snapshot of the store metadata
321
317
*/
322
- private Store .MetadataSnapshot getStoreMetadataSnapshot (final RecoveryTarget recoveryTarget ) {
318
+ private static Store .MetadataSnapshot getStoreMetadataSnapshot (final Logger logger , final RecoveryTarget recoveryTarget ) {
323
319
try {
324
320
return recoveryTarget .indexShard ().snapshotStoreMetadata ();
325
321
} catch (final org .apache .lucene .index .IndexNotFoundException e ) {
@@ -335,89 +331,32 @@ private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget rec
335
331
/**
336
332
* Prepare the start recovery request.
337
333
*
334
+ * @param logger the logger
335
+ * @param localNode the local node of the recovery target
338
336
* @param recoveryTarget the target of the recovery
337
+ * @param startingSeqNo a sequence number that an operation-based peer recovery can start with.
338
+ * This is the first operation after the local checkpoint of the safe commit, if one exists.
339
339
* @return a start recovery request
340
340
*/
341
- private StartRecoveryRequest getStartRecoveryRequest (final RecoveryTarget recoveryTarget ) {
341
+ public static StartRecoveryRequest getStartRecoveryRequest (Logger logger , DiscoveryNode localNode ,
342
+ RecoveryTarget recoveryTarget , long startingSeqNo ) {
342
343
final StartRecoveryRequest request ;
343
344
logger .trace ("{} collecting local files for [{}]" , recoveryTarget .shardId (), recoveryTarget .sourceNode ());
344
345
345
- final Store .MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot (recoveryTarget );
346
+ final Store .MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot (logger , recoveryTarget );
346
347
logger .trace ("{} local file count [{}]" , recoveryTarget .shardId (), metadataSnapshot .size ());
347
-
348
- final long startingSeqNo ;
349
- if (metadataSnapshot .size () > 0 ) {
350
- startingSeqNo = getStartingSeqNo (logger , recoveryTarget );
351
- } else {
352
- startingSeqNo = SequenceNumbers .UNASSIGNED_SEQ_NO ;
353
- }
354
-
355
- if (startingSeqNo == SequenceNumbers .UNASSIGNED_SEQ_NO ) {
356
- logger .trace ("{} preparing for file-based recovery from [{}]" , recoveryTarget .shardId (), recoveryTarget .sourceNode ());
357
- } else {
358
- logger .trace (
359
- "{} preparing for sequence-number-based recovery starting at sequence number [{}] from [{}]" ,
360
- recoveryTarget .shardId (),
361
- startingSeqNo ,
362
- recoveryTarget .sourceNode ());
363
- }
364
-
365
348
request = new StartRecoveryRequest (
366
349
recoveryTarget .shardId (),
367
350
recoveryTarget .indexShard ().routingEntry ().allocationId ().getId (),
368
351
recoveryTarget .sourceNode (),
369
- clusterService . localNode () ,
352
+ localNode ,
370
353
metadataSnapshot ,
371
354
recoveryTarget .state ().getPrimary (),
372
355
recoveryTarget .recoveryId (),
373
356
startingSeqNo );
374
357
return request ;
375
358
}
376
359
377
- /**
378
- * Get the starting sequence number for a sequence-number-based request.
379
- *
380
- * @param recoveryTarget the target of the recovery
381
- * @return the starting sequence number or {@link SequenceNumbers#UNASSIGNED_SEQ_NO} if obtaining the starting sequence number
382
- * failed
383
- */
384
- public static long getStartingSeqNo (final Logger logger , final RecoveryTarget recoveryTarget ) {
385
- try {
386
- final Store store = recoveryTarget .store ();
387
- final String translogUUID = store .readLastCommittedSegmentsInfo ().getUserData ().get (Translog .TRANSLOG_UUID_KEY );
388
- final long globalCheckpoint = Translog .readGlobalCheckpoint (recoveryTarget .translogLocation (), translogUUID );
389
- final List <IndexCommit > existingCommits = DirectoryReader .listCommits (store .directory ());
390
- final IndexCommit safeCommit = CombinedDeletionPolicy .findSafeCommitPoint (existingCommits , globalCheckpoint );
391
- final SequenceNumbers .CommitInfo seqNoStats = Store .loadSeqNoInfo (safeCommit );
392
- if (logger .isTraceEnabled ()) {
393
- final StringJoiner descriptionOfExistingCommits = new StringJoiner ("," );
394
- for (IndexCommit commit : existingCommits ) {
395
- descriptionOfExistingCommits .add (CombinedDeletionPolicy .commitDescription (commit ));
396
- }
397
- logger .trace ("Calculate starting seqno based on global checkpoint [{}], safe commit [{}], existing commits [{}]" ,
398
- globalCheckpoint , CombinedDeletionPolicy .commitDescription (safeCommit ), descriptionOfExistingCommits );
399
- }
400
- if (seqNoStats .maxSeqNo <= globalCheckpoint ) {
401
- assert seqNoStats .localCheckpoint <= globalCheckpoint ;
402
- /*
403
- * Commit point is good for sequence-number based recovery as the maximum sequence number included in it is below the global
404
- * checkpoint (i.e., it excludes any operations that may not be on the primary). Recovery will start at the first operation
405
- * after the local checkpoint stored in the commit.
406
- */
407
- return seqNoStats .localCheckpoint + 1 ;
408
- } else {
409
- return SequenceNumbers .UNASSIGNED_SEQ_NO ;
410
- }
411
- } catch (final TranslogCorruptedException | IOException e ) {
412
- /*
413
- * This can happen, for example, if a phase one of the recovery completed successfully, a network partition happens before the
414
- * translog on the recovery target is opened, the recovery enters a retry loop seeing now that the index files are on disk and
415
- * proceeds to attempt a sequence-number-based recovery.
416
- */
417
- return SequenceNumbers .UNASSIGNED_SEQ_NO ;
418
- }
419
- }
420
-
421
360
public interface RecoveryListener {
422
361
void onRecoveryDone (RecoveryState state );
423
362
0 commit comments