Skip to content

[CCR] Move leader_index and leader_cluster parameters from resume follow to put follow api #34638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ protected boolean preserveClusterUponCompletion() {
return true;
}

public void testResumeFollow() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The resume api now looks up the follow index metadata in order to get the leader index name and leader cluster name and because of that this fails with a different error. We can't execute a put follow to create the follow index, because the leader cluster runs with a basic license. So I don't see a way how to test this in another way.

if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}");
assertNonCompliantLicense(request);
}
}

public void testFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("PUT", "/follower/_ccr/follow");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, numDocs);
} else {
follow(allowedIndex, allowedIndex);
follow(client(), allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
Expand All @@ -93,7 +93,7 @@ public void testFollowIndex() throws Exception {
assertThat(countCcrNodeTasks(), equalTo(0));
});

resumeFollow(allowedIndex, allowedIndex);
resumeFollow(allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
// Make sure that there are no other ccr relates operations running:
Expand All @@ -106,11 +106,11 @@ public void testFollowIndex() throws Exception {

assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex, allowedIndex));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));

// User does not have manage_follow_index index privilege for 'unallowedIndex':
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, unallowedIndex));
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, unallowedIndex));
assertThat(e.getMessage(),
containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
Expand All @@ -119,24 +119,28 @@ public void testFollowIndex() throws Exception {

// User does have manage_follow_index index privilege on 'allowed' index,
// but not read / monitor roles on 'disallowed' index:
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, allowedIndex));
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, allowedIndex));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex, unallowedIndex));
follow(adminClient(), unallowedIndex, unallowedIndex);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execute put follow api with admin client, so that we can check whether we fail with the right error when a normal user resumes following an unallowed index.

pauseFollow(adminClient(), unallowedIndex);

e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

e = expectThrows(ResponseException.class,
() -> client().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/unfollow] is unauthorized for user [test_ccr]"));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_close")));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
}
}

Expand Down Expand Up @@ -187,7 +191,7 @@ public void testAutoFollowPatterns() throws Exception {
// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/leader_cluster");
assertOK(client().performRequest(request));
pauseFollow(allowedIndex);
pauseFollow(client(), allowedIndex);
}

private int countCcrNodeTasks() throws IOException {
Expand Down Expand Up @@ -228,18 +232,17 @@ private static void refresh(String index) throws IOException {
assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
private static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void follow(String leaderIndex, String followIndex) throws IOException {
private static void follow(RestClient client, String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
assertOK(client.performRequest(request));
}

void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException {
Expand Down Expand Up @@ -302,7 +305,7 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void pauseFollow(String followIndex) throws IOException {
private static void pauseFollow(RestClient client, String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testFollowIndex() throws Exception {
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
// unfollow and then follow and then index a few docs in leader index:
pauseFollow(followIndexName);
resumeFollow(leaderIndexName, followIndexName);
resumeFollow(followIndexName);
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
Expand All @@ -84,14 +84,14 @@ public void testFollowIndex() throws Exception {
pauseFollow(followIndexName);
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_ccr/unfollow")));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(leaderIndexName, followIndexName));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(followIndexName));
assertThat(e.getMessage(), containsString("follow index [" + followIndexName + "] does not have ccr metadata"));
}
}

public void testFollowNonExistingLeaderIndex() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index", "non-existing-index"));
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index"));
assertThat(e.getMessage(), containsString("no such index"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));

Expand Down Expand Up @@ -151,10 +151,9 @@ private static void refresh(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
private static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@
- do:
ccr.resume_follow:
index: bar
body:
leader_cluster: local
leader_index: foo
body: {}
- is_true: acknowledged

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
public static final String CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY = "leader_cluster_name";

private final boolean enabled;
private final Settings settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,10 @@ void getLeaderClusterState(final Map<String, String> headers,

@Override
void createAndFollow(Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request request,
Runnable successHandler,
Consumer<Exception> failureHandler) {
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
PutFollowAction.Request request = new PutFollowAction.Request(followRequest);
followerClient.execute(
PutFollowAction.INSTANCE,
request,
Expand Down Expand Up @@ -243,31 +242,31 @@ void autoFollowIndices() {
int i = 0;
for (Map.Entry<String, AutoFollowPattern> entry : autoFollowMetadata.getPatterns().entrySet()) {
final int slot = i;
final String clusterAlias = entry.getKey();
final String leaderCluster = entry.getKey();
final AutoFollowPattern autoFollowPattern = entry.getValue();

Map<String, String> headers = autoFollowMetadata.getHeaders().get(clusterAlias);
getLeaderClusterState(headers, clusterAlias, (leaderClusterState, e) -> {
Map<String, String> headers = autoFollowMetadata.getHeaders().get(leaderCluster);
getLeaderClusterState(headers, leaderCluster, (leaderClusterState, e) -> {
if (leaderClusterState != null) {
assert e == null;
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(clusterAlias);
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(clusterAlias, autoFollowPattern,
final List<String> followedIndices = autoFollowMetadata.getFollowedLeaderIndexUUIDs().get(leaderCluster);
final List<Index> leaderIndicesToFollow = getLeaderIndicesToFollow(leaderCluster, autoFollowPattern,
leaderClusterState, followerClusterState, followedIndices);
if (leaderIndicesToFollow.isEmpty()) {
finalise(slot, new AutoFollowResult(clusterAlias));
finalise(slot, new AutoFollowResult(leaderCluster));
} else {
Consumer<AutoFollowResult> resultHandler = result -> finalise(slot, result);
checkAutoFollowPattern(clusterAlias, autoFollowPattern, leaderIndicesToFollow, headers, resultHandler);
checkAutoFollowPattern(leaderCluster, autoFollowPattern, leaderIndicesToFollow, headers, resultHandler);
}
} else {
finalise(slot, new AutoFollowResult(clusterAlias, e));
finalise(slot, new AutoFollowResult(leaderCluster, e));
}
});
i++;
}
}

private void checkAutoFollowPattern(String clusterAlias,
private void checkAutoFollowPattern(String leaderCluster,
AutoFollowPattern autoFollowPattern,
List<Index> leaderIndicesToFollow,
Map<String, String> headers,
Expand All @@ -278,42 +277,45 @@ private void checkAutoFollowPattern(String clusterAlias,
for (int i = 0; i < leaderIndicesToFollow.size(); i++) {
final Index indexToFollow = leaderIndicesToFollow.get(i);
final int slot = i;
followLeaderIndex(clusterAlias, indexToFollow, autoFollowPattern, headers, error -> {
followLeaderIndex(leaderCluster, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(clusterAlias, results.asList()));
resultHandler.accept(new AutoFollowResult(leaderCluster, results.asList()));
}
});
}
}

private void followLeaderIndex(String clusterAlias,
private void followLeaderIndex(String leaderCluster,
Index indexToFollow,
AutoFollowPattern pattern,
Map<String,String> headers,
Consumer<Exception> onResult) {
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);

ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster(clusterAlias);
ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setPollTimeout(pattern.getPollTimeout());

PutFollowAction.Request request = new PutFollowAction.Request();
request.setLeaderCluster(leaderCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
request.setMaxBatchSize(pattern.getMaxBatchSize());
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
request.setPollTimeout(pattern.getPollTimeout());
request.setFollowRequest(followRequest);

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
LOGGER.info("Auto followed leader index [{}] as follow index [{}]", leaderIndexName, followIndexName);

// This function updates the auto follow metadata in the cluster to record that the leader index has been followed:
// (so that we do not try to follow it in subsequent auto follow runs)
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(clusterAlias, indexToFollow);
Function<ClusterState, ClusterState> function = recordLeaderIndexAsFollowFunction(leaderCluster, indexToFollow);
// The coordinator always runs on the elected master node, so we can update cluster state here:
updateAutoFollowMetadata(function, onResult);
};
Expand Down Expand Up @@ -391,7 +393,7 @@ abstract void getLeaderClusterState(

abstract void createAndFollow(
Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);
Expand Down
Loading