Skip to content

Commit dabd1d5

Browse files
committed
Fix follow index filtering in follow info api.
The filtering by follower index was completely broken. Also the wrong persistent tasks were selected, causing the wrong status to be reported. Closes elastic#37738
1 parent 534ba1d commit dabd1d5

File tree

4 files changed

+226
-10
lines changed

4 files changed

+226
-10
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoAction.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -65,19 +65,28 @@ protected void masterOperation(FollowInfoAction.Request request,
6565
List<String> concreteFollowerIndices = Arrays.asList(indexNameExpressionResolver.concreteIndexNames(state,
6666
IndicesOptions.STRICT_EXPAND_OPEN_CLOSED, request.getFollowerIndices()));
6767

68+
List<FollowerInfo> followerInfos = getFollowInfos(concreteFollowerIndices, state);
69+
listener.onResponse(new FollowInfoAction.Response(followerInfos));
70+
}
71+
72+
@Override
73+
protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) {
74+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
75+
}
6876

77+
static List<FollowerInfo> getFollowInfos(List<String> concreteFollowerIndices, ClusterState state) {
6978
List<FollowerInfo> followerInfos = new ArrayList<>();
7079
PersistentTasksCustomMetaData persistentTasks = state.metaData().custom(PersistentTasksCustomMetaData.TYPE);
7180

72-
for (IndexMetaData indexMetaData : state.metaData()) {
81+
for (String index : concreteFollowerIndices) {
82+
IndexMetaData indexMetaData = state.metaData().index(index);
7383
Map<String, String> ccrCustomData = indexMetaData.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
7484
if (ccrCustomData != null) {
7585
Optional<ShardFollowTask> result;
7686
if (persistentTasks != null) {
7787
result = persistentTasks.taskMap().values().stream()
7888
.map(persistentTask -> (ShardFollowTask) persistentTask.getParams())
79-
.filter(shardFollowTask -> concreteFollowerIndices.isEmpty() ||
80-
concreteFollowerIndices.contains(shardFollowTask.getFollowShardId().getIndexName()))
89+
.filter(shardFollowTask -> index.equals(shardFollowTask.getFollowShardId().getIndexName()))
8190
.findAny();
8291
} else {
8392
result = Optional.empty();
@@ -107,11 +116,6 @@ protected void masterOperation(FollowInfoAction.Request request,
107116
}
108117
}
109118

110-
listener.onResponse(new FollowInfoAction.Response(followerInfos));
111-
}
112-
113-
@Override
114-
protected ClusterBlockException checkBlock(FollowInfoAction.Request request, ClusterState state) {
115-
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
119+
return followerInfos;
116120
}
117121
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr;
8+
9+
import org.elasticsearch.common.xcontent.XContentType;
10+
import org.elasticsearch.index.IndexNotFoundException;
11+
import org.elasticsearch.index.IndexSettings;
12+
import org.elasticsearch.xpack.CcrSingleNodeTestCase;
13+
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
14+
import org.elasticsearch.xpack.core.ccr.action.PauseFollowAction;
15+
import org.elasticsearch.xpack.core.ccr.action.PutFollowAction;
16+
17+
import java.util.Comparator;
18+
19+
import static java.util.Collections.singletonMap;
20+
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
21+
import static org.elasticsearch.xpack.ccr.LocalIndexFollowingIT.getIndexSettings;
22+
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;
23+
import static org.hamcrest.Matchers.equalTo;
24+
import static org.hamcrest.Matchers.notNullValue;
25+
import static org.hamcrest.Matchers.nullValue;
26+
27+
public class FollowInfoIT extends CcrSingleNodeTestCase {
28+
29+
public void testFollowInfoApiFollowerIndexFiltering() throws Exception {
30+
final String leaderIndexSettings = getIndexSettings(1, 0,
31+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
32+
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
33+
ensureGreen("leader1");
34+
assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON));
35+
ensureGreen("leader2");
36+
37+
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
38+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
39+
40+
followRequest = getPutFollowRequest("leader2", "follower2");
41+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
42+
43+
FollowInfoAction.Request request = new FollowInfoAction.Request();
44+
request.setFollowerIndices("follower1");
45+
FollowInfoAction.Response response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
46+
assertThat(response.getFollowInfos().size(), equalTo(1));
47+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
48+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
49+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
50+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
51+
52+
request = new FollowInfoAction.Request();
53+
request.setFollowerIndices("follower2");
54+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
55+
assertThat(response.getFollowInfos().size(), equalTo(1));
56+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2"));
57+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2"));
58+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
59+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
60+
61+
request = new FollowInfoAction.Request();
62+
request.setFollowerIndices("_all");
63+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
64+
response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex));
65+
assertThat(response.getFollowInfos().size(), equalTo(2));
66+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
67+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
68+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
69+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
70+
assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2"));
71+
assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2"));
72+
assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE));
73+
assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue());
74+
75+
// Pause follower1 index and check the follower info api:
76+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
77+
78+
request = new FollowInfoAction.Request();
79+
request.setFollowerIndices("follower1");
80+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
81+
assertThat(response.getFollowInfos().size(), equalTo(1));
82+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
83+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
84+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED));
85+
assertThat(response.getFollowInfos().get(0).getParameters(), nullValue());
86+
87+
request = new FollowInfoAction.Request();
88+
request.setFollowerIndices("follower2");
89+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
90+
assertThat(response.getFollowInfos().size(), equalTo(1));
91+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower2"));
92+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader2"));
93+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.ACTIVE));
94+
assertThat(response.getFollowInfos().get(0).getParameters(), notNullValue());
95+
96+
request = new FollowInfoAction.Request();
97+
request.setFollowerIndices("_all");
98+
response = client().execute(FollowInfoAction.INSTANCE, request).actionGet();
99+
response.getFollowInfos().sort(Comparator.comparing(FollowInfoAction.Response.FollowerInfo::getFollowerIndex));
100+
assertThat(response.getFollowInfos().size(), equalTo(2));
101+
assertThat(response.getFollowInfos().get(0).getFollowerIndex(), equalTo("follower1"));
102+
assertThat(response.getFollowInfos().get(0).getLeaderIndex(), equalTo("leader1"));
103+
assertThat(response.getFollowInfos().get(0).getStatus(), equalTo(Status.PAUSED));
104+
assertThat(response.getFollowInfos().get(0).getParameters(), nullValue());
105+
assertThat(response.getFollowInfos().get(1).getFollowerIndex(), equalTo("follower2"));
106+
assertThat(response.getFollowInfos().get(1).getLeaderIndex(), equalTo("leader2"));
107+
assertThat(response.getFollowInfos().get(1).getStatus(), equalTo(Status.ACTIVE));
108+
assertThat(response.getFollowInfos().get(1).getParameters(), notNullValue());
109+
110+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet());
111+
}
112+
113+
public void testFollowInfoApiIndexMissing() throws Exception {
114+
final String leaderIndexSettings = getIndexSettings(1, 0,
115+
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
116+
assertAcked(client().admin().indices().prepareCreate("leader1").setSource(leaderIndexSettings, XContentType.JSON));
117+
ensureGreen("leader1");
118+
assertAcked(client().admin().indices().prepareCreate("leader2").setSource(leaderIndexSettings, XContentType.JSON));
119+
ensureGreen("leader2");
120+
121+
PutFollowAction.Request followRequest = getPutFollowRequest("leader1", "follower1");
122+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
123+
124+
followRequest = getPutFollowRequest("leader2", "follower2");
125+
client().execute(PutFollowAction.INSTANCE, followRequest).get();
126+
127+
FollowInfoAction.Request request1 = new FollowInfoAction.Request();
128+
request1.setFollowerIndices("follower3");
129+
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request1).actionGet());
130+
131+
FollowInfoAction.Request request2 = new FollowInfoAction.Request();
132+
request2.setFollowerIndices("follower2", "follower3");
133+
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowInfoAction.INSTANCE, request2).actionGet());
134+
135+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower1")).actionGet());
136+
assertAcked(client().execute(PauseFollowAction.INSTANCE, new PauseFollowAction.Request("follower2")).actionGet());
137+
}
138+
139+
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.ccr.action;
8+
9+
import org.elasticsearch.Version;
10+
import org.elasticsearch.cluster.ClusterName;
11+
import org.elasticsearch.cluster.ClusterState;
12+
import org.elasticsearch.cluster.metadata.IndexMetaData;
13+
import org.elasticsearch.cluster.metadata.MetaData;
14+
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
15+
import org.elasticsearch.test.ESTestCase;
16+
import org.elasticsearch.xpack.ccr.Ccr;
17+
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response;
18+
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
19+
20+
import java.util.Arrays;
21+
import java.util.HashMap;
22+
import java.util.List;
23+
24+
import static org.elasticsearch.xpack.ccr.action.TransportFollowStatsActionTests.createShardFollowTask;
25+
import static org.hamcrest.Matchers.equalTo;
26+
27+
public class TransportFollowInfoActionTests extends ESTestCase {
28+
29+
public void testGetFollowInfos() {
30+
ClusterState state = createCS(
31+
new String[] {"follower1", "follower2", "follower3", "index4"},
32+
new boolean[]{true, true, true, false},
33+
new boolean[]{true, true, false, false}
34+
);
35+
List<String> concreteIndices = Arrays.asList("follower1", "follower3");
36+
37+
List<FollowerInfo> result = TransportFollowInfoAction.getFollowInfos(concreteIndices, state);
38+
assertThat(result.size(), equalTo(2));
39+
assertThat(result.get(0).getFollowerIndex(), equalTo("follower1"));
40+
assertThat(result.get(0).getStatus(), equalTo(Response.Status.ACTIVE));
41+
assertThat(result.get(1).getFollowerIndex(), equalTo("follower3"));
42+
assertThat(result.get(1).getStatus(), equalTo(Response.Status.PAUSED));
43+
}
44+
45+
private static ClusterState createCS(String[] indices, boolean[] followerIndices, boolean[] statuses) {
46+
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder();
47+
MetaData.Builder mdBuilder = MetaData.builder();
48+
for (int i = 0; i < indices.length; i++) {
49+
String index = indices[i];
50+
boolean isFollowIndex = followerIndices[i];
51+
boolean active = statuses[i];
52+
53+
IndexMetaData.Builder imdBuilder = IndexMetaData.builder(index)
54+
.settings(settings(Version.CURRENT))
55+
.numberOfShards(1)
56+
.numberOfReplicas(0);
57+
58+
if (isFollowIndex) {
59+
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
60+
if (active) {
61+
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null);
62+
}
63+
}
64+
mdBuilder.put(imdBuilder);
65+
}
66+
67+
mdBuilder.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build());
68+
return ClusterState.builder(new ClusterName("_cluster"))
69+
.metaData(mdBuilder.build())
70+
.build();
71+
}
72+
73+
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public void testFindFollowerIndicesFromShardFollowTasks() {
4444
assertThat(result.size(), equalTo(0));
4545
}
4646

47-
private static ShardFollowTask createShardFollowTask(String followerIndex) {
47+
static ShardFollowTask createShardFollowTask(String followerIndex) {
4848
return new ShardFollowTask(
4949
null,
5050
new ShardId(followerIndex, "", 0),

0 commit comments

Comments
 (0)