Skip to content

Commit eb715d5

Browse files
authored
Add follower index to CCR monitoring and status (#33645)
This commit adds the follower index to CCR shard follow task status, and to monitoring.
1 parent b5d8495 commit eb715d5

File tree

9 files changed

+47
-27
lines changed

9 files changed

+47
-27
lines changed

x-pack/plugin/ccr/qa/multi-cluster-with-security/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexSecurityIT.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void testFollowIndex() throws Exception {
8282
createAndFollowIndex("leader_cluster:" + allowedIndex, allowedIndex);
8383
assertBusy(() -> verifyDocuments(client(), allowedIndex, numDocs));
8484
assertThat(countCcrNodeTasks(), equalTo(1));
85-
assertBusy(() -> verifyCcrMonitoring(allowedIndex));
85+
assertBusy(() -> verifyCcrMonitoring(allowedIndex, allowedIndex));
8686
assertOK(client().performRequest(new Request("POST", "/" + allowedIndex + "/_ccr/unfollow")));
8787
// Make sure that there are no other ccr relates operations running:
8888
assertBusy(() -> {
@@ -206,7 +206,7 @@ private static boolean indexExists(RestClient client, String index) throws IOExc
206206
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
207207
}
208208

209-
private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
209+
private static void verifyCcrMonitoring(String expectedLeaderIndex, String expectedFollowerIndex) throws IOException {
210210
ensureYellow(".monitoring-*");
211211

212212
Request request = new Request("GET", "/.monitoring-*/_search");
@@ -222,7 +222,10 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOExc
222222
for (int i = 0; i < hits.size(); i++) {
223223
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
224224
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
225-
assertThat(leaderIndex, endsWith(leaderIndex));
225+
assertThat(leaderIndex, endsWith(expectedLeaderIndex));
226+
227+
final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
228+
assertThat(followerIndex, equalTo(expectedFollowerIndex));
226229

227230
int foundNumberOfOperationsReceived =
228231
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);

x-pack/plugin/ccr/qa/multi-cluster/src/test/java/org/elasticsearch/xpack/ccr/FollowIndexIT.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ public void testFollowIndex() throws Exception {
7777
index(leaderClient, leaderIndexName, Integer.toString(id + 2), "field", id + 2, "filtered_field", "true");
7878
}
7979
assertBusy(() -> verifyDocuments(followIndexName, numDocs + 3));
80-
assertBusy(() -> verifyCcrMonitoring(leaderIndexName));
80+
assertBusy(() -> verifyCcrMonitoring(leaderIndexName, followIndexName));
8181
}
8282
}
8383

@@ -107,7 +107,7 @@ public void testAutoFollowPatterns() throws Exception {
107107
ensureYellow("logs-20190101");
108108
verifyDocuments("logs-20190101", 5);
109109
});
110-
assertBusy(() -> verifyCcrMonitoring("logs-20190101"));
110+
assertBusy(() -> verifyCcrMonitoring("logs-20190101", "logs-20190101"));
111111
}
112112

113113
private static void index(RestClient client, String index, String id, Object... fields) throws IOException {
@@ -159,7 +159,7 @@ private static void verifyDocuments(String index, int expectedNumDocs) throws IO
159159
}
160160
}
161161

162-
private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOException {
162+
private static void verifyCcrMonitoring(final String expectedLeaderIndex, final String expectedFollowerIndex) throws IOException {
163163
ensureYellow(".monitoring-*");
164164

165165
Request request = new Request("GET", "/.monitoring-*/_search");
@@ -175,7 +175,10 @@ private static void verifyCcrMonitoring(String expectedLeaderIndex) throws IOExc
175175
for (int i = 0; i < hits.size(); i++) {
176176
Map<?, ?> hit = (Map<?, ?>) hits.get(i);
177177
String leaderIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.leader_index", hit);
178-
assertThat(leaderIndex, endsWith(leaderIndex));
178+
assertThat(leaderIndex, endsWith(expectedLeaderIndex));
179+
180+
final String followerIndex = (String) XContentMapValues.extractValue("_source.ccr_stats.follower_index", hit);
181+
assertThat(followerIndex, equalTo(expectedFollowerIndex));
179182

180183
int foundNumberOfOperationsReceived =
181184
(int) XContentMapValues.extractValue("_source.ccr_stats.operations_received", hit);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,7 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
418418
}
419419
return new ShardFollowNodeTaskStatus(
420420
leaderIndex,
421+
params.getFollowShardId().getIndexName(),
421422
getFollowShardId().getId(),
422423
leaderGlobalCheckpoint,
423424
leaderMaxSeqNo,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportCcrStatsAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ protected void taskOperation(
106106
final CcrStatsAction.StatsRequest request,
107107
final ShardFollowNodeTask task,
108108
final ActionListener<CcrStatsAction.StatsResponse> listener) {
109-
listener.onResponse(new CcrStatsAction.StatsResponse(task.getFollowShardId(), task.getStatus()));
109+
listener.onResponse(new CcrStatsAction.StatsResponse(task.getStatus()));
110110
}
111111

112112
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ protected ShardFollowNodeTaskStatus doParseInstance(XContentParser parser) throw
3333
protected ShardFollowNodeTaskStatus createTestInstance() {
3434
// if you change this constructor, reflect the changes in the hand-written assertions below
3535
return new ShardFollowNodeTaskStatus(
36+
randomAlphaOfLength(4),
3637
randomAlphaOfLength(4),
3738
randomInt(),
3839
randomNonNegativeLong(),
@@ -61,6 +62,7 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
6162
protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInstance, final ShardFollowNodeTaskStatus newInstance) {
6263
assertNotSame(expectedInstance, newInstance);
6364
assertThat(newInstance.leaderIndex(), equalTo(expectedInstance.leaderIndex()));
65+
assertThat(newInstance.followerIndex(), equalTo(expectedInstance.followerIndex()));
6466
assertThat(newInstance.getShardId(), equalTo(expectedInstance.getShardId()));
6567
assertThat(newInstance.leaderGlobalCheckpoint(), equalTo(expectedInstance.leaderGlobalCheckpoint()));
6668
assertThat(newInstance.leaderMaxSeqNo(), equalTo(expectedInstance.leaderMaxSeqNo()));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
3434
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status";
3535

3636
private static final ParseField LEADER_INDEX = new ParseField("leader_index");
37+
private static final ParseField FOLLOWER_INDEX = new ParseField("follower_index");
3738
private static final ParseField SHARD_ID = new ParseField("shard_id");
3839
private static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField("leader_global_checkpoint");
3940
private static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField("leader_max_seq_no");
@@ -62,16 +63,16 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
6263
STATUS_PARSER_NAME,
6364
args -> new ShardFollowNodeTaskStatus(
6465
(String) args[0],
65-
(int) args[1],
66-
(long) args[2],
66+
(String) args[1],
67+
(int) args[2],
6768
(long) args[3],
6869
(long) args[4],
6970
(long) args[5],
7071
(long) args[6],
71-
(int) args[7],
72+
(long) args[7],
7273
(int) args[8],
7374
(int) args[9],
74-
(long) args[10],
75+
(int) args[10],
7576
(long) args[11],
7677
(long) args[12],
7778
(long) args[13],
@@ -81,11 +82,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
8182
(long) args[17],
8283
(long) args[18],
8384
(long) args[19],
85+
(long) args[20],
8486
new TreeMap<>(
85-
((List<Map.Entry<Long, ElasticsearchException>>) args[20])
87+
((List<Map.Entry<Long, ElasticsearchException>>) args[21])
8688
.stream()
8789
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
88-
(long) args[21]));
90+
(long) args[22]));
8991

9092
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
9193

@@ -96,6 +98,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
9698

9799
static {
98100
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
101+
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), FOLLOWER_INDEX);
99102
STATUS_PARSER.declareInt(ConstructingObjectParser.constructorArg(), SHARD_ID);
100103
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_GLOBAL_CHECKPOINT_FIELD);
101104
STATUS_PARSER.declareLong(ConstructingObjectParser.constructorArg(), LEADER_MAX_SEQ_NO_FIELD);
@@ -136,6 +139,12 @@ public String leaderIndex() {
136139
return leaderIndex;
137140
}
138141

142+
private final String followerIndex;
143+
144+
public String followerIndex() {
145+
return followerIndex;
146+
}
147+
139148
private final int shardId;
140149

141150
public int getShardId() {
@@ -264,6 +273,7 @@ public long timeSinceLastFetchMillis() {
264273

265274
public ShardFollowNodeTaskStatus(
266275
final String leaderIndex,
276+
final String followerIndex,
267277
final int shardId,
268278
final long leaderGlobalCheckpoint,
269279
final long leaderMaxSeqNo,
@@ -286,6 +296,7 @@ public ShardFollowNodeTaskStatus(
286296
final NavigableMap<Long, ElasticsearchException> fetchExceptions,
287297
final long timeSinceLastFetchMillis) {
288298
this.leaderIndex = leaderIndex;
299+
this.followerIndex = followerIndex;
289300
this.shardId = shardId;
290301
this.leaderGlobalCheckpoint = leaderGlobalCheckpoint;
291302
this.leaderMaxSeqNo = leaderMaxSeqNo;
@@ -311,6 +322,7 @@ public ShardFollowNodeTaskStatus(
311322

312323
public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
313324
this.leaderIndex = in.readString();
325+
this.followerIndex = in.readString();
314326
this.shardId = in.readVInt();
315327
this.leaderGlobalCheckpoint = in.readZLong();
316328
this.leaderMaxSeqNo = in.readZLong();
@@ -342,6 +354,7 @@ public String getWriteableName() {
342354
@Override
343355
public void writeTo(final StreamOutput out) throws IOException {
344356
out.writeString(leaderIndex);
357+
out.writeString(followerIndex);
345358
out.writeVInt(shardId);
346359
out.writeZLong(leaderGlobalCheckpoint);
347360
out.writeZLong(leaderMaxSeqNo);
@@ -377,6 +390,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
377390

378391
public XContentBuilder toXContentFragment(final XContentBuilder builder, final Params params) throws IOException {
379392
builder.field(LEADER_INDEX.getPreferredName(), leaderIndex);
393+
builder.field(FOLLOWER_INDEX.getPreferredName(), followerIndex);
380394
builder.field(SHARD_ID.getPreferredName(), shardId);
381395
builder.field(LEADER_GLOBAL_CHECKPOINT_FIELD.getPreferredName(), leaderGlobalCheckpoint);
382396
builder.field(LEADER_MAX_SEQ_NO_FIELD.getPreferredName(), leaderMaxSeqNo);
@@ -439,6 +453,7 @@ public boolean equals(final Object o) {
439453
if (o == null || getClass() != o.getClass()) return false;
440454
final ShardFollowNodeTaskStatus that = (ShardFollowNodeTaskStatus) o;
441455
return leaderIndex.equals(that.leaderIndex) &&
456+
followerIndex.equals(that.followerIndex) &&
442457
shardId == that.shardId &&
443458
leaderGlobalCheckpoint == that.leaderGlobalCheckpoint &&
444459
leaderMaxSeqNo == that.leaderMaxSeqNo &&
@@ -471,6 +486,7 @@ public boolean equals(final Object o) {
471486
public int hashCode() {
472487
return Objects.hash(
473488
leaderIndex,
489+
followerIndex,
474490
shardId,
475491
leaderGlobalCheckpoint,
476492
leaderMaxSeqNo,

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/action/CcrStatsAction.java

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.common.io.stream.Writeable;
2020
import org.elasticsearch.common.xcontent.ToXContentObject;
2121
import org.elasticsearch.common.xcontent.XContentBuilder;
22-
import org.elasticsearch.index.shard.ShardId;
2322
import org.elasticsearch.tasks.Task;
2423
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
2524

@@ -70,8 +69,8 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
7069
final Map<String, Map<Integer, StatsResponse>> taskResponsesByIndex = new TreeMap<>();
7170
for (final StatsResponse statsResponse : statsResponse) {
7271
taskResponsesByIndex.computeIfAbsent(
73-
statsResponse.followerShardId().getIndexName(),
74-
k -> new TreeMap<>()).put(statsResponse.followerShardId().getId(), statsResponse);
72+
statsResponse.status().followerIndex(),
73+
k -> new TreeMap<>()).put(statsResponse.status().getShardId(), statsResponse);
7574
}
7675
builder.startObject();
7776
{
@@ -150,31 +149,22 @@ public void writeTo(StreamOutput out) throws IOException {
150149

151150
public static class StatsResponse implements Writeable {
152151

153-
private final ShardId followerShardId;
154-
155-
public ShardId followerShardId() {
156-
return followerShardId;
157-
}
158-
159152
private final ShardFollowNodeTaskStatus status;
160153

161154
public ShardFollowNodeTaskStatus status() {
162155
return status;
163156
}
164157

165-
public StatsResponse(final ShardId followerShardId, final ShardFollowNodeTaskStatus status) {
166-
this.followerShardId = followerShardId;
158+
public StatsResponse(final ShardFollowNodeTaskStatus status) {
167159
this.status = status;
168160
}
169161

170162
public StatsResponse(final StreamInput in) throws IOException {
171-
this.followerShardId = ShardId.readShardId(in);
172163
this.status = new ShardFollowNodeTaskStatus(in);
173164
}
174165

175166
@Override
176167
public void writeTo(final StreamOutput out) throws IOException {
177-
followerShardId.writeTo(out);
178168
status.writeTo(out);
179169
}
180170

x-pack/plugin/core/src/main/resources/monitoring-es.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -922,6 +922,9 @@
922922
"leader_index": {
923923
"type": "keyword"
924924
},
925+
"follower_index": {
926+
"type": "keyword"
927+
},
925928
"shard_id": {
926929
"type": "integer"
927930
},

x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/CcrStatsMonitoringDocTests.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ public void testToXContent() throws IOException {
9898
final long timeSinceLastFetchMillis = randomNonNegativeLong();
9999
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
100100
"cluster_alias:leader_index",
101+
"follower_index",
101102
shardId,
102103
leaderGlobalCheckpoint,
103104
leaderMaxSeqNo,
@@ -139,6 +140,7 @@ public void testToXContent() throws IOException {
139140
+ "},"
140141
+ "\"ccr_stats\":{"
141142
+ "\"leader_index\":\"cluster_alias:leader_index\","
143+
+ "\"follower_index\":\"follower_index\","
142144
+ "\"shard_id\":" + shardId + ","
143145
+ "\"leader_global_checkpoint\":" + leaderGlobalCheckpoint + ","
144146
+ "\"leader_max_seq_no\":" + leaderMaxSeqNo + ","

0 commit comments

Comments
 (0)