Skip to content

Commit 1579916

Browse files
committed
[CCR] Improve shard follow task's retryable error handling (#33371)
Improve failure handling of retryable errors by retrying remote calls in a exponential backoff like manner. The delay between a retry would not be longer than the configured max retry delay. Also retryable errors will be retried indefinitely. Relates to #30086
1 parent 5fa2acc commit 1579916

File tree

9 files changed

+92
-172
lines changed

9 files changed

+92
-172
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternAction.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ public static class Request extends AcknowledgedRequest<Request> implements ToXC
6464
PARSER.declareLong(Request::setMaxOperationSizeInBytes, AutoFollowPattern.MAX_BATCH_SIZE_IN_BYTES);
6565
PARSER.declareInt(Request::setMaxConcurrentWriteBatches, AutoFollowPattern.MAX_CONCURRENT_WRITE_BATCHES);
6666
PARSER.declareInt(Request::setMaxWriteBufferSize, AutoFollowPattern.MAX_WRITE_BUFFER_SIZE);
67-
PARSER.declareField(Request::setRetryTimeout,
67+
PARSER.declareField(Request::setMaxRetryDelay,
6868
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.RETRY_TIMEOUT.getPreferredName()),
69-
ShardFollowTask.RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
69+
ShardFollowTask.MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
7070
PARSER.declareField(Request::setIdleShardRetryDelay,
7171
(p, c) -> TimeValue.parseTimeValue(p.text(), AutoFollowPattern.IDLE_SHARD_RETRY_DELAY.getPreferredName()),
7272
ShardFollowTask.IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -95,7 +95,7 @@ public static Request fromXContent(XContentParser parser, String remoteClusterAl
9595
private Long maxOperationSizeInBytes;
9696
private Integer maxConcurrentWriteBatches;
9797
private Integer maxWriteBufferSize;
98-
private TimeValue retryTimeout;
98+
private TimeValue maxRetryDelay;
9999
private TimeValue idleShardRetryDelay;
100100

101101
@Override
@@ -174,12 +174,12 @@ public void setMaxWriteBufferSize(Integer maxWriteBufferSize) {
174174
this.maxWriteBufferSize = maxWriteBufferSize;
175175
}
176176

177-
public TimeValue getRetryTimeout() {
178-
return retryTimeout;
177+
public TimeValue getMaxRetryDelay() {
178+
return maxRetryDelay;
179179
}
180180

181-
public void setRetryTimeout(TimeValue retryTimeout) {
182-
this.retryTimeout = retryTimeout;
181+
public void setMaxRetryDelay(TimeValue maxRetryDelay) {
182+
this.maxRetryDelay = maxRetryDelay;
183183
}
184184

185185
public TimeValue getIdleShardRetryDelay() {
@@ -201,7 +201,7 @@ public void readFrom(StreamInput in) throws IOException {
201201
maxOperationSizeInBytes = in.readOptionalLong();
202202
maxConcurrentWriteBatches = in.readOptionalVInt();
203203
maxWriteBufferSize = in.readOptionalVInt();
204-
retryTimeout = in.readOptionalTimeValue();
204+
maxRetryDelay = in.readOptionalTimeValue();
205205
idleShardRetryDelay = in.readOptionalTimeValue();
206206
}
207207

@@ -216,7 +216,7 @@ public void writeTo(StreamOutput out) throws IOException {
216216
out.writeOptionalLong(maxOperationSizeInBytes);
217217
out.writeOptionalVInt(maxConcurrentWriteBatches);
218218
out.writeOptionalVInt(maxWriteBufferSize);
219-
out.writeOptionalTimeValue(retryTimeout);
219+
out.writeOptionalTimeValue(maxRetryDelay);
220220
out.writeOptionalTimeValue(idleShardRetryDelay);
221221
}
222222

@@ -244,8 +244,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
244244
if (maxConcurrentWriteBatches != null) {
245245
builder.field(ShardFollowTask.MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
246246
}
247-
if (retryTimeout != null) {
248-
builder.field(ShardFollowTask.RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
247+
if (maxRetryDelay != null) {
248+
builder.field(ShardFollowTask.MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
249249
}
250250
if (idleShardRetryDelay != null) {
251251
builder.field(ShardFollowTask.IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
@@ -268,7 +268,7 @@ public boolean equals(Object o) {
268268
Objects.equals(maxOperationSizeInBytes, request.maxOperationSizeInBytes) &&
269269
Objects.equals(maxConcurrentWriteBatches, request.maxConcurrentWriteBatches) &&
270270
Objects.equals(maxWriteBufferSize, request.maxWriteBufferSize) &&
271-
Objects.equals(retryTimeout, request.retryTimeout) &&
271+
Objects.equals(maxRetryDelay, request.maxRetryDelay) &&
272272
Objects.equals(idleShardRetryDelay, request.idleShardRetryDelay);
273273
}
274274

@@ -283,7 +283,7 @@ public int hashCode() {
283283
maxOperationSizeInBytes,
284284
maxConcurrentWriteBatches,
285285
maxWriteBufferSize,
286-
retryTimeout,
286+
maxRetryDelay,
287287
idleShardRetryDelay
288288
);
289289
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

+21-12
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import org.apache.logging.log4j.message.ParameterizedMessage;
1111
import org.elasticsearch.ElasticsearchException;
1212
import org.elasticsearch.action.support.TransportActions;
13+
import org.elasticsearch.common.Randomness;
1314
import org.elasticsearch.common.logging.Loggers;
1415
import org.elasticsearch.common.transport.NetworkExceptionHelper;
1516
import org.elasticsearch.common.unit.TimeValue;
@@ -18,7 +19,6 @@
1819
import org.elasticsearch.persistent.AllocatedPersistentTask;
1920
import org.elasticsearch.tasks.TaskId;
2021
import org.elasticsearch.xpack.ccr.action.bulk.BulkShardOperationsResponse;
21-
import org.elasticsearch.xpack.core.ccr.action.FollowIndexAction;
2222
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
2323

2424
import java.util.ArrayList;
@@ -43,11 +43,12 @@
4343
*/
4444
public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
4545

46+
private static final int DELAY_MILLIS = 50;
4647
private static final Logger LOGGER = Loggers.getLogger(ShardFollowNodeTask.class);
4748

4849
private final String leaderIndex;
4950
private final ShardFollowTask params;
50-
private final TimeValue retryTimeout;
51+
private final TimeValue maxRetryDelay;
5152
private final TimeValue idleShardChangesRequestDelay;
5253
private final BiConsumer<TimeValue, Runnable> scheduler;
5354
private final LongSupplier relativeTimeProvider;
@@ -79,7 +80,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
7980
this.params = params;
8081
this.scheduler = scheduler;
8182
this.relativeTimeProvider = relativeTimeProvider;
82-
this.retryTimeout = params.getRetryTimeout();
83+
this.maxRetryDelay = params.getMaxRetryDelay();
8384
this.idleShardChangesRequestDelay = params.getIdleShardRetryDelay();
8485
/*
8586
* We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
@@ -357,20 +358,28 @@ private void updateMapping(LongConsumer handler, AtomicInteger retryCounter) {
357358

358359
private void handleFailure(Exception e, AtomicInteger retryCounter, Runnable task) {
359360
assert e != null;
360-
if (shouldRetry(e)) {
361-
if (isStopped() == false && retryCounter.incrementAndGet() <= FollowIndexAction.RETRY_LIMIT) {
362-
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying...", params.getFollowShardId()), e);
363-
scheduler.accept(retryTimeout, task);
364-
} else {
365-
markAsFailed(new ElasticsearchException("retrying failed [" + retryCounter.get() +
366-
"] times, aborting...", e));
367-
}
361+
if (shouldRetry(e) && isStopped() == false) {
362+
int currentRetry = retryCounter.incrementAndGet();
363+
LOGGER.debug(new ParameterizedMessage("{} error during follow shard task, retrying [{}]",
364+
params.getFollowShardId(), currentRetry), e);
365+
long delay = computeDelay(currentRetry, maxRetryDelay.getMillis());
366+
scheduler.accept(TimeValue.timeValueMillis(delay), task);
368367
} else {
369368
markAsFailed(e);
370369
}
371370
}
372371

373-
private boolean shouldRetry(Exception e) {
372+
static long computeDelay(int currentRetry, long maxRetryDelayInMillis) {
373+
// Cap currentRetry to avoid overflow when computing n variable
374+
int maxCurrentRetry = Math.min(currentRetry, 24);
375+
long n = Math.round(Math.pow(2, maxCurrentRetry - 1));
376+
// + 1 here, because nextInt(...) bound is exclusive and otherwise the first delay would always be zero.
377+
int k = Randomness.get().nextInt(Math.toIntExact(n + 1));
378+
int backOffDelay = k * DELAY_MILLIS;
379+
return Math.min(backOffDelay, maxRetryDelayInMillis);
380+
}
381+
382+
private static boolean shouldRetry(Exception e) {
374383
return NetworkExceptionHelper.isConnectException(e) ||
375384
NetworkExceptionHelper.isCloseConnectionException(e) ||
376385
TransportActions.isShardNotAvailableException(e);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTask.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
4848
public static final ParseField MAX_BATCH_SIZE_IN_BYTES = new ParseField("max_batch_size_in_bytes");
4949
public static final ParseField MAX_CONCURRENT_WRITE_BATCHES = new ParseField("max_concurrent_write_batches");
5050
public static final ParseField MAX_WRITE_BUFFER_SIZE = new ParseField("max_write_buffer_size");
51-
public static final ParseField RETRY_TIMEOUT = new ParseField("retry_timeout");
51+
public static final ParseField MAX_RETRY_DELAY = new ParseField("max_retry_delay");
5252
public static final ParseField IDLE_SHARD_RETRY_DELAY = new ParseField("idle_shard_retry_delay");
5353

5454
@SuppressWarnings("unchecked")
@@ -71,8 +71,8 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
7171
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_CONCURRENT_WRITE_BATCHES);
7272
PARSER.declareInt(ConstructingObjectParser.constructorArg(), MAX_WRITE_BUFFER_SIZE);
7373
PARSER.declareField(ConstructingObjectParser.constructorArg(),
74-
(p, c) -> TimeValue.parseTimeValue(p.text(), RETRY_TIMEOUT.getPreferredName()),
75-
RETRY_TIMEOUT, ObjectParser.ValueType.STRING);
74+
(p, c) -> TimeValue.parseTimeValue(p.text(), MAX_RETRY_DELAY.getPreferredName()),
75+
MAX_RETRY_DELAY, ObjectParser.ValueType.STRING);
7676
PARSER.declareField(ConstructingObjectParser.constructorArg(),
7777
(p, c) -> TimeValue.parseTimeValue(p.text(), IDLE_SHARD_RETRY_DELAY.getPreferredName()),
7878
IDLE_SHARD_RETRY_DELAY, ObjectParser.ValueType.STRING);
@@ -87,13 +87,13 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
8787
private final long maxBatchSizeInBytes;
8888
private final int maxConcurrentWriteBatches;
8989
private final int maxWriteBufferSize;
90-
private final TimeValue retryTimeout;
90+
private final TimeValue maxRetryDelay;
9191
private final TimeValue idleShardRetryDelay;
9292
private final Map<String, String> headers;
9393

9494
ShardFollowTask(String leaderClusterAlias, ShardId followShardId, ShardId leaderShardId, int maxBatchOperationCount,
9595
int maxConcurrentReadBatches, long maxBatchSizeInBytes, int maxConcurrentWriteBatches,
96-
int maxWriteBufferSize, TimeValue retryTimeout, TimeValue idleShardRetryDelay, Map<String, String> headers) {
96+
int maxWriteBufferSize, TimeValue maxRetryDelay, TimeValue idleShardRetryDelay, Map<String, String> headers) {
9797
this.leaderClusterAlias = leaderClusterAlias;
9898
this.followShardId = followShardId;
9999
this.leaderShardId = leaderShardId;
@@ -102,7 +102,7 @@ public class ShardFollowTask implements XPackPlugin.XPackPersistentTaskParams {
102102
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
103103
this.maxConcurrentWriteBatches = maxConcurrentWriteBatches;
104104
this.maxWriteBufferSize = maxWriteBufferSize;
105-
this.retryTimeout = retryTimeout;
105+
this.maxRetryDelay = maxRetryDelay;
106106
this.idleShardRetryDelay = idleShardRetryDelay;
107107
this.headers = headers != null ? Collections.unmodifiableMap(headers) : Collections.emptyMap();
108108
}
@@ -116,7 +116,7 @@ public ShardFollowTask(StreamInput in) throws IOException {
116116
this.maxBatchSizeInBytes = in.readVLong();
117117
this.maxConcurrentWriteBatches = in.readVInt();
118118
this.maxWriteBufferSize = in.readVInt();
119-
this.retryTimeout = in.readTimeValue();
119+
this.maxRetryDelay = in.readTimeValue();
120120
this.idleShardRetryDelay = in.readTimeValue();
121121
this.headers = Collections.unmodifiableMap(in.readMap(StreamInput::readString, StreamInput::readString));
122122
}
@@ -153,8 +153,8 @@ public long getMaxBatchSizeInBytes() {
153153
return maxBatchSizeInBytes;
154154
}
155155

156-
public TimeValue getRetryTimeout() {
157-
return retryTimeout;
156+
public TimeValue getMaxRetryDelay() {
157+
return maxRetryDelay;
158158
}
159159

160160
public TimeValue getIdleShardRetryDelay() {
@@ -184,7 +184,7 @@ public void writeTo(StreamOutput out) throws IOException {
184184
out.writeVLong(maxBatchSizeInBytes);
185185
out.writeVInt(maxConcurrentWriteBatches);
186186
out.writeVInt(maxWriteBufferSize);
187-
out.writeTimeValue(retryTimeout);
187+
out.writeTimeValue(maxRetryDelay);
188188
out.writeTimeValue(idleShardRetryDelay);
189189
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
190190
}
@@ -210,7 +210,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
210210
builder.field(MAX_BATCH_SIZE_IN_BYTES.getPreferredName(), maxBatchSizeInBytes);
211211
builder.field(MAX_CONCURRENT_WRITE_BATCHES.getPreferredName(), maxConcurrentWriteBatches);
212212
builder.field(MAX_WRITE_BUFFER_SIZE.getPreferredName(), maxWriteBufferSize);
213-
builder.field(RETRY_TIMEOUT.getPreferredName(), retryTimeout.getStringRep());
213+
builder.field(MAX_RETRY_DELAY.getPreferredName(), maxRetryDelay.getStringRep());
214214
builder.field(IDLE_SHARD_RETRY_DELAY.getPreferredName(), idleShardRetryDelay.getStringRep());
215215
builder.field(HEADERS.getPreferredName(), headers);
216216
return builder.endObject();
@@ -229,15 +229,15 @@ public boolean equals(Object o) {
229229
maxConcurrentWriteBatches == that.maxConcurrentWriteBatches &&
230230
maxBatchSizeInBytes == that.maxBatchSizeInBytes &&
231231
maxWriteBufferSize == that.maxWriteBufferSize &&
232-
Objects.equals(retryTimeout, that.retryTimeout) &&
232+
Objects.equals(maxRetryDelay, that.maxRetryDelay) &&
233233
Objects.equals(idleShardRetryDelay, that.idleShardRetryDelay) &&
234234
Objects.equals(headers, that.headers);
235235
}
236236

237237
@Override
238238
public int hashCode() {
239239
return Objects.hash(leaderClusterAlias, followShardId, leaderShardId, maxBatchOperationCount, maxConcurrentReadBatches,
240-
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, retryTimeout, idleShardRetryDelay, headers);
240+
maxConcurrentWriteBatches, maxBatchSizeInBytes, maxWriteBufferSize, maxRetryDelay, idleShardRetryDelay, headers);
241241
}
242242

243243
public String toString() {

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportFollowIndexAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ void start(
181181
request.getMaxOperationSizeInBytes(),
182182
request.getMaxConcurrentWriteBatches(),
183183
request.getMaxWriteBufferSize(),
184-
request.getRetryTimeout(),
184+
request.getMaxRetryDelay(),
185185
request.getIdleShardRetryDelay(),
186186
filteredHeaders);
187187
persistentTasksService.sendStartRequest(taskId, ShardFollowTask.NAME, shardFollowTask,

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/TransportPutAutoFollowPatternAction.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ static ClusterState innerPut(PutAutoFollowPatternAction.Request request,
149149
request.getMaxOperationSizeInBytes(),
150150
request.getMaxConcurrentWriteBatches(),
151151
request.getMaxWriteBufferSize(),
152-
request.getRetryTimeout(),
152+
request.getMaxRetryDelay(),
153153
request.getIdleShardRetryDelay()
154154
);
155155
patterns.put(request.getLeaderClusterAlias(), autoFollowPattern);

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/AutoFollowTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ public void testAutoFollowParameterAreDelegated() throws Exception {
131131
request.setMaxOperationSizeInBytes(randomNonNegativeLong());
132132
}
133133
if (randomBoolean()) {
134-
request.setRetryTimeout(TimeValue.timeValueMillis(500));
134+
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
135135
}
136136
if (randomBoolean()) {
137137
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
@@ -162,8 +162,8 @@ public void testAutoFollowParameterAreDelegated() throws Exception {
162162
if (request.getMaxOperationSizeInBytes() != null) {
163163
assertThat(shardFollowTask.getMaxBatchSizeInBytes(), equalTo(request.getMaxOperationSizeInBytes()));
164164
}
165-
if (request.getRetryTimeout() != null) {
166-
assertThat(shardFollowTask.getRetryTimeout(), equalTo(request.getRetryTimeout()));
165+
if (request.getMaxRetryDelay() != null) {
166+
assertThat(shardFollowTask.getMaxRetryDelay(), equalTo(request.getMaxRetryDelay()));
167167
}
168168
if (request.getIdleShardRetryDelay() != null) {
169169
assertThat(shardFollowTask.getIdleShardRetryDelay(), equalTo(request.getIdleShardRetryDelay()));

x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/PutAutoFollowPatternRequestTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ protected PutAutoFollowPatternAction.Request createTestInstance() {
4141
request.setIdleShardRetryDelay(TimeValue.timeValueMillis(500));
4242
}
4343
if (randomBoolean()) {
44-
request.setRetryTimeout(TimeValue.timeValueMillis(500));
44+
request.setMaxRetryDelay(TimeValue.timeValueMillis(500));
4545
}
4646
if (randomBoolean()) {
4747
request.setMaxBatchOperationCount(randomIntBetween(0, Integer.MAX_VALUE));

0 commit comments

Comments
 (0)