Skip to content

Commit 8a85251

Browse files
authored
[CCR] Auto follow Coordinator fetch cluster state in system context (#35120)
Auto follow Coordinator should fetch the leader cluster state using system context.
1 parent f915475 commit 8a85251

File tree

4 files changed

+33
-40
lines changed

4 files changed

+33
-40
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/CcrLicenseChecker.java

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -103,9 +103,8 @@ public boolean isCcrAllowed() {
103103
* @param leaderIndex the name of the leader index
104104
* @param onFailure the failure consumer
105105
* @param consumer the consumer for supplying the leader index metadata and historyUUIDs of all leader shards
106-
* @param <T> the type of response the listener is waiting for
107106
*/
108-
public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
107+
public void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
109108
final Client client,
110109
final String clusterAlias,
111110
final String leaderIndex,
@@ -118,8 +117,8 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
118117
request.indices(leaderIndex);
119118
checkRemoteClusterLicenseAndFetchClusterState(
120119
client,
121-
Collections.emptyMap(),
122120
clusterAlias,
121+
client.getRemoteClusterClient(clusterAlias),
123122
request,
124123
onFailure,
125124
leaderClusterState -> {
@@ -151,22 +150,20 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
151150
*
152151
* @param client the client
153152
* @param clusterAlias the remote cluster alias
154-
* @param headers the headers to use for leader client
155153
* @param request the cluster state request
156154
* @param onFailure the failure consumer
157155
* @param leaderClusterStateConsumer the leader cluster state consumer
158156
*/
159157
public void checkRemoteClusterLicenseAndFetchClusterState(
160158
final Client client,
161-
final Map<String, String> headers,
162159
final String clusterAlias,
163160
final ClusterStateRequest request,
164161
final Consumer<Exception> onFailure,
165162
final Consumer<ClusterState> leaderClusterStateConsumer) {
166163
checkRemoteClusterLicenseAndFetchClusterState(
167164
client,
168-
headers,
169165
clusterAlias,
166+
systemClient(client.getRemoteClusterClient(clusterAlias)),
170167
request,
171168
onFailure,
172169
leaderClusterStateConsumer,
@@ -182,18 +179,17 @@ public void checkRemoteClusterLicenseAndFetchClusterState(
182179
*
183180
* @param client the client
184181
* @param clusterAlias the remote cluster alias
185-
* @param headers the headers to use for leader client
182+
* @param leaderClient the leader client to use to execute cluster state API
186183
* @param request the cluster state request
187184
* @param onFailure the failure consumer
188185
* @param leaderClusterStateConsumer the leader cluster state consumer
189186
* @param nonCompliantLicense the supplier for when the license state of the remote cluster is non-compliant
190187
* @param unknownLicense the supplier for when the license state of the remote cluster is unknown due to failure
191-
* @param <T> the type of response the listener is waiting for
192188
*/
193-
private <T> void checkRemoteClusterLicenseAndFetchClusterState(
189+
private void checkRemoteClusterLicenseAndFetchClusterState(
194190
final Client client,
195-
final Map<String, String> headers,
196191
final String clusterAlias,
192+
final Client leaderClient,
197193
final ClusterStateRequest request,
198194
final Consumer<Exception> onFailure,
199195
final Consumer<ClusterState> leaderClusterStateConsumer,
@@ -207,7 +203,6 @@ private <T> void checkRemoteClusterLicenseAndFetchClusterState(
207203
@Override
208204
public void onResponse(final RemoteClusterLicenseChecker.LicenseCheck licenseCheck) {
209205
if (licenseCheck.isSuccess()) {
210-
final Client leaderClient = wrapClient(client.getRemoteClusterClient(clusterAlias), headers);
211206
final ActionListener<ClusterStateResponse> clusterStateListener =
212207
ActionListener.wrap(s -> leaderClusterStateConsumer.accept(s.getState()), onFailure);
213208
// following an index in remote cluster, so use remote client to fetch leader index metadata
@@ -361,6 +356,21 @@ void doExecute(Action<Response> action, Request request, ActionListener<Response
361356
}
362357
}
363358

359+
private static Client systemClient(Client client) {
360+
final ThreadContext threadContext = client.threadPool().getThreadContext();
361+
return new FilterClient(client) {
362+
@Override
363+
protected <Request extends ActionRequest, Response extends ActionResponse>
364+
void doExecute(Action<Response> action, Request request, ActionListener<Response> listener) {
365+
final Supplier<ThreadContext.StoredContext> supplier = threadContext.newRestorableContext(false);
366+
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
367+
threadContext.markAsSystemContext();
368+
super.doExecute(action, request, new ContextPreservingActionListener<>(supplier, listener));
369+
}
370+
}
371+
};
372+
}
373+
364374
private static ThreadContext.StoredContext stashWithHeaders(ThreadContext threadContext, Map<String, String> headers) {
365375
final ThreadContext.StoredContext storedContext = threadContext.stashContext();
366376
threadContext.copyHeaders(headers.entrySet());

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,16 +159,14 @@ private void doAutoFollow() {
159159
AutoFollower operation = new AutoFollower(handler, followerClusterState) {
160160

161161
@Override
162-
void getLeaderClusterState(final Map<String, String> headers,
163-
final String remoteCluster,
162+
void getLeaderClusterState(final String remoteCluster,
164163
final BiConsumer<ClusterState, Exception> handler) {
165164
final ClusterStateRequest request = new ClusterStateRequest();
166165
request.clear();
167166
request.metaData(true);
168167
// TODO: set non-compliant status on auto-follow coordination that can be viewed via a stats API
169168
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchClusterState(
170169
client,
171-
headers,
172170
remoteCluster,
173171
request,
174172
e -> handler.accept(null, e),
@@ -249,7 +247,7 @@ void autoFollowIndices() {
249247
final String remoteCluster = autoFollowPattern.getRemoteCluster();
250248

251249
Map<String, String> headers = autoFollowMetadata.getHeaders().get(autoFollowPattenName);
252-
getLeaderClusterState(headers, remoteCluster, (leaderClusterState, e) -> {
250+
getLeaderClusterState(remoteCluster, (leaderClusterState, e) -> {
253251
if (leaderClusterState != null) {
254252
assert e == null;
255253
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(autoFollowPattenName);
@@ -413,13 +411,10 @@ static Function<ClusterState, ClusterState> recordLeaderIndexAsFollowFunction(St
413411

414412
/**
415413
* Fetch the cluster state from the leader with the specified cluster alias
416-
*
417-
* @param headers the client headers
418414
* @param remoteCluster the name of the leader cluster
419415
* @param handler the callback to invoke
420416
*/
421417
abstract void getLeaderClusterState(
422-
Map<String, String> headers,
423418
String remoteCluster,
424419
BiConsumer<ClusterState, Exception> handler
425420
);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -105,20 +105,12 @@ protected void masterOperation(
105105
client.getRemoteClusterClient(remoteCluster);
106106

107107
String leaderIndex = request.getLeaderIndex();
108-
createFollowerIndexAndFollowRemoteIndex(request, remoteCluster, leaderIndex, listener);
109-
}
110-
111-
private void createFollowerIndexAndFollowRemoteIndex(
112-
final PutFollowAction.Request request,
113-
final String remoteCluster,
114-
final String leaderIndex,
115-
final ActionListener<PutFollowAction.Response> listener) {
116108
ccrLicenseChecker.checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(
117-
client,
118-
remoteCluster,
119-
leaderIndex,
120-
listener::onFailure,
121-
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
109+
client,
110+
remoteCluster,
111+
leaderIndex,
112+
listener::onFailure,
113+
(historyUUID, leaderIndexMetaData) -> createFollowerIndex(leaderIndexMetaData, historyUUID, request, listener));
122114
}
123115

124116
private void createFollowerIndex(

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinatorTests.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,9 @@ public void testAutoFollower() {
8383
};
8484
AutoFollower autoFollower = new AutoFollower(handler, currentState) {
8585
@Override
86-
void getLeaderClusterState(Map<String, String> headers,
87-
String remoteCluster,
86+
void getLeaderClusterState(String remoteCluster,
8887
BiConsumer<ClusterState, Exception> handler) {
89-
assertThat(headers, equalTo(autoFollowHeaders.get("remote")));
88+
assertThat(remoteCluster, equalTo("remote"));
9089
handler.accept(leaderState, null);
9190
}
9291

@@ -143,8 +142,7 @@ public void testAutoFollowerClusterStateApiFailure() {
143142
};
144143
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
145144
@Override
146-
void getLeaderClusterState(Map<String, String> headers,
147-
String remoteCluster,
145+
void getLeaderClusterState(String remoteCluster,
148146
BiConsumer<ClusterState, Exception> handler) {
149147
handler.accept(null, failure);
150148
}
@@ -204,8 +202,7 @@ public void testAutoFollowerUpdateClusterStateFailure() {
204202
};
205203
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
206204
@Override
207-
void getLeaderClusterState(Map<String, String> headers,
208-
String remoteCluster,
205+
void getLeaderClusterState(String remoteCluster,
209206
BiConsumer<ClusterState, Exception> handler) {
210207
handler.accept(leaderState, null);
211208
}
@@ -267,8 +264,7 @@ public void testAutoFollowerCreateAndFollowApiCallFailure() {
267264
};
268265
AutoFollower autoFollower = new AutoFollower(handler, followerState) {
269266
@Override
270-
void getLeaderClusterState(Map<String, String> headers,
271-
String remoteCluster,
267+
void getLeaderClusterState(String remoteCluster,
272268
BiConsumer<ClusterState, Exception> handler) {
273269
handler.accept(leaderState, null);
274270
}

0 commit comments

Comments
 (0)