Skip to content

Commit 7754e62

Browse files
dnhatn and DaveCTurner authored
Use retention lease in peer recovery of closed indices (#48430)
Today we do not use retention leases in peer recovery for closed indices because we can't sync retention leases on closed indices. This change makes it possible to sync retention leases on closed indices and adjusts peer recovery to use retention leases for all indices with soft-deletes enabled.
Relates #45136
Co-authored-by: David Turner <[email protected]>
1 parent eca6003 commit 7754e62

File tree

17 files changed

+264
-399
lines changed

17 files changed

+264
-399
lines changed

server/src/main/java/org/elasticsearch/action/ActionModule.java

-4
Original file line number | Diff line number | Diff line change
@@ -218,8 +218,6 @@
218218
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
219219
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
220220
import org.elasticsearch.index.seqno.RetentionLeaseActions;
221-
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
222-
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
223221
import org.elasticsearch.indices.breaker.CircuitBreakerService;
224222
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
225223
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
@@ -548,8 +546,6 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
548546

549547
// internal actions
550548
actions.register(GlobalCheckpointSyncAction.TYPE, GlobalCheckpointSyncAction.class);
551-
actions.register(RetentionLeaseBackgroundSyncAction.TYPE, RetentionLeaseBackgroundSyncAction.class);
552-
actions.register(RetentionLeaseSyncAction.TYPE, RetentionLeaseSyncAction.class);
553549
actions.register(TransportNodesSnapshotsStatus.TYPE, TransportNodesSnapshotsStatus.class);
554550
actions.register(TransportNodesListGatewayMetaState.TYPE, TransportNodesListGatewayMetaState.class);
555551
actions.register(TransportVerifyShardBeforeCloseAction.TYPE, TransportVerifyShardBeforeCloseAction.class);

server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java

+1-1
Original file line number | Diff line number | Diff line change
@@ -91,7 +91,7 @@ public synchronized void rescheduleIfNecessary() {
9191
if (logger.isTraceEnabled()) {
9292
logger.trace("scheduling {} every {}", toString(), interval);
9393
}
94-
cancellable = threadPool.schedule(this, interval, getThreadPool());
94+
cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool());
9595
isScheduledOrRunning = true;
9696
} else {
9797
logger.trace("scheduled {} disabled", toString());

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

+29-35
Original file line number | Diff line number | Diff line change
@@ -827,10 +827,7 @@ private boolean invariant() {
827827
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
828828
}
829829

830-
if (primaryMode
831-
&& indexSettings.isSoftDeleteEnabled()
832-
&& indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
833-
&& hasAllPeerRecoveryRetentionLeases) {
830+
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
834831
// all tracked shard copies have a corresponding peer-recovery retention lease
835832
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
836833
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
@@ -898,7 +895,9 @@ public ReplicationTracker(
898895
this.pendingInSync = new HashSet<>();
899896
this.routingTable = null;
900897
this.replicationGroup = null;
901-
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
898+
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) ||
899+
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
900+
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
902901
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
903902
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
904903
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
@@ -1011,34 +1010,32 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() {
10111010
assert primaryMode;
10121011
assert Thread.holdsLock(this);
10131012

1014-
if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
1015-
final ShardRouting primaryShard = routingTable.primaryShard();
1016-
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
1017-
if (retentionLeases.get(leaseId) == null) {
1018-
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
1019-
assert primaryShard.allocationId().getId().equals(shardAllocationId)
1020-
: routingTable.assignedShards() + " vs " + shardAllocationId;
1021-
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
1022-
// group.
1023-
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
1024-
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
1025-
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
1026-
hasAllPeerRecoveryRetentionLeases = true;
1027-
} else {
1028-
/*
1029-
* We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
1030-
* leases for every shard copy, but in this case we do not expect any leases to exist.
1031-
*/
1032-
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
1033-
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
1034-
}
1035-
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
1036-
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
1037-
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
1038-
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
1039-
// don't need to do any more work.
1013+
final ShardRouting primaryShard = routingTable.primaryShard();
1014+
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
1015+
if (retentionLeases.get(leaseId) == null) {
1016+
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
1017+
assert primaryShard.allocationId().getId().equals(shardAllocationId)
1018+
: routingTable.assignedShards() + " vs " + shardAllocationId;
1019+
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
1020+
// group.
1021+
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
1022+
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
1023+
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
10401024
hasAllPeerRecoveryRetentionLeases = true;
1025+
} else {
1026+
/*
1027+
* We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
1028+
* leases for every shard copy, but in this case we do not expect any leases to exist.
1029+
*/
1030+
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
1031+
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
10411032
}
1033+
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
1034+
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
1035+
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
1036+
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
1037+
// don't need to do any more work.
1038+
hasAllPeerRecoveryRetentionLeases = true;
10421039
}
10431040
}
10441041

@@ -1356,10 +1353,7 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() {
13561353
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
13571354
*/
13581355
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
1359-
if (indexSettings().isSoftDeleteEnabled()
1360-
&& indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN
1361-
&& hasAllPeerRecoveryRetentionLeases == false) {
1362-
1356+
if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) {
13631357
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
13641358
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
13651359
setHasAllPeerRecoveryRetentionLeases();

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

+63-4
Original file line number | Diff line number | Diff line change
@@ -21,12 +21,15 @@
2121

2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
24+
import org.apache.logging.log4j.message.ParameterizedMessage;
25+
import org.apache.lucene.store.AlreadyClosedException;
26+
import org.elasticsearch.ExceptionsHelper;
2427
import org.elasticsearch.action.ActionListener;
25-
import org.elasticsearch.action.ActionType;
2628
import org.elasticsearch.action.support.ActionFilters;
2729
import org.elasticsearch.action.support.ActiveShardCount;
2830
import org.elasticsearch.action.support.replication.ReplicationRequest;
2931
import org.elasticsearch.action.support.replication.ReplicationResponse;
32+
import org.elasticsearch.action.support.replication.ReplicationTask;
3033
import org.elasticsearch.action.support.replication.TransportReplicationAction;
3134
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3235
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -37,12 +40,19 @@
3740
import org.elasticsearch.common.settings.Settings;
3841
import org.elasticsearch.gateway.WriteStateException;
3942
import org.elasticsearch.index.shard.IndexShard;
43+
import org.elasticsearch.index.shard.IndexShardClosedException;
4044
import org.elasticsearch.index.shard.ShardId;
4145
import org.elasticsearch.indices.IndicesService;
46+
import org.elasticsearch.node.NodeClosedException;
47+
import org.elasticsearch.tasks.Task;
48+
import org.elasticsearch.tasks.TaskId;
4249
import org.elasticsearch.threadpool.ThreadPool;
50+
import org.elasticsearch.transport.TransportException;
51+
import org.elasticsearch.transport.TransportResponseHandler;
4352
import org.elasticsearch.transport.TransportService;
4453

4554
import java.io.IOException;
55+
import java.util.Map;
4656
import java.util.Objects;
4757

4858
/**
@@ -56,9 +66,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
5666
RetentionLeaseBackgroundSyncAction.Request,
5767
ReplicationResponse> {
5868

59-
public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
60-
public static ActionType<ReplicationResponse> TYPE = new ActionType<>(ACTION_NAME, ReplicationResponse::new);
61-
69+
public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
6270
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);
6371

6472
protected Logger getLogger() {
@@ -90,6 +98,52 @@ public RetentionLeaseBackgroundSyncAction(
9098
ThreadPool.Names.MANAGEMENT);
9199
}
92100

101+
@Override
102+
protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
103+
assert false : "use RetentionLeaseBackgroundSyncAction#backgroundSync";
104+
}
105+
106+
final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
107+
final Request request = new Request(shardId, retentionLeases);
108+
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
109+
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
110+
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
111+
task,
112+
transportOptions,
113+
new TransportResponseHandler<ReplicationResponse>() {
114+
@Override
115+
public ReplicationResponse read(StreamInput in) throws IOException {
116+
return newResponseInstance(in);
117+
}
118+
119+
@Override
120+
public String executor() {
121+
return ThreadPool.Names.SAME;
122+
}
123+
124+
@Override
125+
public void handleResponse(ReplicationResponse response) {
126+
task.setPhase("finished");
127+
taskManager.unregister(task);
128+
}
129+
130+
@Override
131+
public void handleException(TransportException e) {
132+
task.setPhase("finished");
133+
taskManager.unregister(task);
134+
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
135+
// node shutting down
136+
return;
137+
}
138+
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
139+
// the shard is closed
140+
return;
141+
}
142+
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
143+
}
144+
});
145+
}
146+
93147
@Override
94148
protected void shardOperationOnPrimary(
95149
final Request request,
@@ -137,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException {
137191
retentionLeases.writeTo(out);
138192
}
139193

194+
@Override
195+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
196+
return new ReplicationTask(id, type, action, "retention_lease_background_sync shardId=" + shardId, parentTaskId, headers);
197+
}
198+
140199
@Override
141200
public String toString() {
142201
return "RetentionLeaseBackgroundSyncAction.Request{" +

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

+59-4
Original file line number | Diff line number | Diff line change
@@ -21,13 +21,16 @@
2121

2222
import org.apache.logging.log4j.LogManager;
2323
import org.apache.logging.log4j.Logger;
24+
import org.apache.logging.log4j.message.ParameterizedMessage;
25+
import org.apache.lucene.store.AlreadyClosedException;
26+
import org.elasticsearch.ExceptionsHelper;
2427
import org.elasticsearch.action.ActionListener;
25-
import org.elasticsearch.action.ActionType;
2628
import org.elasticsearch.action.support.ActionFilters;
2729
import org.elasticsearch.action.support.ActiveShardCount;
2830
import org.elasticsearch.action.support.WriteResponse;
2931
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
3032
import org.elasticsearch.action.support.replication.ReplicationResponse;
33+
import org.elasticsearch.action.support.replication.ReplicationTask;
3134
import org.elasticsearch.action.support.replication.TransportWriteAction;
3235
import org.elasticsearch.cluster.action.shard.ShardStateAction;
3336
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -39,12 +42,18 @@
3942
import org.elasticsearch.common.settings.Settings;
4043
import org.elasticsearch.gateway.WriteStateException;
4144
import org.elasticsearch.index.shard.IndexShard;
45+
import org.elasticsearch.index.shard.IndexShardClosedException;
4246
import org.elasticsearch.index.shard.ShardId;
4347
import org.elasticsearch.indices.IndicesService;
48+
import org.elasticsearch.tasks.Task;
49+
import org.elasticsearch.tasks.TaskId;
4450
import org.elasticsearch.threadpool.ThreadPool;
51+
import org.elasticsearch.transport.TransportException;
52+
import org.elasticsearch.transport.TransportResponseHandler;
4553
import org.elasticsearch.transport.TransportService;
4654

4755
import java.io.IOException;
56+
import java.util.Map;
4857
import java.util.Objects;
4958

5059
/**
@@ -54,9 +63,7 @@
5463
public class RetentionLeaseSyncAction extends
5564
TransportWriteAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> {
5665

57-
public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
58-
public static ActionType<Response> TYPE = new ActionType<>(ACTION_NAME, Response::new);
59-
66+
public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
6067
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);
6168

6269
protected Logger getLogger() {
@@ -88,6 +95,49 @@ public RetentionLeaseSyncAction(
8895
ThreadPool.Names.MANAGEMENT, false);
8996
}
9097

98+
@Override
99+
protected void doExecute(Task parentTask, Request request, ActionListener<Response> listener) {
100+
assert false : "use RetentionLeaseSyncAction#sync";
101+
}
102+
103+
final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases,
104+
ActionListener<ReplicationResponse> listener) {
105+
final Request request = new Request(shardId, retentionLeases);
106+
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
107+
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
108+
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
109+
task,
110+
transportOptions,
111+
new TransportResponseHandler<ReplicationResponse>() {
112+
@Override
113+
public ReplicationResponse read(StreamInput in) throws IOException {
114+
return newResponseInstance(in);
115+
}
116+
117+
@Override
118+
public String executor() {
119+
return ThreadPool.Names.SAME;
120+
}
121+
122+
@Override
123+
public void handleResponse(ReplicationResponse response) {
124+
task.setPhase("finished");
125+
taskManager.unregister(task);
126+
listener.onResponse(response);
127+
}
128+
129+
@Override
130+
public void handleException(TransportException e) {
131+
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
132+
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
133+
}
134+
task.setPhase("finished");
135+
taskManager.unregister(task);
136+
listener.onFailure(e);
137+
}
138+
});
139+
}
140+
91141
@Override
92142
protected void shardOperationOnPrimary(Request request, IndexShard primary,
93143
ActionListener<PrimaryResult<Request, Response>> listener) {
@@ -141,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException {
141191
retentionLeases.writeTo(out);
142192
}
143193

194+
@Override
195+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
196+
return new ReplicationTask(id, type, action, "retention_lease_sync shardId=" + shardId, parentTaskId, headers);
197+
}
198+
144199
@Override
145200
public String toString() {
146201
return "RetentionLeaseSyncAction.Request{" +

0 commit comments

Comments
 (0)