Skip to content

Commit 2794ab7

Browse files
authored
Execute retention lease syncs under system context (#53838)
The retention lease syncs need to occur under the system context, because they are internal actions executed on behalf of the user. Today we are relying on this happening for background syncs by virtue of the fact that the context the syncs are created under is the system context. This is due to these occurring on the cluster state applier thread. However, there are situations where this does not hold such as when a timed out cluster state publication occurs, and the node where the shard is allocated is the elected master node. In that case, the context will be empty due to the fact that we do not reschedule publication under the system context. Currently, doing so runs us into some troubles with losing the existing context, possibly dropping deprecation headers. We could copy that context over when marking the current context as the system context, but the implications of that require some more investigation. For now, we explicitly mark the retention lease syncs as executing under the system context, as this is situation that we can reason about.
1 parent 76cd638 commit 2794ab7

File tree

3 files changed

+82
-70
lines changed

3 files changed

+82
-70
lines changed

server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractAsyncTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public synchronized void rescheduleIfNecessary() {
9191
if (logger.isTraceEnabled()) {
9292
logger.trace("scheduling {} every {}", toString(), interval);
9393
}
94-
cancellable = threadPool.schedule(threadPool.preserveContext(this), interval, getThreadPool());
94+
cancellable = threadPool.schedule(this, interval, getThreadPool());
9595
isScheduledOrRunning = true;
9696
} else {
9797
logger.trace("scheduled {} disabled", toString());

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java

Lines changed: 42 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.common.io.stream.StreamInput;
3838
import org.elasticsearch.common.io.stream.StreamOutput;
3939
import org.elasticsearch.common.settings.Settings;
40+
import org.elasticsearch.common.util.concurrent.ThreadContext;
4041
import org.elasticsearch.gateway.WriteStateException;
4142
import org.elasticsearch.index.shard.IndexShard;
4243
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -101,44 +102,49 @@ protected void doExecute(Task task, Request request, ActionListener<ReplicationR
101102
}
102103

103104
final void backgroundSync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases) {
104-
final Request request = new Request(shardId, retentionLeases);
105-
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
106-
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
107-
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
108-
task,
109-
transportOptions,
110-
new TransportResponseHandler<ReplicationResponse>() {
111-
@Override
112-
public ReplicationResponse read(StreamInput in) throws IOException {
113-
return newResponseInstance(in);
114-
}
115-
116-
@Override
117-
public String executor() {
118-
return ThreadPool.Names.SAME;
119-
}
120-
121-
@Override
122-
public void handleResponse(ReplicationResponse response) {
123-
task.setPhase("finished");
124-
taskManager.unregister(task);
125-
}
126-
127-
@Override
128-
public void handleException(TransportException e) {
129-
task.setPhase("finished");
130-
taskManager.unregister(task);
131-
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
132-
// node shutting down
133-
return;
105+
final ThreadContext threadContext = threadPool.getThreadContext();
106+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
107+
// we have to execute under the system context so that if security is enabled the sync is authorized
108+
threadContext.markAsSystemContext();
109+
final Request request = new Request(shardId, retentionLeases);
110+
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_background_sync", request);
111+
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
112+
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
113+
task,
114+
transportOptions,
115+
new TransportResponseHandler<ReplicationResponse>() {
116+
@Override
117+
public ReplicationResponse read(StreamInput in) throws IOException {
118+
return newResponseInstance(in);
134119
}
135-
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
136-
// the shard is closed
137-
return;
120+
121+
@Override
122+
public String executor() {
123+
return ThreadPool.Names.SAME;
124+
}
125+
126+
@Override
127+
public void handleResponse(ReplicationResponse response) {
128+
task.setPhase("finished");
129+
taskManager.unregister(task);
138130
}
139-
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
140-
}
141-
});
131+
132+
@Override
133+
public void handleException(TransportException e) {
134+
task.setPhase("finished");
135+
taskManager.unregister(task);
136+
if (ExceptionsHelper.unwrap(e, NodeClosedException.class) != null) {
137+
// node shutting down
138+
return;
139+
}
140+
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null) {
141+
// the shard is closed
142+
return;
143+
}
144+
getLogger().warn(new ParameterizedMessage("{} retention lease background sync failed", shardId), e);
145+
}
146+
});
147+
}
142148
}
143149

144150
@Override

server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.elasticsearch.common.io.stream.StreamInput;
4040
import org.elasticsearch.common.io.stream.StreamOutput;
4141
import org.elasticsearch.common.settings.Settings;
42+
import org.elasticsearch.common.util.concurrent.ThreadContext;
4243
import org.elasticsearch.gateway.WriteStateException;
4344
import org.elasticsearch.index.shard.IndexShard;
4445
import org.elasticsearch.index.shard.IndexShardClosedException;
@@ -99,40 +100,45 @@ protected void doExecute(Task parentTask, Request request, ActionListener<Respon
99100

100101
final void sync(ShardId shardId, String primaryAllocationId, long primaryTerm, RetentionLeases retentionLeases,
101102
ActionListener<ReplicationResponse> listener) {
102-
final Request request = new Request(shardId, retentionLeases);
103-
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
104-
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
105-
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
106-
task,
107-
transportOptions,
108-
new TransportResponseHandler<ReplicationResponse>() {
109-
@Override
110-
public ReplicationResponse read(StreamInput in) throws IOException {
111-
return newResponseInstance(in);
112-
}
113-
114-
@Override
115-
public String executor() {
116-
return ThreadPool.Names.SAME;
117-
}
118-
119-
@Override
120-
public void handleResponse(ReplicationResponse response) {
121-
task.setPhase("finished");
122-
taskManager.unregister(task);
123-
listener.onResponse(response);
124-
}
125-
126-
@Override
127-
public void handleException(TransportException e) {
128-
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
129-
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
103+
final ThreadContext threadContext = threadPool.getThreadContext();
104+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
105+
// we have to execute under the system context so that if security is enabled the sync is authorized
106+
threadContext.markAsSystemContext();
107+
final Request request = new Request(shardId, retentionLeases);
108+
final ReplicationTask task = (ReplicationTask) taskManager.register("transport", "retention_lease_sync", request);
109+
transportService.sendChildRequest(clusterService.localNode(), transportPrimaryAction,
110+
new ConcreteShardRequest<>(request, primaryAllocationId, primaryTerm),
111+
task,
112+
transportOptions,
113+
new TransportResponseHandler<ReplicationResponse>() {
114+
@Override
115+
public ReplicationResponse read(StreamInput in) throws IOException {
116+
return newResponseInstance(in);
130117
}
131-
task.setPhase("finished");
132-
taskManager.unregister(task);
133-
listener.onFailure(e);
134-
}
135-
});
118+
119+
@Override
120+
public String executor() {
121+
return ThreadPool.Names.SAME;
122+
}
123+
124+
@Override
125+
public void handleResponse(ReplicationResponse response) {
126+
task.setPhase("finished");
127+
taskManager.unregister(task);
128+
listener.onResponse(response);
129+
}
130+
131+
@Override
132+
public void handleException(TransportException e) {
133+
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
134+
getLogger().warn(new ParameterizedMessage("{} retention lease sync failed", shardId), e);
135+
}
136+
task.setPhase("finished");
137+
taskManager.unregister(task);
138+
listener.onFailure(e);
139+
}
140+
});
141+
}
136142
}
137143

138144
@Override

0 commit comments

Comments
 (0)