Skip to content

Commit af887be

Browse files
committed
Hide orphaned tasks from follower stats (#48901)
CCR follower stats can return information for persistent tasks that are in the process of being cleaned up. This is problematic for tests where CCR follower indices have been deleted, but their persistent follower task is only cleaned up asynchronously afterwards. If one of the following tests then accesses the follower stats, it might still get the stats for that follower task. In addition, some tests were not cleaning up their auto-follow patterns, leaving orphaned patterns behind. Other tests cleaned up their auto-follow patterns. As always the same name was used, it just depended on the test execution order whether this led to a failure or not. This commit fixes the offensive tests, and will also automatically remove auto-follow-patterns at the end of tests, like we do for many other features. Closes #48700
1 parent 8835142 commit af887be

File tree

6 files changed

+93
-13
lines changed

6 files changed

+93
-13
lines changed

docs/reference/ccr/apis/auto-follow/pause-auto-follow-pattern.asciidoc

+7
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
6363
// TEST[setup:remote_cluster]
6464
// TESTSETUP
6565
66+
[source,console]
67+
--------------------------------------------------
68+
DELETE /_ccr/auto_follow/my_auto_follow_pattern
69+
--------------------------------------------------
70+
// TEST
71+
// TEARDOWN
72+
6673
//////////////////////////
6774

6875
[source,console]

docs/reference/ccr/apis/auto-follow/resume-auto-follow-pattern.asciidoc

+7
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ PUT /_ccr/auto_follow/my_auto_follow_pattern
5959
// TEST[setup:remote_cluster]
6060
// TESTSETUP
6161
62+
[source,console]
63+
--------------------------------------------------
64+
DELETE /_ccr/auto_follow/my_auto_follow_pattern
65+
--------------------------------------------------
66+
// TEST
67+
// TEARDOWN
68+
6269
[source,console]
6370
--------------------------------------------------
6471
POST /_ccr/auto_follow/my_auto_follow_pattern/pause

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

+38
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,15 @@ protected boolean preserveILMPoliciesUponCompletion() {
471471
return false;
472472
}
473473

474+
/**
475+
* Returns whether to preserve auto-follow patterns. Defaults to not
476+
* preserving them. Only runs at all if xpack is installed on the cluster
477+
* being tested.
478+
*/
479+
protected boolean preserveAutoFollowPatternsUponCompletion() {
480+
return false;
481+
}
482+
474483
/**
475484
* Returns whether to wait to make absolutely certain that all snapshots
476485
* have been deleted.
@@ -553,6 +562,10 @@ private void wipeCluster() throws Exception {
553562
deleteAllILMPolicies();
554563
}
555564

565+
if (hasXPack && false == preserveAutoFollowPatternsUponCompletion()) {
566+
deleteAllAutoFollowPatterns();
567+
}
568+
556569
assertThat("Found in progress snapshots [" + inProgressSnapshots.get() + "].", inProgressSnapshots.get(), anEmptyMap());
557570
}
558571

@@ -718,6 +731,31 @@ private static void deleteAllSLMPolicies() throws IOException {
718731
}
719732
}
720733

734+
private static void deleteAllAutoFollowPatterns() throws IOException {
735+
final List<Map<?, ?>> patterns;
736+
737+
try {
738+
Response response = adminClient().performRequest(new Request("GET", "/_ccr/auto_follow"));
739+
patterns = (List<Map<?, ?>>) entityAsMap(response).get("patterns");
740+
} catch (ResponseException e) {
741+
if (RestStatus.METHOD_NOT_ALLOWED.getStatus() == e.getResponse().getStatusLine().getStatusCode() ||
742+
RestStatus.BAD_REQUEST.getStatus() == e.getResponse().getStatusLine().getStatusCode()) {
743+
// If bad request returned, CCR is not enabled.
744+
return;
745+
}
746+
throw e;
747+
}
748+
749+
if (patterns == null || patterns.isEmpty()) {
750+
return;
751+
}
752+
753+
for (Map<?, ?> pattern : patterns) {
754+
String patternName = (String) pattern.get("name");
755+
adminClient().performRequest(new Request("DELETE", "/_ccr/auto_follow/" + patternName));
756+
}
757+
}
758+
721759
/**
722760
* Logs a message if there are still running tasks. The reasoning is that any tasks still running are state the is trying to bleed into
723761
* other tests.

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsAction.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@
1313
import org.elasticsearch.action.support.ActionFilters;
1414
import org.elasticsearch.action.support.tasks.TransportTasksAction;
1515
import org.elasticsearch.cluster.ClusterState;
16+
import org.elasticsearch.cluster.metadata.MetaData;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.Strings;
1819
import org.elasticsearch.common.inject.Inject;
20+
import org.elasticsearch.index.Index;
1921
import org.elasticsearch.license.LicenseUtils;
2022
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
2123
import org.elasticsearch.tasks.Task;
@@ -116,15 +118,17 @@ static Set<String> findFollowerIndicesFromShardFollowTasks(ClusterState state, S
116118
if (persistentTasksMetaData == null) {
117119
return Collections.emptySet();
118120
}
119-
121+
final MetaData metaData = state.metaData();
120122
final Set<String> requestedFollowerIndices = indices != null ?
121123
new HashSet<>(Arrays.asList(indices)) : Collections.emptySet();
122124
return persistentTasksMetaData.tasks().stream()
123125
.filter(persistentTask -> persistentTask.getTaskName().equals(ShardFollowTask.NAME))
124126
.map(persistentTask -> {
125127
ShardFollowTask shardFollowTask = (ShardFollowTask) persistentTask.getParams();
126-
return shardFollowTask.getFollowShardId().getIndexName();
128+
return shardFollowTask.getFollowShardId().getIndex();
127129
})
130+
.filter(followerIndex -> metaData.index(followerIndex) != null) // hide tasks that are orphaned (see ShardFollowTaskCleaner)
131+
.map(Index::getName)
128132
.filter(followerIndex -> Strings.isAllOrWildcard(indices) || requestedFollowerIndices.contains(followerIndex))
129133
.collect(Collectors.toSet());
130134
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowInfoActionTests.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.cluster.ClusterState;
1212
import org.elasticsearch.cluster.metadata.IndexMetaData;
1313
import org.elasticsearch.cluster.metadata.MetaData;
14+
import org.elasticsearch.index.Index;
1415
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1516
import org.elasticsearch.test.ESTestCase;
1617
import org.elasticsearch.xpack.ccr.Ccr;
@@ -58,7 +59,8 @@ private static ClusterState createCS(String[] indices, boolean[] followerIndices
5859
if (isFollowIndex) {
5960
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, new HashMap<>());
6061
if (active) {
61-
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME, createShardFollowTask(index), null);
62+
persistentTasks.addTask(Integer.toString(i), ShardFollowTask.NAME,
63+
createShardFollowTask(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE)), null);
6264
}
6365
}
6466
mdBuilder.put(imdBuilder);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/TransportFollowStatsActionTests.java

+32-10
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,16 @@
55
*/
66
package org.elasticsearch.xpack.ccr.action;
77

8+
import org.elasticsearch.Version;
89
import org.elasticsearch.cluster.ClusterName;
910
import org.elasticsearch.cluster.ClusterState;
11+
import org.elasticsearch.cluster.metadata.IndexMetaData;
1012
import org.elasticsearch.cluster.metadata.MetaData;
13+
import org.elasticsearch.common.settings.Settings;
1114
import org.elasticsearch.common.unit.ByteSizeUnit;
1215
import org.elasticsearch.common.unit.ByteSizeValue;
1316
import org.elasticsearch.common.unit.TimeValue;
17+
import org.elasticsearch.index.Index;
1418
import org.elasticsearch.index.shard.ShardId;
1519
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
1620
import org.elasticsearch.test.ESTestCase;
@@ -24,30 +28,48 @@
2428
public class TransportFollowStatsActionTests extends ESTestCase {
2529

2630
public void testFindFollowerIndicesFromShardFollowTasks() {
31+
Settings indexSettings = Settings.builder()
32+
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
33+
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
34+
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
35+
.build();
36+
37+
IndexMetaData index1 = IndexMetaData.builder("index1").settings(indexSettings).build();
38+
IndexMetaData index2 = IndexMetaData.builder("index2").settings(indexSettings).build();
39+
IndexMetaData index3 = IndexMetaData.builder("index3").settings(indexSettings).build();
40+
2741
PersistentTasksCustomMetaData.Builder persistentTasks = PersistentTasksCustomMetaData.builder()
28-
.addTask("1", ShardFollowTask.NAME, createShardFollowTask("abc"), null)
29-
.addTask("2", ShardFollowTask.NAME, createShardFollowTask("def"), null);
42+
.addTask("1", ShardFollowTask.NAME, createShardFollowTask(index1.getIndex()), null)
43+
.addTask("2", ShardFollowTask.NAME, createShardFollowTask(index2.getIndex()), null)
44+
.addTask("3", ShardFollowTask.NAME, createShardFollowTask(index3.getIndex()), null);
3045

3146
ClusterState clusterState = ClusterState.builder(new ClusterName("_cluster"))
32-
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build()).build())
47+
.metaData(MetaData.builder()
48+
.putCustom(PersistentTasksCustomMetaData.TYPE, persistentTasks.build())
49+
// only add index1 and index2
50+
.put(index1, false)
51+
.put(index2, false)
52+
.build())
3353
.build();
3454
Set<String> result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, null);
3555
assertThat(result.size(), equalTo(2));
36-
assertThat(result.contains("abc"), is(true));
37-
assertThat(result.contains("def"), is(true));
56+
assertThat(result.contains(index1.getIndex().getName()), is(true));
57+
assertThat(result.contains(index2.getIndex().getName()), is(true));
3858

39-
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"def"});
59+
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState,
60+
new String[]{index2.getIndex().getName()});
4061
assertThat(result.size(), equalTo(1));
41-
assertThat(result.contains("def"), is(true));
62+
assertThat(result.contains(index2.getIndex().getName()), is(true));
4263

43-
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState, new String[]{"ghi"});
64+
result = TransportFollowStatsAction.findFollowerIndicesFromShardFollowTasks(clusterState,
65+
new String[]{index3.getIndex().getName()});
4466
assertThat(result.size(), equalTo(0));
4567
}
4668

47-
static ShardFollowTask createShardFollowTask(String followerIndex) {
69+
static ShardFollowTask createShardFollowTask(Index followerIndex) {
4870
return new ShardFollowTask(
4971
null,
50-
new ShardId(followerIndex, "", 0),
72+
new ShardId(followerIndex, 0),
5173
new ShardId("leader_index", "", 0),
5274
1024,
5375
1024,

0 commit comments

Comments
 (0)