Use retention lease in peer recovery of closed indices #48430

Merged: 16 commits (Nov 21, 2019)
@@ -218,8 +218,6 @@
import org.elasticsearch.gateway.TransportNodesListGatewayStartedShards;
import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseActions;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
@@ -548,8 +546,6 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// internal actions
actions.register(GlobalCheckpointSyncAction.TYPE, GlobalCheckpointSyncAction.class);
actions.register(RetentionLeaseBackgroundSyncAction.TYPE, RetentionLeaseBackgroundSyncAction.class);
actions.register(RetentionLeaseSyncAction.TYPE, RetentionLeaseSyncAction.class);
actions.register(TransportNodesSnapshotsStatus.TYPE, TransportNodesSnapshotsStatus.class);
actions.register(TransportNodesListGatewayMetaState.TYPE, TransportNodesListGatewayMetaState.class);
actions.register(TransportVerifyShardBeforeCloseAction.TYPE, TransportVerifyShardBeforeCloseAction.class);
@@ -91,7 +91,7 @@ public synchronized void rescheduleIfNecessary() {
if (logger.isTraceEnabled()) {
logger.trace("scheduling {} every {}", toString(), interval);
}
cancellable = threadPool.schedule(this, interval, getThreadPool());
cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool());
Contributor:

I am curious why this change is necessary.

The concrete tasks all seem to be system-like and thus should not really depend on the caller's context. If there is some dependency, couldn't this become problematic if a user triggers the creation of an IndexService?

Contributor:

ThreadPool#schedule does not itself preserve the context of the caller; instead it runs the scheduled task in the default context, which is not a system context.

Contributor:

Yeah, I knew about that. The question was more about the motivation for moving from simply setting the system context inside the task to preserving the caller's context here. I did some double-checking on the callers and AFAICS it looks ok; it just seemed odd to me to make this change in this specific PR. On second thought, I think it makes sense to preserve the context here, given that AbstractAsyncTask is not specific to IndexService, but it could be good to add an assertion to IndexService that we are in the system context?

Contributor:

I see. I prefer preserving an existing context here since the security implications are much clearer - security bugs lurk in areas where privileges change, so the less of that we do the better. IMO it's a bit trappy that EsThreadPoolExecutor#execute preserves the caller's context but ThreadPool#schedule does not, although this is one of the very few places where that matters right now.

It's possible we could assert that we are in the system context here, but that seems an overly strong statement to make. We already have tests asserting that we're in a context with appropriate permissions, which I think is enough.
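
To make the behaviour discussed in this thread concrete, here is a minimal, self-contained Java sketch of preserving the caller's context across a scheduled task. It is an illustration only: a plain ThreadLocal and ScheduledExecutorService stand in for Elasticsearch's ThreadContext and ThreadPool, and none of the names below are the actual ES API.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PreserveContextSketch {
    // Stand-in for the thread context; "default" plays the role of the non-system context.
    private static final ThreadLocal<String> CONTEXT = ThreadLocal.withInitial(() -> "default");

    // Capture the caller's context at scheduling time and restore it around the task when it runs.
    static Runnable preserveContext(Runnable task) {
        final String captured = CONTEXT.get();
        return () -> {
            final String previous = CONTEXT.get();
            CONTEXT.set(captured);
            try {
                task.run();
            } finally {
                CONTEXT.set(previous);
            }
        };
    }

    public static void main(String[] args) throws InterruptedException {
        final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CONTEXT.set("system"); // the caller runs in a privileged context

        // Without wrapping, the task sees the scheduler thread's default context.
        scheduler.schedule(() -> System.out.println("unwrapped: " + CONTEXT.get()), 10, TimeUnit.MILLISECONDS);
        // Wrapped, the task sees the caller's context from scheduling time.
        scheduler.schedule(preserveContext(() -> System.out.println("wrapped: " + CONTEXT.get())), 20, TimeUnit.MILLISECONDS);

        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}

The wrapped call corresponds to threadPool.schedule(threadPool.preserveContext(this), ...) in the diff above, while the unwrapped call corresponds to the previous behaviour of ThreadPool#schedule.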

isScheduledOrRunning = true;
} else {
logger.trace("scheduled {} disabled", toString());
@@ -827,10 +827,7 @@ private boolean invariant() {
assert checkpoints.get(aId) != null : "aId [" + aId + "] is pending in sync but isn't tracked";
}

if (primaryMode
&& indexSettings.isSoftDeleteEnabled()
&& indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& hasAllPeerRecoveryRetentionLeases) {
if (primaryMode && indexSettings.isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases) {
// all tracked shard copies have a corresponding peer-recovery retention lease
for (final ShardRouting shardRouting : routingTable.assignedShards()) {
if (checkpoints.get(shardRouting.allocationId().getId()).tracked) {
@@ -898,7 +895,9 @@ public ReplicationTracker(
this.pendingInSync = new HashSet<>();
this.routingTable = null;
this.replicationGroup = null;
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0);
this.hasAllPeerRecoveryRetentionLeases = indexSettings.getIndexVersionCreated().onOrAfter(Version.V_8_0_0) ||
(indexSettings.getIndexVersionCreated().onOrAfter(Version.V_7_4_0) &&
indexSettings.getIndexMetaData().getState() == IndexMetaData.State.OPEN);
this.fileBasedRecoveryThreshold = IndexSettings.FILE_BASED_RECOVERY_THRESHOLD_SETTING.get(indexSettings.getSettings());
this.safeCommitInfoSupplier = safeCommitInfoSupplier;
assert Version.V_EMPTY.equals(indexSettings.getIndexVersionCreated()) == false;
@@ -1011,34 +1010,32 @@ private void addPeerRecoveryRetentionLeaseForSolePrimary() {
assert primaryMode;
assert Thread.holdsLock(this);

if (indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN) {
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
final ShardRouting primaryShard = routingTable.primaryShard();
final String leaseId = getPeerRecoveryRetentionLeaseId(primaryShard);
if (retentionLeases.get(leaseId) == null) {
if (replicationGroup.getReplicationTargets().equals(Collections.singletonList(primaryShard))) {
assert primaryShard.allocationId().getId().equals(shardAllocationId)
: routingTable.assignedShards() + " vs " + shardAllocationId;
// Safe to call innerAddRetentionLease() without a subsequent sync since there are no other members of this replication
// group.
logger.trace("addPeerRecoveryRetentionLeaseForSolePrimary: adding lease [{}]", leaseId);
innerAddRetentionLease(leaseId, Math.max(0L, checkpoints.get(shardAllocationId).globalCheckpoint + 1),
PEER_RECOVERY_RETENTION_LEASE_SOURCE);
hasAllPeerRecoveryRetentionLeases = true;
} else {
/*
* We got here via a rolling upgrade from an older version that doesn't create peer recovery retention
* leases for every shard copy, but in this case we do not expect any leases to exist.
*/
assert hasAllPeerRecoveryRetentionLeases == false : routingTable + " vs " + retentionLeases;
logger.debug("{} becoming primary of {} with missing lease: {}", primaryShard, routingTable, retentionLeases);
}
} else if (hasAllPeerRecoveryRetentionLeases == false && routingTable.assignedShards().stream().allMatch(shardRouting ->
retentionLeases.contains(getPeerRecoveryRetentionLeaseId(shardRouting))
|| checkpoints.get(shardRouting.allocationId().getId()).tracked == false)) {
// Although this index is old enough not to have all the expected peer recovery retention leases, in fact it does, so we
// don't need to do any more work.
hasAllPeerRecoveryRetentionLeases = true;
}
}

@@ -1356,10 +1353,7 @@ private synchronized void setHasAllPeerRecoveryRetentionLeases() {
* prior to {@link Version#V_7_4_0} that does not create peer-recovery retention leases.
*/
public synchronized void createMissingPeerRecoveryRetentionLeases(ActionListener<Void> listener) {
if (indexSettings().isSoftDeleteEnabled()
&& indexSettings().getIndexMetaData().getState() == IndexMetaData.State.OPEN
&& hasAllPeerRecoveryRetentionLeases == false) {

if (indexSettings().isSoftDeleteEnabled() && hasAllPeerRecoveryRetentionLeases == false) {
final List<ShardRouting> shardRoutings = routingTable.assignedShards();
final GroupedActionListener<ReplicationResponse> groupedActionListener = new GroupedActionListener<>(ActionListener.wrap(vs -> {
setHasAllPeerRecoveryRetentionLeases();
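
As a reading aid for the ReplicationTracker changes above, the new initialization of hasAllPeerRecoveryRetentionLeases boils down to the predicate sketched below. The class and method names are made up for illustration; only the Version constants and the onOrAfter checks are taken from the diff.

import org.elasticsearch.Version;

public final class PeerRecoveryLeaseAssumption {
    // May a primary assume that every shard copy already has a peer-recovery retention lease
    // at the time the ReplicationTracker is constructed?
    static boolean hasAllLeasesAtConstruction(Version indexVersionCreated, boolean indexWasOpenAtConstruction) {
        // Indices created on or after 8.0.0 are expected to carry the leases even when closed;
        // indices created on or after 7.4.0 only if they were open when the tracker was created.
        return indexVersionCreated.onOrAfter(Version.V_8_0_0)
            || (indexVersionCreated.onOrAfter(Version.V_7_4_0) && indexWasOpenAtConstruction);
    }
}

Older indices that turn out to have all the leases anyway are still detected at runtime by addPeerRecoveryRetentionLeaseForSolePrimary and createMissingPeerRecoveryRetentionLeases, as the hunks above show.
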
@@ -21,12 +21,15 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -37,12 +40,19 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

/**
@@ -56,9 +66,7 @@ public class RetentionLeaseBackgroundSyncAction extends TransportReplicationActi
RetentionLeaseBackgroundSyncAction.Request,
ReplicationResponse> {

public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
public static ActionType<ReplicationResponse> TYPE = new ActionType<>(ACTION_NAME, ReplicationResponse::new);

public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_background_sync";
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

protected Logger getLogger() {
@@ -90,6 +98,52 @@ public RetentionLeaseBackgroundSyncAction(
ThreadPool.Names.MANAGEMENT);
}

@Override
protected void doExecute(Task task, Request request, ActionListener<ReplicationResponse> listener) {
assert false : "use RetentionLeaseBackgroundSyncAction#backgroundSync";
}

final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ReplicationResponse response) {
task.setPhase("finished");
taskManager.unregister(task);
}

@Override
public void handleException(TransportException e) {
task.setPhase("finished");
taskManager.unregister(task);
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
// node shutting down
return;
}
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
// the shard is closed
return;
}
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
}
});
}

@Override
protected void shardOperationOnPrimary(
final Request request,
@@ -137,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException {
retentionLeases.writeTo(out);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ReplicationTask(id, type, action, "retention_lease_background_sync shardId=" + shardId, parentTaskId, headers);
}

@Override
public String toString() {
return "RetentionLeaseBackgroundSyncAction.Request{" +
@@ -21,13 +21,16 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteResponse;
import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.ReplicationTask;
import org.elasticsearch.action.support.replication.TransportWriteAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
@@ -39,12 +42,18 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.WriteStateException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardClosedException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;

/**
@@ -54,9 +63,7 @@
public class RetentionLeaseSyncAction extends
TransportWriteAction<RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Request, RetentionLeaseSyncAction.Response> {

public static String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
public static ActionType<Response> TYPE = new ActionType<>(ACTION_NAME, Response::new);

public static final String ACTION_NAME = "indices:admin/seq_no/retention_lease_sync";
private static final Logger LOGGER = LogManager.getLogger(RetentionLeaseSyncAction.class);

protected Logger getLogger() {
@@ -88,6 +95,49 @@ public RetentionLeaseSyncAction(
ThreadPool.Names.MANAGEMENT, false);
}

@Override
protected void doExecute(Task parentTask, Request request, ActionListener<Response> listener) {
assert false : "use RetentionLeaseSyncAction#sync";
}

final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases,
ActionListener<ReplicationResponse> listener) {
final Request request = new Request(shardId, retentionLeases);
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
task,
transportOptions,
new TransportResponseHandler<ReplicationResponse>() {
@Override
public ReplicationResponse read(StreamInput in) throws IOException {
return newResponseInstance(in);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}

@Override
public void handleResponse(ReplicationResponse response) {
task.setPhase("finished");
taskManager.unregister(task);
listener.onResponse(response);
}

@Override
public void handleException(TransportException e) {
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
}
task.setPhase("finished");
taskManager.unregister(task);
listener.onFailure(e);
}
});
}

@Override
protected void shardOperationOnPrimary(Request request, IndexShard primary,
ActionListener<PrimaryResult<Request, Response>> listener) {
@@ -141,6 +191,11 @@ public void writeTo(final StreamOutput out) throws IOException {
retentionLeases.writeTo(out);
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new ReplicationTask(id, type, action, "retention_lease_sync shardId=" + shardId, parentTaskId, headers);
}

@Override
public String toString() {
return "RetentionLeaseSyncAction.Request{" +