Skip to content

[CCR] Move leader_index and leader_cluster parameters from resume follow to put follow api #34638

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,6 @@ protected boolean preserveClusterUponCompletion() {
return true;
}

public void testResumeFollow() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The resume api now looks up the follow index metadata in order to get the leader index name and leader cluster name and because of that this fails with a different error. We can't execute a put follow to create the follow index, because the leader cluster runs with a basic license. So I don't see a way how to test this in another way.

if (runningAgainstLeaderCluster == false) {
final Request request = new Request("POST", "/follower/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"leader\"}");
assertNonCompliantLicense(request);
}
}

public void testFollow() {
if (runningAgainstLeaderCluster == false) {
final Request request = new Request("PUT", "/follower/_ccr/follow");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testFollowIndex() throws Exception {
refresh(allowedIndex);
verifyDocuments(adminClient(), allowedIndex, numDocs);
} else {
follow(allowedIndex, allowedIndex);
follow(client(), allowedIndex, allowedIndex);
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
assertThat(countCcrNodeTasks(), equalTo(1));
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
Expand All @@ -93,7 +93,7 @@ public void testFollowIndex() throws Exception {
assertThat(countCcrNodeTasks(), equalTo(0));
});

resumeFollow(allowedIndex, allowedIndex);
resumeFollow(allowedIndex);
assertThat(countCcrNodeTasks(), equalTo(1));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/pause_follow")));
// Make sure that there are no other ccr relates operations running:
Expand All @@ -106,11 +106,11 @@ public void testFollowIndex() throws Exception {

assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex, allowedIndex));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(allowedIndex));
assertThat(e.getMessage(), containsString("follow index [" + allowedIndex + "] does not have ccr metadata"));

// User does not have manage_follow_index index privilege for 'unallowedIndex':
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, unallowedIndex));
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, unallowedIndex));
assertThat(e.getMessage(),
containsString("action [indices:admin/xpack/ccr/put_follow] is unauthorized for user [test_ccr]"));
// Verify that the follow index has not been created and no node tasks are running
Expand All @@ -119,24 +119,28 @@ public void testFollowIndex() throws Exception {

// User does have manage_follow_index index privilege on 'allowed' index,
// but not read / monitor roles on 'disallowed' index:
e = expectThrows(ResponseException.class, () -> follow(unallowedIndex, allowedIndex));
e = expectThrows(ResponseException.class, () -> follow(client(), unallowedIndex, allowedIndex));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
// Verify that the follow index has not been created and no node tasks are running
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex, unallowedIndex));
follow(adminClient(), unallowedIndex, unallowedIndex);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Execute put follow api with admin client, so that we can check whether we fail with the right error when a normal user resumes following an unallowed index.

pauseFollow(adminClient(), unallowedIndex);

e = expectThrows(ResponseException.class, () -> resumeFollow(unallowedIndex));
assertThat(e.getMessage(), containsString("insufficient privileges to follow index [unallowed-index], " +
"privilege for action [indices:monitor/stats] is missing, " +
"privilege for action [indices:data/read/xpack/ccr/shard_changes] is missing"));
assertThat(indexExists(adminClient(), unallowedIndex), is(false));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));

e = expectThrows(ResponseException.class,
() -> client().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertThat(e.getMessage(), containsString("action [indices:admin/xpack/ccr/unfollow] is unauthorized for user [test_ccr]"));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_close")));
assertOK(adminClient().performRequest(new Request("POST", "/" + unallowedIndex + "/_ccr/unfollow")));
assertBusy(() -> assertThat(countCcrNodeTasks(), equalTo(0)));
}
}

Expand Down Expand Up @@ -187,7 +191,7 @@ public void testAutoFollowPatterns() throws Exception {
// Cleanup by deleting auto follow pattern and pause following:
request = new Request("DELETE", "/_ccr/auto_follow/test_pattern");
assertOK(client().performRequest(request));
pauseFollow(allowedIndex);
pauseFollow(client(), allowedIndex);
}

private int countCcrNodeTasks() throws IOException {
Expand Down Expand Up @@ -228,18 +232,17 @@ private static void refresh(String index) throws IOException {
assertOK(adminClient().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
private static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

private static void follow(String leaderIndex, String followIndex) throws IOException {
private static void follow(RestClient client, String leaderIndex, String followIndex) throws IOException {
final Request request = new Request("PUT", "/" + followIndex + "/_ccr/follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
assertOK(client.performRequest(request));
}

void verifyDocuments(RestClient client, String index, int expectedNumDocs) throws IOException {
Expand Down Expand Up @@ -302,7 +305,7 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

private static void pauseFollow(String followIndex) throws IOException {
private static void pauseFollow(RestClient client, String followIndex) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + followIndex + "/_ccr/pause_follow")));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void testFollowIndex() throws Exception {
assertBusy(() -> verifyDocuments(followIndexName, numDocs));
// unfollow and then follow and then index a few docs in leader index:
pauseFollow(followIndexName);
resumeFollow(leaderIndexName, followIndexName);
resumeFollow(followIndexName);
try (RestClient leaderClient = buildLeaderClient()) {
int id = numDocs;
index(leaderClient, leaderIndexName, Integer.toString(id), "field", id, "filtered_field", "true");
Expand All @@ -84,14 +84,14 @@ public void testFollowIndex() throws Exception {
pauseFollow(followIndexName);
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_close")));
assertOK(client().performRequest(new Request("POST", "/" + followIndexName + "/_ccr/unfollow")));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(leaderIndexName, followIndexName));
Exception e = expectThrows(ResponseException.class, () -> resumeFollow(followIndexName));
assertThat(e.getMessage(), containsString("follow index [" + followIndexName + "] does not have ccr metadata"));
}
}

public void testFollowNonExistingLeaderIndex() throws Exception {
assumeFalse("Test should only run when both clusters are running", runningAgainstLeaderCluster);
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index", "non-existing-index"));
ResponseException e = expectThrows(ResponseException.class, () -> resumeFollow("non-existing-index"));
assertThat(e.getMessage(), containsString("no such index"));
assertThat(e.getResponse().getStatusLine().getStatusCode(), equalTo(404));

Expand Down Expand Up @@ -151,10 +151,9 @@ private static void refresh(String index) throws IOException {
assertOK(client().performRequest(new Request("POST", "/" + index + "/_refresh")));
}

private static void resumeFollow(String leaderIndex, String followIndex) throws IOException {
private static void resumeFollow(String followIndex) throws IOException {
final Request request = new Request("POST", "/" + followIndex + "/_ccr/resume_follow");
request.setJsonEntity("{\"leader_cluster\": \"leader_cluster\", \"leader_index\": \"" + leaderIndex +
"\", \"poll_timeout\": \"10ms\"}");
request.setJsonEntity("{\"poll_timeout\": \"10ms\"}");
assertOK(client().performRequest(request));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,7 @@
- do:
ccr.resume_follow:
index: bar
body:
leader_cluster: local
leader_index: foo
body: {}
- is_true: acknowledged

- do:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
public static final String CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY = "leader_cluster_name";

private final boolean enabled;
private final Settings settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,10 @@ void getLeaderClusterState(final Map<String, String> headers,

@Override
void createAndFollow(Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request request,
Runnable successHandler,
Consumer<Exception> failureHandler) {
Client followerClient = CcrLicenseChecker.wrapClient(client, headers);
PutFollowAction.Request request = new PutFollowAction.Request(followRequest);
followerClient.execute(
PutFollowAction.INSTANCE,
request,
Expand Down Expand Up @@ -278,7 +277,7 @@ void autoFollowIndices() {
}

private void checkAutoFollowPattern(String autoFollowPattenName,
String clusterAlias,
String leaderCluster,
AutoFollowPattern autoFollowPattern,
List<Index> leaderIndicesToFollow,
Map<String, String> headers,
Expand All @@ -302,7 +301,7 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
}
} else {
followLeaderIndex(autoFollowPattenName, clusterAlias, indexToFollow, autoFollowPattern, headers, error -> {
followLeaderIndex(autoFollowPattenName, leaderCluster, indexToFollow, autoFollowPattern, headers, error -> {
results.set(slot, new Tuple<>(indexToFollow, error));
if (leaderIndicesCountDown.countDown()) {
resultHandler.accept(new AutoFollowResult(autoFollowPattenName, results.asList()));
Expand All @@ -314,25 +313,28 @@ private void checkAutoFollowPattern(String autoFollowPattenName,
}

private void followLeaderIndex(String autoFollowPattenName,
String clusterAlias,
String leaderCluster,
Index indexToFollow,
AutoFollowPattern pattern,
Map<String,String> headers,
Consumer<Exception> onResult) {
final String leaderIndexName = indexToFollow.getName();
final String followIndexName = getFollowerIndexName(pattern, leaderIndexName);

ResumeFollowAction.Request request = new ResumeFollowAction.Request();
request.setLeaderCluster(clusterAlias);
ResumeFollowAction.Request followRequest = new ResumeFollowAction.Request();
followRequest.setFollowerIndex(followIndexName);
followRequest.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
followRequest.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
followRequest.setMaxBatchSize(pattern.getMaxBatchSize());
followRequest.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
followRequest.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
followRequest.setMaxRetryDelay(pattern.getMaxRetryDelay());
followRequest.setPollTimeout(pattern.getPollTimeout());

PutFollowAction.Request request = new PutFollowAction.Request();
request.setLeaderCluster(leaderCluster);
request.setLeaderIndex(indexToFollow.getName());
request.setFollowerIndex(followIndexName);
request.setMaxBatchOperationCount(pattern.getMaxBatchOperationCount());
request.setMaxConcurrentReadBatches(pattern.getMaxConcurrentReadBatches());
request.setMaxBatchSize(pattern.getMaxBatchSize());
request.setMaxConcurrentWriteBatches(pattern.getMaxConcurrentWriteBatches());
request.setMaxWriteBufferSize(pattern.getMaxWriteBufferSize());
request.setMaxRetryDelay(pattern.getMaxRetryDelay());
request.setPollTimeout(pattern.getPollTimeout());
request.setFollowRequest(followRequest);

// Execute if the create and follow api call succeeds:
Runnable successHandler = () -> {
Expand Down Expand Up @@ -418,7 +420,7 @@ abstract void getLeaderClusterState(

abstract void createAndFollow(
Map<String, String> headers,
ResumeFollowAction.Request followRequest,
PutFollowAction.Request followRequest,
Runnable successHandler,
Consumer<Exception> failureHandler
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,11 @@ protected void masterOperation(
listener.onFailure(LicenseUtils.newComplianceException("ccr"));
return;
}
String leaderCluster = request.getFollowRequest().getLeaderCluster();
String leaderCluster = request.getLeaderCluster();
// Validates whether the leader cluster has been configured properly:
client.getRemoteClusterClient(leaderCluster);

String leaderIndex = request.getFollowRequest().getLeaderIndex();
String leaderIndex = request.getLeaderIndex();
createFollowerIndexAndFollowRemoteIndex(request, leaderCluster, leaderIndex, listener);
}

Expand All @@ -121,8 +121,7 @@ private void createFollowerIndex(
final PutFollowAction.Request request,
final ActionListener<PutFollowAction.Response> listener) {
if (leaderIndexMetaData == null) {
listener.onFailure(new IllegalArgumentException("leader index [" + request.getFollowRequest().getLeaderIndex() +
"] does not exist"));
listener.onFailure(new IllegalArgumentException("leader index [" + request.getLeaderIndex() + "] does not exist"));
return;
}

Expand Down Expand Up @@ -159,6 +158,8 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY, leaderIndexMetaData.getIndex().getName());
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_CLUSTER_NAME_KEY, request.getLeaderCluster());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By remember this two additional key value pairs the resume api no longer needs to accept a leader index / cluster parameters.

The leader index name isn't really necessary as the leader index uuid can also be used to lookup the leader index metadata in the leader cluster. The CcrLicenseChecker#checkRemoteClusterLicenseAndFetchLeaderIndexMetadataAndHistoryUUIDs(...) only accepts a leader index name now. Put follow api also uses this method and that would mean I another version of this method is needed to knows how to work with index uuid. I found that this would make CcrLicenseChecker too complicated, so I decided to remember the leader index name inside ccr custom index metadata instead.

imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);

// Copy all settings, but overwrite a few settings.
Expand Down
Loading