7
7
package org .elasticsearch .xpack .ccr ;
8
8
9
9
import org .elasticsearch .ElasticsearchStatusException ;
10
+ import org .elasticsearch .action .Action ;
10
11
import org .elasticsearch .action .ActionListener ;
12
+ import org .elasticsearch .action .ActionRequest ;
13
+ import org .elasticsearch .action .ActionRequestBuilder ;
14
+ import org .elasticsearch .action .ActionResponse ;
11
15
import org .elasticsearch .action .admin .cluster .state .ClusterStateRequest ;
12
16
import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
17
+ import org .elasticsearch .action .support .ContextPreservingActionListener ;
13
18
import org .elasticsearch .action .admin .indices .stats .IndexShardStats ;
14
19
import org .elasticsearch .action .admin .indices .stats .IndexStats ;
15
20
import org .elasticsearch .action .admin .indices .stats .IndicesStatsRequest ;
16
21
import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
17
22
import org .elasticsearch .action .admin .indices .stats .ShardStats ;
18
23
import org .elasticsearch .client .Client ;
24
+ import org .elasticsearch .client .FilterClient ;
19
25
import org .elasticsearch .cluster .ClusterState ;
20
26
import org .elasticsearch .cluster .metadata .IndexMetaData ;
27
+ import org .elasticsearch .common .util .concurrent .ThreadContext ;
21
28
import org .elasticsearch .common .CheckedConsumer ;
22
29
import org .elasticsearch .index .engine .CommitStats ;
23
30
import org .elasticsearch .index .engine .Engine ;
24
31
import org .elasticsearch .index .shard .ShardId ;
25
32
import org .elasticsearch .license .RemoteClusterLicenseChecker ;
26
33
import org .elasticsearch .license .XPackLicenseState ;
27
34
import org .elasticsearch .rest .RestStatus ;
35
+ import org .elasticsearch .xpack .ccr .action .ShardFollowTask ;
28
36
import org .elasticsearch .xpack .core .XPackPlugin ;
29
37
30
38
import java .util .Collections ;
31
39
import java .util .Locale ;
40
+ import java .util .Map ;
32
41
import java .util .Objects ;
33
42
import java .util .function .BiConsumer ;
34
43
import java .util .function .BooleanSupplier ;
35
44
import java .util .function .Consumer ;
36
45
import java .util .function .Function ;
46
+ import java .util .function .Supplier ;
47
+ import java .util .stream .Collectors ;
37
48
38
49
/**
39
50
* Encapsulates licensing checking for CCR.
@@ -93,6 +104,7 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
93
104
request .indices (leaderIndex );
94
105
checkRemoteClusterLicenseAndFetchClusterState (
95
106
client ,
107
+ Collections .emptyMap (),
96
108
clusterAlias ,
97
109
request ,
98
110
onFailure ,
@@ -115,19 +127,22 @@ public <T> void checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUU
115
127
*
116
128
* @param client the client
117
129
* @param clusterAlias the remote cluster alias
130
+ * @param headers the headers to use for leader client
118
131
* @param request the cluster state request
119
132
* @param onFailure the failure consumer
120
133
* @param leaderClusterStateConsumer the leader cluster state consumer
121
134
* @param <T> the type of response the listener is waiting for
122
135
*/
123
136
public <T > void checkRemoteClusterLicenseAndFetchClusterState (
124
137
final Client client ,
138
+ final Map <String , String > headers ,
125
139
final String clusterAlias ,
126
140
final ClusterStateRequest request ,
127
141
final Consumer <Exception > onFailure ,
128
142
final Consumer <ClusterState > leaderClusterStateConsumer ) {
129
143
checkRemoteClusterLicenseAndFetchClusterState (
130
144
client ,
145
+ headers ,
131
146
clusterAlias ,
132
147
request ,
133
148
onFailure ,
@@ -144,6 +159,7 @@ public <T> void checkRemoteClusterLicenseAndFetchClusterState(
144
159
*
145
160
* @param client the client
146
161
* @param clusterAlias the remote cluster alias
162
+ * @param headers the headers to use for leader client
147
163
* @param request the cluster state request
148
164
* @param onFailure the failure consumer
149
165
* @param leaderClusterStateConsumer the leader cluster state consumer
@@ -153,6 +169,7 @@ public <T> void checkRemoteClusterLicenseAndFetchClusterState(
153
169
*/
154
170
private <T > void checkRemoteClusterLicenseAndFetchClusterState (
155
171
final Client client ,
172
+ final Map <String , String > headers ,
156
173
final String clusterAlias ,
157
174
final ClusterStateRequest request ,
158
175
final Consumer <Exception > onFailure ,
@@ -167,7 +184,7 @@ private <T> void checkRemoteClusterLicenseAndFetchClusterState(
167
184
@ Override
168
185
public void onResponse (final RemoteClusterLicenseChecker .LicenseCheck licenseCheck ) {
169
186
if (licenseCheck .isSuccess ()) {
170
- final Client leaderClient = client .getRemoteClusterClient (clusterAlias );
187
+ final Client leaderClient = wrapClient ( client .getRemoteClusterClient (clusterAlias ), headers );
171
188
final ActionListener <ClusterStateResponse > clusterStateListener =
172
189
ActionListener .wrap (s -> leaderClusterStateConsumer .accept (s .getState ()), onFailure );
173
190
// following an index in remote cluster, so use remote client to fetch leader index metadata
@@ -237,6 +254,34 @@ public void fetchLeaderHistoryUUIDs(
237
254
leaderClient .admin ().indices ().stats (request , ActionListener .wrap (indicesStatsHandler , onFailure ));
238
255
}
239
256
257
+ public static Client wrapClient (Client client , Map <String , String > headers ) {
258
+ if (headers .isEmpty ()) {
259
+ return client ;
260
+ } else {
261
+ final ThreadContext threadContext = client .threadPool ().getThreadContext ();
262
+ Map <String , String > filteredHeaders = headers .entrySet ().stream ()
263
+ .filter (e -> ShardFollowTask .HEADER_FILTERS .contains (e .getKey ()))
264
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ));
265
+ return new FilterClient (client ) {
266
+ @ Override
267
+ protected <Request extends ActionRequest , Response extends ActionResponse ,
268
+ RequestBuilder extends ActionRequestBuilder <Request , Response , RequestBuilder >>
269
+ void doExecute (Action <Request , Response , RequestBuilder > action , Request request , ActionListener <Response > listener ) {
270
+ final Supplier <ThreadContext .StoredContext > supplier = threadContext .newRestorableContext (false );
271
+ try (ThreadContext .StoredContext ignore = stashWithHeaders (threadContext , filteredHeaders )) {
272
+ super .doExecute (action , request , new ContextPreservingActionListener <>(supplier , listener ));
273
+ }
274
+ }
275
+ };
276
+ }
277
+ }
278
+
279
+ private static ThreadContext .StoredContext stashWithHeaders (ThreadContext threadContext , Map <String , String > headers ) {
280
+ final ThreadContext .StoredContext storedContext = threadContext .stashContext ();
281
+ threadContext .copyHeaders (headers .entrySet ());
282
+ return storedContext ;
283
+ }
284
+
240
285
private static ElasticsearchStatusException indexMetadataNonCompliantRemoteLicense (
241
286
final String leaderIndex , final RemoteClusterLicenseChecker .LicenseCheck licenseCheck ) {
242
287
final String clusterAlias = licenseCheck .remoteClusterLicenseInfo ().clusterAlias ();
0 commit comments