Skip to content

Commit e3a38c8

Browse files
committed
Recover retention leases during peer recovery
This commit integrates retention leases with recovery. With this change, we copy the current retention leases on primary to the replica during phase two of recovery. At this point, the replica will have been added to the replication group and so is already receiving retention lease sync requests from the primary. This means that if any retention lease syncs are triggered on the primary after we sample the retention leases here during phase two, that sync request will also arrive on the replica ensuring that the replica is from this point on up to date with the retention leases on the primary. We have to copy these during phase two since we will be applying indexing operations, potentially triggering merges, and therefore must ensure the correct retention leases are in place beforehand.
1 parent d0fb8dc commit e3a38c8

11 files changed

+305
-81
lines changed

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@
134134
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
135135
import org.elasticsearch.indices.recovery.RecoveryFailedException;
136136
import org.elasticsearch.indices.recovery.RecoveryState;
137+
import org.elasticsearch.indices.recovery.RecoveryTarget;
137138
import org.elasticsearch.repositories.RepositoriesService;
138139
import org.elasticsearch.repositories.Repository;
139140
import org.elasticsearch.rest.RestStatus;
@@ -3034,7 +3035,7 @@ public long getMaxSeqNoOfUpdatesOrDeletes() {
30343035
* which is at least the value of the max_seq_no_of_updates marker on the primary after that operation was executed on the primary.
30353036
*
30363037
* @see #acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)
3037-
* @see org.elasticsearch.indices.recovery.RecoveryTarget#indexTranslogOperations(List, int, long, long, ActionListener)
3038+
* @see RecoveryTarget#indexTranslogOperations(List, int, long, long, RetentionLeases, ActionListener)
30383039
*/
30393040
public void advanceMaxSeqNoOfUpdatesOrDeletes(long seqNo) {
30403041
assert seqNo != UNASSIGNED_SEQ_NO

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -515,17 +515,21 @@ public void onTimeout(TimeValue timeout) {
515515
}
516516
});
517517
};
518-
recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps(),
519-
request.maxSeenAutoIdTimestampOnPrimary(), request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
520-
ActionListener.wrap(
521-
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
522-
e -> {
523-
if (e instanceof MapperException) {
524-
retryOnMappingException.accept(e);
525-
} else {
526-
listener.onFailure(e);
527-
}
528-
})
518+
recoveryTarget.indexTranslogOperations(
519+
request.operations(),
520+
request.totalTranslogOps(),
521+
request.maxSeenAutoIdTimestampOnPrimary(),
522+
request.maxSeqNoOfUpdatesOrDeletesOnPrimary(),
523+
request.retentionLeases(),
524+
ActionListener.wrap(
525+
checkpoint -> listener.onResponse(new RecoveryTranslogOperationsResponse(checkpoint)),
526+
e -> {
527+
if (e instanceof MapperException) {
528+
retryOnMappingException.accept(e);
529+
} else {
530+
listener.onFailure(e);
531+
}
532+
})
529533
);
530534
}
531535
}

server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

Lines changed: 58 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.elasticsearch.index.engine.Engine;
5151
import org.elasticsearch.index.engine.RecoveryEngineException;
5252
import org.elasticsearch.index.seqno.LocalCheckpointTracker;
53+
import org.elasticsearch.index.seqno.RetentionLeases;
5354
import org.elasticsearch.index.seqno.SequenceNumbers;
5455
import org.elasticsearch.index.shard.IndexShard;
5556
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -231,8 +232,16 @@ public void recoverToTarget(ActionListener<RecoveryResponse> listener) {
231232
// are at least as high as the corresponding values on the primary when any of these operations were executed on it.
232233
final long maxSeenAutoIdTimestamp = shard.getMaxSeenAutoIdTimestamp();
233234
final long maxSeqNoOfUpdatesOrDeletes = shard.getMaxSeqNoOfUpdatesOrDeletes();
234-
phase2(startingSeqNo, requiredSeqNoRangeStart, endingSeqNo, phase2Snapshot, maxSeenAutoIdTimestamp,
235-
maxSeqNoOfUpdatesOrDeletes, sendSnapshotStep);
235+
final RetentionLeases retentionLeases = shard.getRetentionLeases();
236+
phase2(
237+
startingSeqNo,
238+
requiredSeqNoRangeStart,
239+
endingSeqNo,
240+
phase2Snapshot,
241+
maxSeenAutoIdTimestamp,
242+
maxSeqNoOfUpdatesOrDeletes,
243+
retentionLeases,
244+
sendSnapshotStep);
236245
sendSnapshotStep.whenComplete(
237246
r -> IOUtils.close(phase2Snapshot),
238247
e -> {
@@ -517,8 +526,15 @@ void prepareTargetForTranslog(boolean fileBasedRecovery, int totalTranslogOps, A
517526
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates or deletes on the primary after these operations were executed on it.
518527
* @param listener a listener which will be notified with the local checkpoint on the target.
519528
*/
520-
void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo, Translog.Snapshot snapshot, long maxSeenAutoIdTimestamp,
521-
long maxSeqNoOfUpdatesOrDeletes, ActionListener<SendSnapshotResult> listener) throws IOException {
529+
void phase2(
530+
final long startingSeqNo,
531+
final long requiredSeqNoRangeStart,
532+
final long endingSeqNo,
533+
final Translog.Snapshot snapshot,
534+
final long maxSeenAutoIdTimestamp,
535+
final long maxSeqNoOfUpdatesOrDeletes,
536+
final RetentionLeases retentionLeases,
537+
final ActionListener<SendSnapshotResult> listener) throws IOException {
522538
assert requiredSeqNoRangeStart <= endingSeqNo + 1:
523539
"requiredSeqNoRangeStart " + requiredSeqNoRangeStart + " is larger than endingSeqNo " + endingSeqNo;
524540
assert startingSeqNo <= requiredSeqNoRangeStart :
@@ -584,25 +600,50 @@ void phase2(long startingSeqNo, long requiredSeqNoRangeStart, long endingSeqNo,
584600
listener::onFailure
585601
);
586602

587-
sendBatch(readNextBatch, true, SequenceNumbers.UNASSIGNED_SEQ_NO, snapshot.totalOperations(),
588-
maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, batchedListener);
603+
sendBatch(
604+
readNextBatch,
605+
true,
606+
SequenceNumbers.UNASSIGNED_SEQ_NO,
607+
snapshot.totalOperations(),
608+
maxSeenAutoIdTimestamp,
609+
maxSeqNoOfUpdatesOrDeletes,
610+
retentionLeases,
611+
batchedListener);
589612
}
590613

591-
private void sendBatch(CheckedSupplier<List<Translog.Operation>, IOException> nextBatch, boolean firstBatch,
592-
long targetLocalCheckpoint, int totalTranslogOps, long maxSeenAutoIdTimestamp,
593-
long maxSeqNoOfUpdatesOrDeletes, ActionListener<Long> listener) throws IOException {
614+
private void sendBatch(
615+
final CheckedSupplier<List<Translog.Operation>, IOException> nextBatch,
616+
final boolean firstBatch,
617+
final long targetLocalCheckpoint,
618+
final int totalTranslogOps,
619+
final long maxSeenAutoIdTimestamp,
620+
final long maxSeqNoOfUpdatesOrDeletes,
621+
final RetentionLeases retentionLeases,
622+
final ActionListener<Long> listener) throws IOException {
594623
final List<Translog.Operation> operations = nextBatch.get();
595624
// send the leftover operations or if no operations were sent, request the target to respond with its local checkpoint
596625
if (operations.isEmpty() == false || firstBatch) {
597626
cancellableThreads.execute(() -> {
598-
recoveryTarget.indexTranslogOperations(operations, totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes,
599-
ActionListener.wrap(
600-
newCheckpoint -> {
601-
sendBatch(nextBatch, false, SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
602-
totalTranslogOps, maxSeenAutoIdTimestamp, maxSeqNoOfUpdatesOrDeletes, listener);
603-
},
604-
listener::onFailure
605-
));
627+
recoveryTarget.indexTranslogOperations(
628+
operations,
629+
totalTranslogOps,
630+
maxSeenAutoIdTimestamp,
631+
maxSeqNoOfUpdatesOrDeletes,
632+
retentionLeases,
633+
ActionListener.wrap(
634+
newCheckpoint -> {
635+
sendBatch(
636+
nextBatch,
637+
false,
638+
SequenceNumbers.max(targetLocalCheckpoint, newCheckpoint),
639+
totalTranslogOps,
640+
maxSeenAutoIdTimestamp,
641+
maxSeqNoOfUpdatesOrDeletes,
642+
retentionLeases,
643+
listener);
644+
},
645+
listener::onFailure
646+
));
606647
});
607648
} else {
608649
listener.onResponse(targetLocalCheckpoint);

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.elasticsearch.index.engine.Engine;
4444
import org.elasticsearch.index.mapper.MapperException;
4545
import org.elasticsearch.index.seqno.ReplicationTracker;
46+
import org.elasticsearch.index.seqno.RetentionLeases;
4647
import org.elasticsearch.index.seqno.SequenceNumbers;
4748
import org.elasticsearch.index.shard.IndexShard;
4849
import org.elasticsearch.index.shard.IndexShardNotRecoveringException;
@@ -400,8 +401,13 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
400401
}
401402

402403
@Override
403-
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
404-
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
404+
public void indexTranslogOperations(
405+
final List<Translog.Operation> operations,
406+
final int totalTranslogOps,
407+
final long maxSeenAutoIdTimestampOnPrimary,
408+
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
409+
final RetentionLeases retentionLeases,
410+
final ActionListener<Long> listener) {
405411
ActionListener.completeWith(listener, () -> {
406412
final RecoveryState.Translog translog = state().getTranslog();
407413
translog.totalOperations(totalTranslogOps);
@@ -421,6 +427,11 @@ public void indexTranslogOperations(List<Translog.Operation> operations, int tot
421427
* replaying any of these operations will be at least the max_seq_no_of_updates on the primary when that op was executed on.
422428
*/
423429
indexShard().advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfDeletesOrUpdatesOnPrimary);
430+
/*
431+
* We have to update the retention leases before we start applying translog operations to ensure we are retaining according to
432+
* the policy.
433+
*/
434+
indexShard().updateRetentionLeasesOnReplica(retentionLeases);
424435
for (Translog.Operation operation : operations) {
425436
Engine.Result result = indexShard().applyTranslogOperation(operation, Engine.Operation.Origin.PEER_RECOVERY);
426437
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.elasticsearch.action.ActionListener;
2222
import org.elasticsearch.common.bytes.BytesReference;
2323
import org.elasticsearch.index.seqno.ReplicationTracker;
24+
import org.elasticsearch.index.seqno.RetentionLeases;
2425
import org.elasticsearch.index.store.Store;
2526
import org.elasticsearch.index.store.StoreFileMetaData;
2627
import org.elasticsearch.index.translog.Translog;
@@ -39,8 +40,8 @@ public interface RecoveryTargetHandler {
3940
void prepareForTranslogOperations(boolean fileBasedRecovery, int totalTranslogOps, ActionListener<Void> listener);
4041

4142
/**
42-
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, and
43-
* updates the global checkpoint.
43+
* The finalize request refreshes the engine now that new segments are available, enables garbage collection of tombstone files, updates
44+
* the global checkpoint.
4445
*
4546
* @param globalCheckpoint the global checkpoint on the recovery source
4647
* @param listener the listener which will be notified when this method is completed
@@ -68,11 +69,17 @@ public interface RecoveryTargetHandler {
6869
* @param maxSeqNoOfUpdatesOrDeletesOnPrimary the max seq_no of update operations (index operations overwrite Lucene) or delete ops on
6970
* the primary shard when capturing these operations. This value is at least as high as the
7071
* max_seq_no_of_updates on the primary was when any of these ops were processed on it.
72+
* @param retentionLeases the retention leases on the primary
7173
* @param listener a listener which will be notified with the local checkpoint on the target
7274
* after these operations are successfully indexed on the target.
7375
*/
74-
void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
75-
long maxSeqNoOfUpdatesOrDeletesOnPrimary, ActionListener<Long> listener);
76+
void indexTranslogOperations(
77+
List<Translog.Operation> operations,
78+
int totalTranslogOps,
79+
long maxSeenAutoIdTimestampOnPrimary,
80+
long maxSeqNoOfUpdatesOrDeletesOnPrimary,
81+
RetentionLeases retentionLeases,
82+
ActionListener<Long> listener);
7683

7784
/**
7885
* Notifies the target of the files it is going to receive

server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTranslogOperationsRequest.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.action.index.IndexRequest;
2424
import org.elasticsearch.common.io.stream.StreamInput;
2525
import org.elasticsearch.common.io.stream.StreamOutput;
26+
import org.elasticsearch.index.seqno.RetentionLeases;
2627
import org.elasticsearch.index.seqno.SequenceNumbers;
2728
import org.elasticsearch.index.shard.ShardId;
2829
import org.elasticsearch.index.translog.Translog;
@@ -39,18 +40,26 @@ public class RecoveryTranslogOperationsRequest extends TransportRequest {
3940
private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
4041
private long maxSeenAutoIdTimestampOnPrimary;
4142
private long maxSeqNoOfUpdatesOrDeletesOnPrimary;
43+
private RetentionLeases retentionLeases;
4244

4345
public RecoveryTranslogOperationsRequest() {
4446
}
4547

46-
RecoveryTranslogOperationsRequest(long recoveryId, ShardId shardId, List<Translog.Operation> operations, int totalTranslogOps,
47-
long maxSeenAutoIdTimestampOnPrimary, long maxSeqNoOfUpdatesOrDeletesOnPrimary) {
48+
RecoveryTranslogOperationsRequest(
49+
final long recoveryId,
50+
final ShardId shardId,
51+
final List<Translog.Operation> operations,
52+
final int totalTranslogOps,
53+
final long maxSeenAutoIdTimestampOnPrimary,
54+
final long maxSeqNoOfUpdatesOrDeletesOnPrimary,
55+
final RetentionLeases retentionLeases) {
4856
this.recoveryId = recoveryId;
4957
this.shardId = shardId;
5058
this.operations = operations;
5159
this.totalTranslogOps = totalTranslogOps;
5260
this.maxSeenAutoIdTimestampOnPrimary = maxSeenAutoIdTimestampOnPrimary;
5361
this.maxSeqNoOfUpdatesOrDeletesOnPrimary = maxSeqNoOfUpdatesOrDeletesOnPrimary;
62+
this.retentionLeases = retentionLeases;
5463
}
5564

5665
public long recoveryId() {
@@ -77,6 +86,10 @@ public long maxSeqNoOfUpdatesOrDeletesOnPrimary() {
7786
return maxSeqNoOfUpdatesOrDeletesOnPrimary;
7887
}
7988

89+
public RetentionLeases retentionLeases() {
90+
return retentionLeases;
91+
}
92+
8093
@Override
8194
public void readFrom(StreamInput in) throws IOException {
8295
super.readFrom(in);
@@ -95,6 +108,11 @@ public void readFrom(StreamInput in) throws IOException {
95108
// UNASSIGNED_SEQ_NO means uninitialized and replica won't enable optimization using seq_no
96109
maxSeqNoOfUpdatesOrDeletesOnPrimary = SequenceNumbers.UNASSIGNED_SEQ_NO;
97110
}
111+
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
112+
retentionLeases = new RetentionLeases(in);
113+
} else {
114+
retentionLeases = RetentionLeases.EMPTY;
115+
}
98116
}
99117

100118
@Override
@@ -110,5 +128,8 @@ public void writeTo(StreamOutput out) throws IOException {
110128
if (out.getVersion().onOrAfter(Version.V_6_5_0)) {
111129
out.writeZLong(maxSeqNoOfUpdatesOrDeletesOnPrimary);
112130
}
131+
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
132+
retentionLeases.writeTo(out);
133+
}
113134
}
114135
}

server/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.elasticsearch.cluster.node.DiscoveryNode;
2727
import org.elasticsearch.common.bytes.BytesReference;
2828
import org.elasticsearch.index.seqno.ReplicationTracker;
29+
import org.elasticsearch.index.seqno.RetentionLeases;
2930
import org.elasticsearch.index.shard.ShardId;
3031
import org.elasticsearch.index.store.Store;
3132
import org.elasticsearch.index.store.StoreFileMetaData;
@@ -113,10 +114,21 @@ public void handoffPrimaryContext(final ReplicationTracker.PrimaryContext primar
113114
}
114115

115116
@Override
116-
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps, long maxSeenAutoIdTimestampOnPrimary,
117-
long maxSeqNoOfDeletesOrUpdatesOnPrimary, ActionListener<Long> listener) {
117+
public void indexTranslogOperations(
118+
final List<Translog.Operation> operations,
119+
final int totalTranslogOps,
120+
final long maxSeenAutoIdTimestampOnPrimary,
121+
final long maxSeqNoOfDeletesOrUpdatesOnPrimary,
122+
final RetentionLeases retentionLeases,
123+
final ActionListener<Long> listener) {
118124
final RecoveryTranslogOperationsRequest request = new RecoveryTranslogOperationsRequest(
119-
recoveryId, shardId, operations, totalTranslogOps, maxSeenAutoIdTimestampOnPrimary, maxSeqNoOfDeletesOrUpdatesOnPrimary);
125+
recoveryId,
126+
shardId,
127+
operations,
128+
totalTranslogOps,
129+
maxSeenAutoIdTimestampOnPrimary,
130+
maxSeqNoOfDeletesOrUpdatesOnPrimary,
131+
retentionLeases);
120132
transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.TRANSLOG_OPS, request, translogOpsRequestOptions,
121133
new ActionListenerResponseHandler<>(ActionListener.wrap(r -> listener.onResponse(r.localCheckpoint), listener::onFailure),
122134
RecoveryTranslogOperationsResponse::new, ThreadPool.Names.GENERIC));

0 commit comments

Comments
 (0)