Skip to content

Commit a69ae6b

Browse files
authored
[CCR] Add metadata to keep track of the index uuid of the leader index in the follow index (#33367)
The follow index api checks if the recorded uuid in the follow index matches with uuid of the leader index and fails otherwise. This validation will prevent a follow index from following an incompatible leader index. The create_and_follow api will automatically add this custom index metadata when it creates the follow index. Closes #31505
1 parent 7d3b99a commit a69ae6b

File tree

5 files changed

+92
-56
lines changed

5 files changed

+92
-56
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
8787
public static final String CCR_THREAD_POOL_NAME = "ccr";
8888
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
8989
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
90+
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";
9091

9192
private final boolean enabled;
9293
private final Settings settings;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCreateAndFollowIndexAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
183183
// Adding the leader index uuid for each shard as custom metadata:
184184
Map<String, String> metadata = new HashMap<>();
185185
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
186+
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
186187
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);
187188

188189
// Copy all settings, but overwrite a few settings.

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,14 @@ static void validate(
245245
if (followIndex == null) {
246246
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] does not exist");
247247
}
248+
String leaderIndexUUID = leaderIndex.getIndex().getUUID();
249+
String recordedLeaderIndexUUID = followIndex
250+
.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)
251+
.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
252+
if (leaderIndexUUID.equals(recordedLeaderIndexUUID) == false) {
253+
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] should reference [" + leaderIndexUUID +
254+
"] as leader index but instead reference [" + recordedLeaderIndexUUID + "] as leader index");
255+
}
248256

249257
String[] recordedHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndex);
250258
assert recordedHistoryUUIDs.length == leaderIndexHistoryUUID.length;

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/ShardChangesIT.java

Lines changed: 42 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,13 @@
3939
import org.elasticsearch.test.ESIntegTestCase;
4040
import org.elasticsearch.test.MockHttpTransport;
4141
import org.elasticsearch.test.discovery.TestZenDiscovery;
42-
import org.elasticsearch.test.junit.annotations.TestLogging;
43-
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
44-
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
4542
import org.elasticsearch.xpack.ccr.action.ShardChangesAction;
4643
import org.elasticsearch.xpack.ccr.action.ShardFollowTask;
47-
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
4844
import org.elasticsearch.xpack.core.XPackSettings;
4945
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
46+
import org.elasticsearch.xpack.core.ccr.action.CreateAndFollowIndexAction;
47+
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
48+
import org.elasticsearch.xpack.core.ccr.action.UnfollowIndexAction;
5049

5150
import java.io.IOException;
5251
import java.util.Arrays;
@@ -314,10 +313,7 @@ public void testFollowIndexAndCloseNode() throws Exception {
314313
internalCluster().ensureAtLeastNumDataNodes(3);
315314
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
316315
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
317-
318-
String followerIndexSettings = getIndexSettings(3, 1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
319-
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
320-
ensureGreen("index1", "index2");
316+
ensureGreen("index1");
321317

322318
AtomicBoolean run = new AtomicBoolean(true);
323319
Thread thread = new Thread(() -> {
@@ -339,7 +335,7 @@ public void testFollowIndexAndCloseNode() throws Exception {
339335
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048),
340336
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
341337
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
342-
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
338+
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();
343339

344340
long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(),
345341
followRequest.getMaxBatchOperationCount() * 10));
@@ -416,34 +412,6 @@ public void testFollowNonExistentIndex() throws Exception {
416412
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet());
417413
}
418414

419-
@TestLogging("_root:DEBUG")
420-
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33379")
421-
public void testValidateFollowingIndexSettings() throws Exception {
422-
assertAcked(client().admin().indices().prepareCreate("test-leader")
423-
.setSettings(Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)));
424-
// TODO: indexing should be optional but the current mapping logic requires for now.
425-
client().prepareIndex("test-leader", "doc", "id").setSource("{\"f\": \"v\"}", XContentType.JSON).get();
426-
assertAcked(client().admin().indices().prepareCreate("test-follower").get());
427-
IllegalArgumentException followError = expectThrows(IllegalArgumentException.class, () -> client().execute(
428-
FollowIndexAction.INSTANCE, createFollowRequest("test-leader", "test-follower")).actionGet());
429-
assertThat(followError.getMessage(), equalTo("the following index [test-follower] is not ready to follow;" +
430-
" the setting [index.xpack.ccr.following_index] must be enabled."));
431-
// updating the `following_index` with an open index must not be allowed.
432-
IllegalArgumentException updateError = expectThrows(IllegalArgumentException.class, () -> {
433-
client().admin().indices().prepareUpdateSettings("test-follower")
434-
.setSettings(Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)).get();
435-
});
436-
assertThat(updateError.getMessage(), containsString("Can't update non dynamic settings " +
437-
"[[index.xpack.ccr.following_index]] for open indices [[test-follower/"));
438-
assertAcked(client().admin().indices().prepareClose("test-follower"));
439-
assertAcked(client().admin().indices().prepareUpdateSettings("test-follower")
440-
.setSettings(Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)));
441-
assertAcked(client().admin().indices().prepareOpen("test-follower"));
442-
assertAcked(client().execute(FollowIndexAction.INSTANCE,
443-
createFollowRequest("test-leader", "test-follower")).actionGet());
444-
unfollowIndex("test-follower");
445-
}
446-
447415
public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
448416
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
449417
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
@@ -478,6 +446,37 @@ public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
478446
unfollowIndex("index2");
479447
}
480448

449+
public void testDontFollowTheWrongIndex() throws Exception {
450+
String leaderIndexSettings = getIndexSettings(1, 0,
451+
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
452+
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
453+
ensureGreen("index1");
454+
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
455+
ensureGreen("index3");
456+
457+
FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
458+
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
459+
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
460+
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
461+
462+
followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L,
463+
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
464+
createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
465+
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
466+
unfollowIndex("index2", "index4");
467+
468+
FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L,
469+
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
470+
Exception e = expectThrows(IllegalArgumentException.class,
471+
() -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet());
472+
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));
473+
474+
FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L,
475+
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
476+
e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet());
477+
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
478+
}
479+
481480
private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
482481
return () -> {
483482
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
@@ -514,10 +513,12 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
514513
};
515514
}
516515

517-
private void unfollowIndex(String index) throws Exception {
518-
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
519-
unfollowRequest.setFollowIndex(index);
520-
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
516+
private void unfollowIndex(String... indices) throws Exception {
517+
for (String index : indices) {
518+
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
519+
unfollowRequest.setFollowIndex(index);
520+
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
521+
}
521522
assertBusy(() -> {
522523
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
523524
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);

0 commit comments

Comments
 (0)