Skip to content

Commit 925d087

Browse files
committed
Expose retries for CCR fetch failures (#33694)
This commit exposes the number of times that a fetch has been tried to the CCR stats endpoint, and to CCR monitoring.
1 parent 88f8b5c commit 925d087

File tree

6 files changed

+72
-39
lines changed

6 files changed

+72
-39
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.action.support.TransportActions;
1313
import org.elasticsearch.common.Randomness;
14+
import org.elasticsearch.common.collect.Tuple;
1415
import org.elasticsearch.common.logging.Loggers;
1516
import org.elasticsearch.common.transport.NetworkExceptionHelper;
1617
import org.elasticsearch.common.unit.TimeValue;
@@ -36,6 +37,7 @@
3637
import java.util.function.Consumer;
3738
import java.util.function.LongConsumer;
3839
import java.util.function.LongSupplier;
40+
import java.util.stream.Collectors;
3941

4042
/**
4143
* The node task that fetch the write operations from a leader shard and
@@ -72,7 +74,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
7274
private long numberOfOperationsIndexed = 0;
7375
private long lastFetchTime = -1;
7476
private final Queue<Translog.Operation> buffer = new PriorityQueue<>(Comparator.comparing(Translog.Operation::seqNo));
75-
private final LinkedHashMap<Long, ElasticsearchException> fetchExceptions;
77+
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;
7678

7779
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
7880
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
@@ -87,9 +89,9 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
8789
* concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
8890
* when the fetch task associated with that from sequence number succeeds.
8991
*/
90-
this.fetchExceptions = new LinkedHashMap<Long, ElasticsearchException>() {
92+
this.fetchExceptions = new LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>>() {
9193
@Override
92-
protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException> eldest) {
94+
protected boolean removeEldestEntry(final Map.Entry<Long, Tuple<AtomicInteger, ElasticsearchException>> eldest) {
9395
return size() > params.getMaxConcurrentReadBatches();
9496
}
9597
};
@@ -240,7 +242,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
240242
synchronized (ShardFollowNodeTask.this) {
241243
totalFetchTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
242244
numberOfFailedFetches++;
243-
fetchExceptions.put(from, new ElasticsearchException(e));
245+
fetchExceptions.put(from, Tuple.tuple(retryCounter, new ElasticsearchException(e)));
244246
}
245247
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
246248
});
@@ -438,7 +440,12 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
438440
numberOfSuccessfulBulkOperations,
439441
numberOfFailedBulkOperations,
440442
numberOfOperationsIndexed,
441-
new TreeMap<>(fetchExceptions),
443+
new TreeMap<>(
444+
fetchExceptions
445+
.entrySet()
446+
.stream()
447+
.collect(
448+
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
442449
timeSinceLastFetchMillis);
443450
}
444451

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.ccr.action;
88

99
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.common.collect.Tuple;
1011
import org.elasticsearch.common.io.stream.Writeable;
1112
import org.elasticsearch.common.xcontent.XContentParser;
1213
import org.elasticsearch.test.AbstractSerializingTestCase;
@@ -83,15 +84,17 @@ protected void assertEqualInstances(final ShardFollowNodeTaskStatus expectedInst
8384
assertThat(newInstance.numberOfOperationsIndexed(), equalTo(expectedInstance.numberOfOperationsIndexed()));
8485
assertThat(newInstance.fetchExceptions().size(), equalTo(expectedInstance.fetchExceptions().size()));
8586
assertThat(newInstance.fetchExceptions().keySet(), equalTo(expectedInstance.fetchExceptions().keySet()));
86-
for (final Map.Entry<Long, ElasticsearchException> entry : newInstance.fetchExceptions().entrySet()) {
87+
for (final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry : newInstance.fetchExceptions().entrySet()) {
88+
final Tuple<Integer, ElasticsearchException> expectedTuple = expectedInstance.fetchExceptions().get(entry.getKey());
89+
assertThat(entry.getValue().v1(), equalTo(expectedTuple.v1()));
8790
// x-content loses the exception
88-
final ElasticsearchException expected = expectedInstance.fetchExceptions().get(entry.getKey());
89-
assertThat(entry.getValue().getMessage(), containsString(expected.getMessage()));
90-
assertNotNull(entry.getValue().getCause());
91+
final ElasticsearchException expected = expectedTuple.v2();
92+
assertThat(entry.getValue().v2().getMessage(), containsString(expected.getMessage()));
93+
assertNotNull(entry.getValue().v2().getCause());
9194
assertThat(
92-
entry.getValue().getCause(),
95+
entry.getValue().v2().getCause(),
9396
anyOf(instanceOf(ElasticsearchException.class), instanceOf(IllegalStateException.class)));
94-
assertThat(entry.getValue().getCause().getMessage(), containsString(expected.getCause().getMessage()));
97+
assertThat(entry.getValue().v2().getCause().getMessage(), containsString(expected.getCause().getMessage()));
9598
}
9699
assertThat(newInstance.timeSinceLastFetchMillis(), equalTo(expectedInstance.timeSinceLastFetchMillis()));
97100
}
@@ -101,11 +104,15 @@ protected boolean assertToXContentEquivalence() {
101104
return false;
102105
}
103106

104-
private NavigableMap<Long, ElasticsearchException> randomReadExceptions() {
107+
private NavigableMap<Long, Tuple<Integer, ElasticsearchException>> randomReadExceptions() {
105108
final int count = randomIntBetween(0, 16);
106-
final NavigableMap<Long, ElasticsearchException> readExceptions = new TreeMap<>();
109+
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> readExceptions = new TreeMap<>();
107110
for (int i = 0; i < count; i++) {
108-
readExceptions.put(randomNonNegativeLong(), new ElasticsearchException(new IllegalStateException("index [" + i + "]")));
111+
readExceptions.put(
112+
randomNonNegativeLong(),
113+
Tuple.tuple(
114+
randomIntBetween(0, Integer.MAX_VALUE),
115+
new ElasticsearchException(new IllegalStateException("index [" + i + "]"))));
109116
}
110117
return readExceptions;
111118
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import org.elasticsearch.ElasticsearchException;
99
import org.elasticsearch.common.UUIDs;
10+
import org.elasticsearch.common.collect.Tuple;
1011
import org.elasticsearch.common.unit.TimeValue;
1112
import org.elasticsearch.index.shard.ShardId;
1213
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -192,12 +193,13 @@ public void testReceiveRetryableError() {
192193
assertThat(status.numberOfFailedFetches(), equalTo(retryCounter.get()));
193194
if (retryCounter.get() > 0) {
194195
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
195-
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
196+
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.fetchExceptions().entrySet().iterator().next();
197+
assertThat(entry.getValue().v1(), equalTo(Math.toIntExact(retryCounter.get())));
196198
assertThat(entry.getKey(), equalTo(0L));
197-
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
198-
assertNotNull(entry.getValue().getCause());
199-
assertThat(entry.getValue().getCause(), instanceOf(ShardNotFoundException.class));
200-
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().getCause();
199+
assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class));
200+
assertNotNull(entry.getValue().v2().getCause());
201+
assertThat(entry.getValue().v2().getCause(), instanceOf(ShardNotFoundException.class));
202+
final ShardNotFoundException cause = (ShardNotFoundException) entry.getValue().v2().getCause();
201203
assertThat(cause.getShardId().getIndexName(), equalTo("leader_index"));
202204
assertThat(cause.getShardId().getId(), equalTo(0));
203205
}
@@ -253,12 +255,12 @@ public void testReceiveNonRetryableError() {
253255
assertThat(status.numberOfConcurrentWrites(), equalTo(0));
254256
assertThat(status.numberOfFailedFetches(), equalTo(1L));
255257
assertThat(status.fetchExceptions().entrySet(), hasSize(1));
256-
final Map.Entry<Long, ElasticsearchException> entry = status.fetchExceptions().entrySet().iterator().next();
258+
final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry = status.fetchExceptions().entrySet().iterator().next();
257259
assertThat(entry.getKey(), equalTo(0L));
258-
assertThat(entry.getValue(), instanceOf(ElasticsearchException.class));
259-
assertNotNull(entry.getValue().getCause());
260-
assertThat(entry.getValue().getCause(), instanceOf(RuntimeException.class));
261-
final RuntimeException cause = (RuntimeException) entry.getValue().getCause();
260+
assertThat(entry.getValue().v2(), instanceOf(ElasticsearchException.class));
261+
assertNotNull(entry.getValue().v2().getCause());
262+
assertThat(entry.getValue().v2().getCause(), instanceOf(RuntimeException.class));
263+
final RuntimeException cause = (RuntimeException) entry.getValue().v2().getCause();
262264
assertThat(cause.getMessage(), equalTo("replication failed"));
263265
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
264266
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.ElasticsearchException;
1010
import org.elasticsearch.common.ParseField;
1111
import org.elasticsearch.common.Strings;
12+
import org.elasticsearch.common.collect.Tuple;
1213
import org.elasticsearch.common.io.stream.StreamInput;
1314
import org.elasticsearch.common.io.stream.StreamOutput;
1415
import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -84,17 +85,17 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
8485
(long) args[19],
8586
(long) args[20],
8687
new TreeMap<>(
87-
((List<Map.Entry<Long, ElasticsearchException>>) args[21])
88+
((List<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>>) args[21])
8889
.stream()
8990
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
9091
(long) args[22]));
9192

9293
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry";
9394

94-
static final ConstructingObjectParser<Map.Entry<Long, ElasticsearchException>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
95+
static final ConstructingObjectParser<Map.Entry<Long, Tuple<Integer, ElasticsearchException>>, Void> FETCH_EXCEPTIONS_ENTRY_PARSER =
9596
new ConstructingObjectParser<>(
9697
FETCH_EXCEPTIONS_ENTRY_PARSER_NAME,
97-
args -> new AbstractMap.SimpleEntry<>((long) args[0], (ElasticsearchException) args[1]));
98+
args -> new AbstractMap.SimpleEntry<>((long) args[0], Tuple.tuple((Integer)args[1], (ElasticsearchException)args[2])));
9899

99100
static {
100101
STATUS_PARSER.declareString(ConstructingObjectParser.constructorArg(), LEADER_INDEX);
@@ -123,10 +124,12 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
123124
}
124125

125126
static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
127+
static final ParseField FETCH_EXCEPTIONS_RETRIES = new ParseField("retries");
126128
static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField("exception");
127129

128130
static {
129131
FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO);
132+
FETCH_EXCEPTIONS_ENTRY_PARSER.declareInt(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_RETRIES);
130133
FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject(
131134
ConstructingObjectParser.constructorArg(),
132135
(p, c) -> ElasticsearchException.fromXContent(p),
@@ -259,9 +262,9 @@ public long numberOfOperationsIndexed() {
259262
return numberOfOperationsIndexed;
260263
}
261264

262-
private final NavigableMap<Long, ElasticsearchException> fetchExceptions;
265+
private final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions;
263266

264-
public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
267+
public NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions() {
265268
return fetchExceptions;
266269
}
267270

@@ -293,7 +296,7 @@ public ShardFollowNodeTaskStatus(
293296
final long numberOfSuccessfulBulkOperations,
294297
final long numberOfFailedBulkOperations,
295298
final long numberOfOperationsIndexed,
296-
final NavigableMap<Long, ElasticsearchException> fetchExceptions,
299+
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> fetchExceptions,
297300
final long timeSinceLastFetchMillis) {
298301
this.leaderIndex = leaderIndex;
299302
this.followerIndex = followerIndex;
@@ -342,7 +345,8 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
342345
this.numberOfSuccessfulBulkOperations = in.readVLong();
343346
this.numberOfFailedBulkOperations = in.readVLong();
344347
this.numberOfOperationsIndexed = in.readVLong();
345-
this.fetchExceptions = new TreeMap<>(in.readMap(StreamInput::readVLong, StreamInput::readException));
348+
this.fetchExceptions =
349+
new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException())));
346350
this.timeSinceLastFetchMillis = in.readZLong();
347351
}
348352

@@ -374,7 +378,10 @@ public void writeTo(final StreamOutput out) throws IOException {
374378
out.writeVLong(numberOfSuccessfulBulkOperations);
375379
out.writeVLong(numberOfFailedBulkOperations);
376380
out.writeVLong(numberOfOperationsIndexed);
377-
out.writeMap(fetchExceptions, StreamOutput::writeVLong, StreamOutput::writeException);
381+
out.writeMap(
382+
fetchExceptions,
383+
StreamOutput::writeVLong,
384+
(stream, value) -> { stream.writeVInt(value.v1()); stream.writeException(value.v2()); });
378385
out.writeZLong(timeSinceLastFetchMillis);
379386
}
380387

@@ -421,14 +428,15 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P
421428
builder.field(NUMBER_OF_OPERATIONS_INDEXED_FIELD.getPreferredName(), numberOfOperationsIndexed);
422429
builder.startArray(FETCH_EXCEPTIONS.getPreferredName());
423430
{
424-
for (final Map.Entry<Long, ElasticsearchException> entry : fetchExceptions.entrySet()) {
431+
for (final Map.Entry<Long, Tuple<Integer, ElasticsearchException>> entry : fetchExceptions.entrySet()) {
425432
builder.startObject();
426433
{
427434
builder.field(FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO.getPreferredName(), entry.getKey());
435+
builder.field(FETCH_EXCEPTIONS_RETRIES.getPreferredName(), entry.getValue().v1());
428436
builder.field(FETCH_EXCEPTIONS_ENTRY_EXCEPTION.getPreferredName());
429437
builder.startObject();
430438
{
431-
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue());
439+
ElasticsearchException.generateThrowableXContent(builder, params, entry.getValue().v2());
432440
}
433441
builder.endObject();
434442
}
@@ -515,7 +523,7 @@ public int hashCode() {
515523
}
516524

517525
private static List<String> getFetchExceptionMessages(final ShardFollowNodeTaskStatus status) {
518-
return status.fetchExceptions().values().stream().map(ElasticsearchException::getMessage).collect(Collectors.toList());
526+
return status.fetchExceptions().values().stream().map(t -> t.v2().getMessage()).collect(Collectors.toList());
519527
}
520528

521529
public String toString() {

x-pack/plugin/core/src/main/resources/monitoring-es.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -987,6 +987,9 @@
987987
"from_seq_no": {
988988
"type": "long"
989989
},
990+
"retries": {
991+
"type": "integer"
992+
},
990993
"exception": {
991994
"type": "text"
992995
}

0 commit comments

Comments
 (0)