Skip to content

Commit ed817fb

Browse files
authored
[CCR] Move leader_index and leader_cluster parameters from resume follow to put follow api (#34638)
As part of this change the leader index name and leader cluster name are stored in the CCR metadata in the follow index. The resume follow api will read that when a resume follow request is executed.
1 parent c447fc2 commit ed817fb

File tree

19 files changed

+305
-263
lines changed

19 files changed

+305
-263
lines changed

x-pack/plugin/ccr/qa/multi-cluster-with-non-compliant-license/src/test/java/org/elasticsearch/xpack/ccr/CcrMultiClusterLicenseIT.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,6 @@ protected boolean preserveClusterUponCompletion() {
3131
return true;
3232
}
3333

34-
public void testResumeFollow() {
35-
if (runningAgainstLeaderCluster == false) {
36-
final Request request = new Request("POST", "/follower/_ccr/resume_follow");
37-
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}");
38-
assertNonCompliantLicense(request);
39-
}
40-
}
41-
4234
public void testFollow() {
4335
if (runningAgainstLeaderCluster == false) {
4436
final Request request = new Request("PUT", "/follower/_ccr/follow");

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public void testFollowIndex() throws Exception {
8080
refresh(allowedIndex);
8181
verifyDocuments(adminClient(), allowedIndex, numDocs);
8282
} else {
83-
follow(allowedIndex, allowedIndex);
83+
follow(client(), allowedIndex, allowedIndex);
8484
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
8585
assertThat(countCcrNodeTasks(), equalTo(1));
8686
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
@@ -93,7 +93,7 @@ public void testFollowIndex() throws Exception {
9393
assertThat(countCcrNodeTasks(), equalTo(0));
9494
});
9595

96-
resumeFollow(allowedIndex, allowedIndex);
96+
resumeFollow(allowedIndex);
9797
assertThat(countCcrNodeTasks(), equalTo(1));
9898
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
9999
// Make sure that there are no other ccr relates operations running:
@@ -106,11 +106,11 @@ public void testFollowIndex() throws Exception {
106106

107107
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_close")));
108108
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
109-
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex, allowedIndex));
109+
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
110110
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));
111111

112112
// User does not have manage_follow_index index privilege for 'unallowedIndex':
113-
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, unallowedIndex));
113+
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, unallowedIndex));
114114
assertThat(e.getMessage(),
115115
containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
116116
// Verify that the follow index has not been created and no node tasks are running
@@ -119,24 +119,28 @@ public void testFollowIndex() throws Exception {
119119

120120
// User does have manage_follow_index index privilege on 'allowed' index,
121121
// but not read / monitor roles on 'disallowed' index:
122-
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, allowedIndex));
122+
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, allowedIndex));
123123
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
124124
"privilege for action [indices:monitor/stats] is missing, " +
125125
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
126126
// Verify that the follow index has not been created and no node tasks are running
127127
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
128128
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
129129

130-
e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex, unallowedIndex));
130+
follow(adminClient(), unallowedIndex, unallowedIndex);
131+
pauseFollow(adminClient(), unallowedIndex);
132+
133+
e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex));
131134
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
132135
"privilege for action [indices:monitor/stats] is missing, " +
133136
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
134-
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
135-
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
136137

137138
e = expectThrows(ResponseException.class,
138139
() -> client().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
139140
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/unfollow] is unauthorized for user [test_ccr]"));
141+
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_close")));
142+
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
143+
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
140144
}
141145
}
142146

@@ -187,7 +191,7 @@ public void testAutoFollowPatterns() throws Exception {
187191
// Cleanup by deleting auto follow pattern and pause following:
188192
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
189193
assertOK(client().performRequest(request));
190-
pauseFollow(allowedIndex);
194+
pauseFollow(client(), allowedIndex);
191195
}
192196

193197
private int countCcrNodeTasks() throws IOException {
@@ -228,18 +232,17 @@ private static void refresh(String index) throws IOException {
228232
assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh")));
229233
}
230234

231-
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
235+
private static void resumeFollow(String followIndex) throws IOException {
232236
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
233-
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
234-
"\", \"poll_timeout\": \"10ms\"}");
237+
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
235238
assertOK(client().performRequest(request));
236239
}
237240

238-
private static void follow(String leaderIndex, String followIndex) throws IOException {
241+
private static void follow(RestClient client, String leaderIndex, String followIndex) throws IOException {
239242
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
240243
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
241244
"\", \"poll_timeout\": \"10ms\"}");
242-
assertOK(client().performRequest(request));
245+
assertOK(client.performRequest(request));
243246
}
244247

245248
void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException {
@@ -302,7 +305,7 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
302305
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
303306
}
304307

305-
private static void pauseFollow(String followIndex) throws IOException {
308+
private static void pauseFollow(RestClient client, String followIndex) throws IOException {
306309
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
307310
}
308311

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void testFollowIndex() throws Exception {
7171
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
7272
// unfollow and then follow and then index a few docs in leader index:
7373
pauseFollow(followIndexName);
74-
resumeFollow(leaderIndexName, followIndexName);
74+
resumeFollow(followIndexName);
7575
try (RestClient leaderClient = buildLeaderClient()) {
7676
int id = numDocs;
7777
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
@@ -84,14 +84,14 @@ public void testFollowIndex() throws Exception {
8484
pauseFollow(followIndexName);
8585
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_close")));
8686
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_ccr/unfollow")));
87-
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(leaderIndexName, followIndexName));
87+
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(followIndexName));
8888
assertThat(e.getMessage(), containsString("follow index [" + followIndexName + "] does not have ccr metadata"));
8989
}
9090
}
9191

9292
public void testFollowNonExistingLeaderIndex() throws Exception {
9393
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
94-
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index", "non-existing-index"));
94+
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index"));
9595
assertThat(e.getMessage(), containsString("no such index"));
9696
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));
9797

@@ -151,10 +151,9 @@ private static void refresh(String index) throws IOException {
151151
assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh")));
152152
}
153153

154-
private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
154+
private static void resumeFollow(String followIndex) throws IOException {
155155
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
156-
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
157-
"\", \"poll_timeout\": \"10ms\"}");
156+
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
158157
assertOK(client().performRequest(request));
159158
}
160159

x-pack/plugin/ccr/qa/rest/src/test/resources/rest-api-spec/test/ccr/follow_and_unfollow.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,7 @@
5252
- do:
5353
ccr.resume_follow:
5454
index: bar
55-
body:
56-
leader_cluster: local
57-
leader_index: foo
55+
body: {}
5856
- is_true: acknowledged
5957

6058
- do:

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
9797
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
9898
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
9999
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
100+
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
101+
public static final String CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY = "leader_cluster_name";
100102

101103
private final boolean enabled;
102104
private final Settings settings;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/AutoFollowCoordinator.java

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -176,11 +176,10 @@ void getLeaderClusterState(final Map<String, String> headers,
176176

177177
@Override
178178
void createAndFollow(Map<String, String> headers,
179-
ResumeFollowAction.Request followRequest,
179+
PutFollowAction.Request request,
180180
Runnable successHandler,
181181
Consumer<Exception> failureHandler) {
182182
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
183-
PutFollowAction.Request request = new PutFollowAction.Request(followRequest);
184183
followerClient.execute(
185184
PutFollowAction.INSTANCE,
186185
request,
@@ -278,7 +277,7 @@ void autoFollowIndices() {
278277
}
279278

280279
private void checkAutoFollowPattern(String autoFollowPattenName,
281-
String clusterAlias,
280+
String leaderCluster,
282281
AutoFollowPattern autoFollowPattern,
283282
List<Index> leaderIndicesToFollow,
284283
Map<String, String> headers,
@@ -302,7 +301,7 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
302301
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
303302
}
304303
} else {
305-
followLeaderIndex(autoFollowPattenName, clusterAlias, indexToFollow, autoFollowPattern, headers, error -> {
304+
followLeaderIndex(autoFollowPattenName, leaderCluster, indexToFollow, autoFollowPattern, headers, error -> {
306305
results.set(slot, new Tuple<>(indexToFollow, error));
307306
if (leaderIndicesCountDown.countDown()) {
308307
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
@@ -314,25 +313,28 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
314313
}
315314

316315
private void followLeaderIndex(String autoFollowPattenName,
317-
String clusterAlias,
316+
String leaderCluster,
318317
Index indexToFollow,
319318
AutoFollowPattern pattern,
320319
Map<String,String> headers,
321320
Consumer<Exception> onResult) {
322321
final String leaderIndexName = indexToFollow.getName();
323322
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);
324323

325-
ResumeFollowAction.Request request = new ResumeFollowAction.Request();
326-
request.setLeaderCluster(clusterAlias);
324+
ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
325+
followRequest.setFollowerIndex(followIndexName);
326+
followRequest.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
327+
followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
328+
followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
329+
followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
330+
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
331+
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
332+
followRequest.setPollTimeout(pattern.getPollTimeout());
333+
334+
PutFollowAction.Request request = new PutFollowAction.Request();
335+
request.setLeaderCluster(leaderCluster);
327336
request.setLeaderIndex(indexToFollow.getName());
328-
request.setFollowerIndex(followIndexName);
329-
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
330-
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
331-
request.setMaxBatchSize(pattern.getMaxBatchSize());
332-
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
333-
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
334-
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
335-
request.setPollTimeout(pattern.getPollTimeout());
337+
request.setFollowRequest(followRequest);
336338

337339
// Execute if the create and follow api call succeeds:
338340
Runnable successHandler = () -> {
@@ -418,7 +420,7 @@ abstract void getLeaderClusterState(
418420

419421
abstract void createAndFollow(
420422
Map<String, String> headers,
421-
ResumeFollowAction.Request followRequest,
423+
PutFollowAction.Request followRequest,
422424
Runnable successHandler,
423425
Consumer<Exception> failureHandler
424426
);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutFollowAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,11 +95,11 @@ protected void masterOperation(
9595
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
9696
return;
9797
}
98-
String leaderCluster = request.getFollowRequest().getLeaderCluster();
98+
String leaderCluster = request.getLeaderCluster();
9999
// Validates whether the leader cluster has been configured properly:
100100
client.getRemoteClusterClient(leaderCluster);
101101

102-
String leaderIndex = request.getFollowRequest().getLeaderIndex();
102+
String leaderIndex = request.getLeaderIndex();
103103
createFollowerIndexAndFollowRemoteIndex(request, leaderCluster, leaderIndex, listener);
104104
}
105105

@@ -122,8 +122,7 @@ private void createFollowerIndex(
122122
final PutFollowAction.Request request,
123123
final ActionListener<PutFollowAction.Response> listener) {
124124
if (leaderIndexMetaData == null) {
125-
listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() +
126-
"] does not exist"));
125+
listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not exist"));
127126
return;
128127
}
129128

@@ -160,6 +159,8 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
160159
Map<String, String> metadata = new HashMap<>();
161160
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
162161
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
162+
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
163+
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY, request.getLeaderCluster());
163164
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
164165

165166
// Copy all settings, but overwrite a few settings.

0 commit comments

Comments
 (0)