Skip to content

Commit d8feff5

Browse files
committed
Fix index filtering in follow info api. (#37752)
The filtering by follower index was completely broken. Also the wrong persistent tasks were selected, causing the wrong status to be reported. Closes #37738
1 parent 7dbab69 commit d8feff5

File tree

4 files changed

+228
-12
lines changed

4 files changed

+228
-12
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,19 +67,28 @@ protected void masterOperation(FollowInfoAction.Request request,
6767
List<String> concreteFollowerIndices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state,
6868
IndicesOptions.STRICT_EXPAND_OPEN_CLOSED, request.getFollowerIndices()));
6969

70+
List<FollowerInfo> followerInfos = getFollowInfos(concreteFollowerIndices, state);
71+
listener.onResponse(new FollowInfoAction.Response(followerInfos));
72+
}
73+
74+
@Override
75+
protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) {
76+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
77+
}
7078

79+
static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, ClusterState state) {
7180
List<FollowerInfo> followerInfos = new ArrayList<>();
7281
PersistentTasksCustomMetaData persistentTasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
7382

74-
for (IndexMetaData indexMetaData : state.metaData()) {
83+
for (String index : concreteFollowerIndices) {
84+
IndexMetaData indexMetaData = state.metaData().index(index);
7585
Map<String, String> ccrCustomData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
7686
if (ccrCustomData != null) {
7787
Optional<ShardFollowTask> result;
7888
if (persistentTasks != null) {
79-
result = persistentTasks.taskMap().values().stream()
80-
.map(persistentTask -> (ShardFollowTask) persistentTask.getParams())
81-
.filter(shardFollowTask -> concreteFollowerIndices.isEmpty() ||
82-
concreteFollowerIndices.contains(shardFollowTask.getFollowShardId().getIndexName()))
89+
result = persistentTasks.findTasks(ShardFollowTask.NAME, task -> true).stream()
90+
.map(task -> (ShardFollowTask) task.getParams())
91+
.filter(shardFollowTask -> index.equals(shardFollowTask.getFollowShardId().getIndexName()))
8392
.findAny();
8493
} else {
8594
result = Optional.empty();
@@ -109,11 +118,6 @@ protected void masterOperation(FollowInfoAction.Request request,
109118
}
110119
}
111120

112-
listener.onResponse(new FollowInfoAction.Response(followerInfos));
113-
}
114-
115-
@Override
116-
protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) {
117-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
121+
return followerInfos;
118122
}
119123
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr;
8+
9+
import org.elasticsearch.common.xcontent.XContentType;
10+
import org.elasticsearch.index.IndexNotFoundException;
11+
import org.elasticsearch.index.IndexSettings;
12+
import org.elasticsearch.xpack.CcrSingleNodeTestCase;
13+
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
14+
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
15+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
16+
17+
import java.util.Comparator;
18+
19+
import static java.util.Collections.singletonMap;
20+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
21+
import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings;
22+
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
23+
import static org.hamcrest.Matchers.equalTo;
24+
import static org.hamcrest.Matchers.notNullValue;
25+
import static org.hamcrest.Matchers.nullValue;
26+
27+
public class FollowInfoIT extends CcrSingleNodeTestCase {
28+
29+
public void testFollowInfoApiFollowerIndexFiltering() throws Exception {
30+
final String leaderIndexSettings = getIndexSettings(1, 0,
31+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
32+
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
33+
ensureGreen("leader1");
34+
assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON));
35+
ensureGreen("leader2");
36+
37+
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
38+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
39+
40+
followRequest = getPutFollowRequest("leader2", "follower2");
41+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
42+
43+
FollowInfoAction.Request request = new FollowInfoAction.Request();
44+
request.setFollowerIndices("follower1");
45+
FollowInfoAction.Response response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
46+
assertThat(response.getFollowInfos().size(), equalTo(1));
47+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
48+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
49+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
50+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
51+
52+
request = new FollowInfoAction.Request();
53+
request.setFollowerIndices("follower2");
54+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
55+
assertThat(response.getFollowInfos().size(), equalTo(1));
56+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2"));
57+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2"));
58+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
59+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
60+
61+
request = new FollowInfoAction.Request();
62+
request.setFollowerIndices("_all");
63+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
64+
response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex));
65+
assertThat(response.getFollowInfos().size(), equalTo(2));
66+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
67+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
68+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
69+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
70+
assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2"));
71+
assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2"));
72+
assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE));
73+
assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue());
74+
75+
// Pause follower1 index and check the follower info api:
76+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
77+
78+
request = new FollowInfoAction.Request();
79+
request.setFollowerIndices("follower1");
80+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
81+
assertThat(response.getFollowInfos().size(), equalTo(1));
82+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
83+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
84+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED));
85+
assertThat(response.getFollowInfos().get(0).getParameters(), nullValue());
86+
87+
request = new FollowInfoAction.Request();
88+
request.setFollowerIndices("follower2");
89+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
90+
assertThat(response.getFollowInfos().size(), equalTo(1));
91+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2"));
92+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2"));
93+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
94+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
95+
96+
request = new FollowInfoAction.Request();
97+
request.setFollowerIndices("_all");
98+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
99+
response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex));
100+
assertThat(response.getFollowInfos().size(), equalTo(2));
101+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
102+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
103+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED));
104+
assertThat(response.getFollowInfos().get(0).getParameters(), nullValue());
105+
assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2"));
106+
assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2"));
107+
assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE));
108+
assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue());
109+
110+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet());
111+
}
112+
113+
public void testFollowInfoApiIndexMissing() throws Exception {
114+
final String leaderIndexSettings = getIndexSettings(1, 0,
115+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
116+
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
117+
ensureGreen("leader1");
118+
assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON));
119+
ensureGreen("leader2");
120+
121+
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
122+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
123+
124+
followRequest = getPutFollowRequest("leader2", "follower2");
125+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
126+
127+
FollowInfoAction.Request request1 = new FollowInfoAction.Request();
128+
request1.setFollowerIndices("follower3");
129+
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request1).actionGet());
130+
131+
FollowInfoAction.Request request2 = new FollowInfoAction.Request();
132+
request2.setFollowerIndices("follower2", "follower3");
133+
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request2).actionGet());
134+
135+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
136+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet());
137+
}
138+
139+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.ClusterName;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.IndexMetaData;
13+
import org.elasticsearch.cluster.metadata.MetaData;
14+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.elasticsearch.xpack.ccr.Ccr;
17+
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response;
18+
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
19+
20+
import java.util.Arrays;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
24+
import static org.elasticsearch.xpack.ccr.action.TransportFollowStatsActionTests.createShardFollowTask;
25+
import static org.hamcrest.Matchers.equalTo;
26+
27+
public class TransportFollowInfoActionTests extends ESTestCase {
28+
29+
public void testGetFollowInfos() {
30+
ClusterState state = createCS(
31+
new String[] {"follower1", "follower2", "follower3", "index4"},
32+
new boolean[]{true, true, true, false},
33+
new boolean[]{true, true, false, false}
34+
);
35+
List<String> concreteIndices = Arrays.asList("follower1", "follower3");
36+
37+
List<FollowerInfo> result = TransportFollowInfoAction.getFollowInfos(concreteIndices, state);
38+
assertThat(result.size(), equalTo(2));
39+
assertThat(result.get(0).getFollowerIndex(), equalTo("follower1"));
40+
assertThat(result.get(0).getStatus(), equalTo(Response.Status.ACTIVE));
41+
assertThat(result.get(1).getFollowerIndex(), equalTo("follower3"));
42+
assertThat(result.get(1).getStatus(), equalTo(Response.Status.PAUSED));
43+
}
44+
45+
private static ClusterState createCS(String[] indices, boolean[] followerIndices, boolean[] statuses) {
46+
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder();
47+
MetaData.Builder mdBuilder = MetaData.builder();
48+
for (int i = 0; i < indices.length; i++) {
49+
String index = indices[i];
50+
boolean isFollowIndex = followerIndices[i];
51+
boolean active = statuses[i];
52+
53+
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(index)
54+
.settings(settings(Version.CURRENT))
55+
.numberOfShards(1)
56+
.numberOfReplicas(0);
57+
58+
if (isFollowIndex) {
59+
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
60+
if (active) {
61+
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null);
62+
}
63+
}
64+
mdBuilder.put(imdBuilder);
65+
}
66+
67+
mdBuilder.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build());
68+
return ClusterState.builder(new ClusterName("_cluster"))
69+
.metaData(mdBuilder.build())
70+
.build();
71+
}
72+
73+
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void testFindFollowerIndicesFromShardFollowTasks() {
4444
assertThat(result.size(), equalTo(0));
4545
}
4646

47-
private static ShardFollowTask createShardFollowTask(String followerIndex) {
47+
static ShardFollowTask createShardFollowTask(String followerIndex) {
4848
return new ShardFollowTask(
4949
null,
5050
new ShardId(followerIndex, "", 0),

0 commit comments

Comments
 (0)