Skip to content

Commit a35ac16

Browse files
committed
Prepare ShardFollowNodeTask to bootstrap when it fall behind leader shard
* Changed the shard changes api to include a special metadata in the exception being thrown to indicate that the ops are no longer there. * Changed ShardFollowNodeTask to handle this exception with special metadata and mark a shard as fallen behind its leader shard. The shard follow task will then abort its on going replication. The code that does the restore from ccr repository still needs to be added. This change should make that change a bit easier. Relates to elastic#35975
1 parent 676e1b1 commit a35ac16

File tree

10 files changed

+114
-13
lines changed

10 files changed

+114
-13
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/Ccr.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
113113
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_NAME_KEY = "leader_index_name";
114114
public static final String CCR_CUSTOM_METADATA_REMOTE_CLUSTER_NAME_KEY = "remote_cluster_name";
115115

116+
public static final String FALLEN_BEHIND_LEADER_SHARD_METADATA_KEY = "es.fallen_behind_leader_shard";
117+
116118
private final boolean enabled;
117119
private final Settings settings;
118120
private final CcrLicenseChecker ccrLicenseChecker;

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.elasticsearch.tasks.Task;
3838
import org.elasticsearch.threadpool.ThreadPool;
3939
import org.elasticsearch.transport.TransportService;
40+
import org.elasticsearch.xpack.ccr.Ccr;
4041

4142
import java.io.IOException;
4243
import java.util.ArrayList;
@@ -397,7 +398,11 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
397398
if (cause instanceof IllegalStateException && cause.getMessage().contains("Not all operations between from_seqno [")) {
398399
String message = "Operations are no longer available for replicating. Maybe increase the retention setting [" +
399400
IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey() + "]?";
400-
listener.onFailure(new ElasticsearchException(message, e));
401+
// Make it easy to detect this error in ShardFollowNodeTask:
402+
// (adding a metadata header instead of introducing a new exception that extends ElasticsearchException)
403+
ElasticsearchException wrapper = new ElasticsearchException(message, e);
404+
wrapper.addMetadata(Ccr.FALLEN_BEHIND_LEADER_SHARD_METADATA_KEY);
405+
listener.onFailure(wrapper);
401406
} else {
402407
listener.onFailure(e);
403408
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.elasticsearch.persistent.AllocatedPersistentTask;
3131
import org.elasticsearch.tasks.TaskId;
3232
import org.elasticsearch.transport.ConnectTransportException;
33+
import org.elasticsearch.xpack.ccr.Ccr;
3334
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
3435
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
3536

@@ -43,6 +44,7 @@
4344
import java.util.Queue;
4445
import java.util.TreeMap;
4546
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.atomic.AtomicBoolean;
4648
import java.util.concurrent.atomic.AtomicInteger;
4749
import java.util.function.BiConsumer;
4850
import java.util.function.Consumer;
@@ -90,6 +92,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
9092
private final LinkedHashMap<Long, Tuple<AtomicInteger, ElasticsearchException>> fetchExceptions;
9193

9294
private volatile ElasticsearchException fatalException;
95+
private final AtomicBoolean fallenBehindLeaderShard = new AtomicBoolean(false);
9396

9497
ShardFollowNodeTask(long id, String type, String action, String description, TaskId parentTask, Map<String, String> headers,
9598
ShardFollowTask params, BiConsumer<TimeValue, Runnable> scheduler, final LongSupplier relativeTimeProvider) {
@@ -274,6 +277,13 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
274277
failedReadRequests++;
275278
fetchExceptions.put(from, Tuple.tuple(retryCounter, ExceptionsHelper.convertToElastic(e)));
276279
}
280+
if (e instanceof ElasticsearchException) {
281+
ElasticsearchException elasticsearchException = (ElasticsearchException) e;
282+
if (elasticsearchException.getMetadataKeys().contains(Ccr.FALLEN_BEHIND_LEADER_SHARD_METADATA_KEY)) {
283+
handleFallenBehindLeaderShard(e);
284+
return;
285+
}
286+
}
277287
handleFailure(e, retryCounter, () -> sendShardChangesRequest(from, maxOperationCount, maxRequiredSeqNo, retryCounter));
278288
});
279289
}
@@ -290,6 +300,17 @@ void handleReadResponse(long from, long maxRequiredSeqNo, ShardChangesAction.Res
290300
maybeUpdateSettings(response.getSettingsVersion(), updateMappingsTask);
291301
}
292302

303+
void handleFallenBehindLeaderShard(Exception e) {
304+
if (fallenBehindLeaderShard.compareAndSet(false, true)) {
305+
LOGGER.warn(new ParameterizedMessage("{} shard follow task has fallen behind the leader shard {} that it is following",
306+
params.getFollowShardId(), params.getLeaderShardId()), e);
307+
308+
// Do restore from repository here and
309+
// after start() should be invoked and
310+
// stats should be reset including fallenBehindLeaderShard
311+
}
312+
}
313+
293314
/** Called when some operations are fetched from the leading */
294315
protected void onOperationsFetched(Translog.Operation[] operations) {
295316

@@ -487,7 +508,7 @@ protected void onCancelled() {
487508
}
488509

489510
protected boolean isStopped() {
490-
return fatalException != null || isCancelled() || isCompleted();
511+
return fallenBehindLeaderShard.get() || fatalException != null || isCancelled() || isCompleted();
491512
}
492513

493514
public ShardId getFollowShardId() {
@@ -536,7 +557,8 @@ public synchronized ShardFollowNodeTaskStatus getStatus() {
536557
.collect(
537558
Collectors.toMap(Map.Entry::getKey, e -> Tuple.tuple(e.getValue().v1().get(), e.getValue().v2())))),
538559
timeSinceLastFetchMillis,
539-
fatalException);
560+
fatalException,
561+
fallenBehindLeaderShard.get());
540562
}
541563

542564
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardChangesTests.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@
1616
import org.elasticsearch.index.translog.Translog;
1717
import org.elasticsearch.plugins.Plugin;
1818
import org.elasticsearch.test.ESSingleNodeTestCase;
19+
import org.elasticsearch.xpack.ccr.Ccr;
1920
import org.elasticsearch.xpack.ccr.LocalStateCcr;
2021

2122
import java.util.Collection;
2223
import java.util.Collections;
2324

2425
import static org.hamcrest.Matchers.equalTo;
26+
import static org.hamcrest.Matchers.notNullValue;
2527

2628
public class ShardChangesTests extends ESSingleNodeTestCase {
2729

@@ -113,9 +115,12 @@ public void testMissingOperations() {
113115
request.setFromSeqNo(0L);
114116
request.setMaxOperationCount(1);
115117

116-
Exception e = expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
118+
ElasticsearchException e =
119+
expectThrows(ElasticsearchException.class, () -> client().execute(ShardChangesAction.INSTANCE, request).actionGet());
117120
assertThat(e.getMessage(), equalTo("Operations are no longer available for replicating. Maybe increase the retention setting " +
118121
"[index.soft_deletes.retention.operations]?"));
122+
assertThat(e.getMetadataKeys().size(), equalTo(1));
123+
assertThat(e.getMetadata(Ccr.FALLEN_BEHIND_LEADER_SHARD_METADATA_KEY), notNullValue());
119124
}
120125

121126
}

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskStatusTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ protected ShardFollowNodeTaskStatus createTestInstance() {
6161
randomNonNegativeLong(),
6262
randomReadExceptions(),
6363
randomLong(),
64-
randomBoolean() ? new ElasticsearchException("fatal error") : null);
64+
randomBoolean() ? new ElasticsearchException("fatal error") : null,
65+
randomBoolean());
6566
}
6667

6768
@Override

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTaskTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import org.elasticsearch.index.shard.ShardNotFoundException;
1616
import org.elasticsearch.index.translog.Translog;
1717
import org.elasticsearch.test.ESTestCase;
18+
import org.elasticsearch.xpack.ccr.Ccr;
1819
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
1920
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
2021

@@ -42,6 +43,7 @@
4243
import static org.hamcrest.Matchers.instanceOf;
4344
import static org.hamcrest.Matchers.lessThanOrEqualTo;
4445
import static org.hamcrest.Matchers.sameInstance;
46+
import static org.hamcrest.core.Is.is;
4547

4648
public class ShardFollowNodeTaskTests extends ESTestCase {
4749

@@ -287,6 +289,37 @@ public void testReceiveRetryableError() {
287289
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
288290
}
289291

292+
public void testFallenBehindLeaderShard() {
293+
ShardFollowTaskParams params = new ShardFollowTaskParams();
294+
params.maxReadRequestOperationCount = 64;
295+
params.maxOutstandingReadRequests = 1;
296+
params.maxOutstandingWriteRequests = 1;
297+
ShardFollowNodeTask task = createShardFollowTask(params);
298+
startTask(task, 63, -1);
299+
300+
ElasticsearchException exception = new ElasticsearchException("no ops for you");
301+
exception.addMetadata(Ccr.FALLEN_BEHIND_LEADER_SHARD_METADATA_KEY);
302+
readFailures.add(exception);
303+
mappingVersions.add(1L);
304+
leaderGlobalCheckpoints.add(63L);
305+
maxSeqNos.add(63L);
306+
responseSizes.add(64);
307+
simulateResponse.set(true);
308+
task.coordinateReads();
309+
310+
assertThat(task.isStopped(), is(true));
311+
ShardFollowNodeTaskStatus status = task.getStatus();
312+
assertThat(status.fallenBehindLeaderShard(), is(true));
313+
assertThat(status.outstandingReadRequests(), equalTo(1));
314+
assertThat(status.outstandingWriteRequests(), equalTo(0));
315+
assertThat(status.failedReadRequests(), equalTo(1L));
316+
assertThat(status.successfulReadRequests(), equalTo(0L));
317+
assertThat(status.readExceptions().size(), equalTo(1));
318+
assertThat(status.readExceptions().values().iterator().next().v2(), sameInstance(exception));
319+
assertThat(status.lastRequestedSeqNo(), equalTo(63L));
320+
assertThat(status.leaderGlobalCheckpoint(), equalTo(63L));
321+
}
322+
290323
public void testEmptyShardChangesResponseShouldClearFetchException() {
291324
ShardFollowTaskParams params = new ShardFollowTaskParams();
292325
params.maxReadRequestOperationCount = 64;

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/StatsResponsesTests.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ static FollowStatsAction.StatsResponses createStatsResponse() {
5959
randomNonNegativeLong(),
6060
Collections.emptyNavigableMap(),
6161
randomLong(),
62-
randomBoolean() ? new ElasticsearchException("fatal error") : null);
62+
randomBoolean() ? new ElasticsearchException("fatal error") : null,
63+
randomBoolean());
6364
responses.add(new FollowStatsAction.StatsResponse(status));
6465
}
6566
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/monitoring/collector/ccr/FollowStatsMonitoringDocTests.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public void testToXContent() throws IOException {
110110
randomNonNegativeLong(),
111111
Tuple.tuple(randomIntBetween(0, Integer.MAX_VALUE), new ElasticsearchException("shard is sad"))));
112112
final long timeSinceLastReadMillis = randomNonNegativeLong();
113+
final boolean fallenBehindLeaderShard = randomBoolean();
113114
final ShardFollowNodeTaskStatus status = new ShardFollowNodeTaskStatus(
114115
"leader_cluster",
115116
"leader_index",
@@ -138,7 +139,8 @@ public void testToXContent() throws IOException {
138139
operationWritten,
139140
fetchExceptions,
140141
timeSinceLastReadMillis,
141-
new ElasticsearchException("fatal error"));
142+
new ElasticsearchException("fatal error"),
143+
fallenBehindLeaderShard);
142144
final FollowStatsMonitoringDoc document = new FollowStatsMonitoringDoc("_cluster", timestamp, intervalMillis, node, status);
143145
final BytesReference xContent = XContentHelper.toXContent(document, XContentType.JSON, false);
144146
assertThat(
@@ -194,7 +196,8 @@ public void testToXContent() throws IOException {
194196
+ "}"
195197
+ "],"
196198
+ "\"time_since_last_read_millis\":" + timeSinceLastReadMillis + ","
197-
+ "\"fatal_exception\":{\"type\":\"exception\",\"reason\":\"fatal error\"}"
199+
+ "\"fatal_exception\":{\"type\":\"exception\",\"reason\":\"fatal error\"},"
200+
+ "\"fallen_behind_leader_shard\":" + fallenBehindLeaderShard
198201
+ "}"
199202
+ "}"));
200203
}
@@ -230,7 +233,8 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
230233
10,
231234
fetchExceptions,
232235
2,
233-
null);
236+
null,
237+
false);
234238
XContentBuilder builder = jsonBuilder();
235239
builder.value(status);
236240
Map<String, Object> serializedStatus = XContentHelper.convertToMap(XContentType.JSON.xContent(), Strings.toString(builder), false);
@@ -252,6 +256,8 @@ public void testShardFollowNodeTaskStatusFieldsMapped() throws IOException {
252256
} else if (fieldValue instanceof String) {
253257
assertThat("expected keyword field type for field [" + fieldName + "]", fieldType,
254258
anyOf(equalTo("keyword"), equalTo("text")));
259+
} else if (fieldValue instanceof Boolean) {
260+
assertThat("expected keyword field type for field [" + fieldName + "]", fieldType, equalTo("boolean"));
255261
} else {
256262
// Manual test specific object fields and if not just fail:
257263
if (fieldName.equals("read_exceptions")) {

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ccr/ShardFollowNodeTaskStatus.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
package org.elasticsearch.xpack.core.ccr;
88

99
import org.elasticsearch.ElasticsearchException;
10+
import org.elasticsearch.Version;
1011
import org.elasticsearch.common.ParseField;
1112
import org.elasticsearch.common.Strings;
1213
import org.elasticsearch.common.collect.Tuple;
@@ -62,6 +63,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
6263
private static final ParseField READ_EXCEPTIONS = new ParseField("read_exceptions");
6364
private static final ParseField TIME_SINCE_LAST_READ_MILLIS_FIELD = new ParseField("time_since_last_read_millis");
6465
private static final ParseField FATAL_EXCEPTION = new ParseField("fatal_exception");
66+
private static final ParseField FALLEN_BEHIND_LEADER_SHARD = new ParseField("fallen_behind_leader_shard");
6567

6668
@SuppressWarnings("unchecked")
6769
static final ConstructingObjectParser<ShardFollowNodeTaskStatus, Void> STATUS_PARSER =
@@ -98,7 +100,8 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
98100
.stream()
99101
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))),
100102
(long) args[26],
101-
(ElasticsearchException) args[27]));
103+
(ElasticsearchException) args[27],
104+
(boolean) args[28]));
102105

103106
public static final String READ_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-read-exceptions-entry";
104107

@@ -138,6 +141,7 @@ public class ShardFollowNodeTaskStatus implements Task.Status {
138141
STATUS_PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(),
139142
(p, c) -> ElasticsearchException.fromXContent(p),
140143
FATAL_EXCEPTION);
144+
STATUS_PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), FALLEN_BEHIND_LEADER_SHARD);
141145
}
142146

143147
static final ParseField READ_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField("from_seq_no");
@@ -321,6 +325,12 @@ public ElasticsearchException getFatalException() {
321325
return fatalException;
322326
}
323327

328+
private final boolean fallenBehindLeaderShard;
329+
330+
public boolean fallenBehindLeaderShard() {
331+
return fallenBehindLeaderShard;
332+
}
333+
324334
public ShardFollowNodeTaskStatus(
325335
final String remoteCluster,
326336
final String leaderIndex,
@@ -349,7 +359,8 @@ public ShardFollowNodeTaskStatus(
349359
final long operationWritten,
350360
final NavigableMap<Long, Tuple<Integer, ElasticsearchException>> readExceptions,
351361
final long timeSinceLastReadMillis,
352-
final ElasticsearchException fatalException) {
362+
final ElasticsearchException fatalException,
363+
final boolean fallenBehindLeaderShard) {
353364
this.remoteCluster = remoteCluster;
354365
this.leaderIndex = leaderIndex;
355366
this.followerIndex = followerIndex;
@@ -378,6 +389,7 @@ public ShardFollowNodeTaskStatus(
378389
this.readExceptions = Objects.requireNonNull(readExceptions);
379390
this.timeSinceLastReadMillis = timeSinceLastReadMillis;
380391
this.fatalException = fatalException;
392+
this.fallenBehindLeaderShard = fallenBehindLeaderShard;
381393
}
382394

383395
public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
@@ -410,6 +422,11 @@ public ShardFollowNodeTaskStatus(final StreamInput in) throws IOException {
410422
new TreeMap<>(in.readMap(StreamInput::readVLong, stream -> Tuple.tuple(stream.readVInt(), stream.readException())));
411423
this.timeSinceLastReadMillis = in.readZLong();
412424
this.fatalException = in.readException();
425+
if (in.getVersion().onOrAfter(Version.V_6_7_0)) {
426+
this.fallenBehindLeaderShard = in.readBoolean();
427+
} else {
428+
this.fallenBehindLeaderShard = false;
429+
}
413430
}
414431

415432
@Override
@@ -453,6 +470,9 @@ public void writeTo(final StreamOutput out) throws IOException {
453470
});
454471
out.writeZLong(timeSinceLastReadMillis);
455472
out.writeException(fatalException);
473+
if (out.getVersion().onOrAfter(Version.V_6_7_0)) {
474+
out.writeBoolean(fallenBehindLeaderShard);
475+
}
456476
}
457477

458478
@Override
@@ -536,6 +556,7 @@ public XContentBuilder toXContentFragment(final XContentBuilder builder, final P
536556
}
537557
builder.endObject();
538558
}
559+
builder.field(FALLEN_BEHIND_LEADER_SHARD.getPreferredName(), fallenBehindLeaderShard);
539560
return builder;
540561
}
541562

@@ -582,7 +603,8 @@ public boolean equals(final Object o) {
582603
readExceptions.keySet().equals(that.readExceptions.keySet()) &&
583604
getReadExceptionMessages(this).equals(getReadExceptionMessages(that)) &&
584605
timeSinceLastReadMillis == that.timeSinceLastReadMillis &&
585-
Objects.equals(fatalExceptionMessage, otherFatalExceptionMessage);
606+
Objects.equals(fatalExceptionMessage, otherFatalExceptionMessage) &&
607+
Objects.equals(fallenBehindLeaderShard, that.fallenBehindLeaderShard);
586608
}
587609

588610
@Override
@@ -620,7 +642,8 @@ public int hashCode() {
620642
readExceptions.keySet(),
621643
getReadExceptionMessages(this),
622644
timeSinceLastReadMillis,
623-
fatalExceptionMessage);
645+
fatalExceptionMessage,
646+
fallenBehindLeaderShard);
624647
}
625648

626649
private static List<String> getReadExceptionMessages(final ShardFollowNodeTaskStatus status) {

x-pack/plugin/core/src/main/resources/monitoring-es.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,9 @@
10281028
},
10291029
"time_since_last_read_millis": {
10301030
"type": "long"
1031+
},
1032+
"fallen_behind_leader_shard": {
1033+
"type": "boolean"
10311034
}
10321035
}
10331036
},

0 commit comments

Comments
 (0)