
Commit ba9f6bb

Fork to WRITE thread before failing shard in updateCheckPoints (elastic#87458) (elastic#87496)
Failing a shard may block on IO, so it must not happen on a transport worker thread. With this commit we use a `WRITE` thread to handle shard failures caused by exceptions thrown within `updateCheckPoints`. Closes elastic#87094
1 parent 094cce8 commit ba9f6bb
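
The fix keeps the cheap checkpoint bookkeeping on the transport worker and only hands the potentially blocking failShard call to the `WRITE` pool, forcing execution so a full queue cannot reject it, and completing the operation from onAfter. Below is a minimal sketch of that forking pattern, assuming an org.elasticsearch.threadpool.ThreadPool is available; the class name, the Runnable parameters, and the method name are illustrative stand-ins, not the code this commit adds (see the ReplicationOperation.java diff further down for the real change).

// Minimal sketch of the forking pattern this commit applies, not the actual
// ReplicationOperation code: the class name, Runnable parameters and method
// name are illustrative stand-ins for the real call sites shown in the diff.
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.threadpool.ThreadPool;

class ForkFailureToWriteThreadSketch {

    private final ThreadPool threadPool;

    ForkFailureToWriteThreadSketch(ThreadPool threadPool) {
        this.threadPool = threadPool;
    }

    // Runs on a transport worker thread, so it must never block. The checkpoint
    // update itself is cheap; only the failure path may touch disk and is forked.
    void updateCheckpointsThenComplete(Runnable updateCheckpoints, Runnable failShard, Runnable onCompletion) {
        boolean forked = false;
        try {
            updateCheckpoints.run();
        } catch (AlreadyClosedException e) {
            // index deleted or shard never activated; fall through and finish normally
        } catch (Exception e) {
            // failing the shard may block on IO, so hand it to the WRITE pool
            threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
                @Override
                public void onFailure(Exception ex) {
                    assert false : ex; // failShard is not expected to throw
                }

                @Override
                public boolean isForceExecution() {
                    return true; // must not be rejected even if the WRITE queue is full
                }

                @Override
                protected void doRun() {
                    failShard.run(); // potentially blocking, now off the transport worker
                }

                @Override
                public void onAfter() {
                    onCompletion.run(); // always finish the replication operation
                }
            });
            forked = true;
        } finally {
            if (forked == false) {
                onCompletion.run(); // no fork happened: complete inline, as before
            }
        }
    }
}

Forcing execution matters here because a rejected task would silently drop both the shard failure and the completion callback, leaving the replication operation hanging; running onCompletion in onAfter keeps the pending-operations accounting correct whether or not doRun throws.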

File tree

3 files changed: 57 additions, 17 deletions


docs/changelog/87458.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 87458
+summary: Fork to WRITE thread before failing shard in `updateCheckPoints`
+area: Engine
+type: bug
+issues:
+ - 87094

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 41 additions & 14 deletions
@@ -155,11 +155,12 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
             @Override
             public void onResponse(Void aVoid) {
                 successfulShards.incrementAndGet();
-                try {
-                    updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
-                } finally {
-                    decPendingAndFinishIfNeeded();
-                }
+                updateCheckPoints(
+                    primary.routingEntry(),
+                    primary::localCheckpoint,
+                    primary::globalCheckpoint,
+                    () -> decPendingAndFinishIfNeeded()
+                );
             }

             @Override
@@ -221,11 +222,7 @@ private void performOnReplica(
             @Override
             public void onResponse(ReplicaResponse response) {
                 successfulShards.incrementAndGet();
-                try {
-                    updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
-                } finally {
-                    decPendingAndFinishIfNeeded();
-                }
+                updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint, () -> decPendingAndFinishIfNeeded());
             }

             @Override
@@ -302,16 +299,46 @@ public boolean shouldRetry(Exception e) {
             replicationAction.run();
         }

-    private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
+    private void updateCheckPoints(
+        ShardRouting shard,
+        LongSupplier localCheckpointSupplier,
+        LongSupplier globalCheckpointSupplier,
+        Runnable onCompletion
+    ) {
+        boolean forked = false;
         try {
             primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
             primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
         } catch (final AlreadyClosedException e) {
             // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
         } catch (final Exception e) {
-            // fail the primary but fall through and let the rest of operation processing complete
-            final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
-            primary.failShard(message, e);
+            threadPool.executor(ThreadPool.Names.WRITE).execute(new AbstractRunnable() {
+                @Override
+                public void onFailure(Exception e) {
+                    assert false : e;
+                }
+
+                @Override
+                public boolean isForceExecution() {
+                    return true;
+                }
+
+                @Override
+                protected void doRun() {
+                    // fail the primary but fall through and let the rest of operation processing complete
+                    primary.failShard(String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard), e);
+                }
+
+                @Override
+                public void onAfter() {
+                    onCompletion.run();
+                }
+            });
+            forked = true;
+        } finally {
+            if (forked == false) {
+                onCompletion.run();
+            }
         }
     }

server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java

Lines changed: 10 additions & 3 deletions
@@ -510,8 +510,9 @@ public void testPrimaryFailureHandlingReplicaResponse() throws Exception {
         final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();
         final ReplicationGroup initialReplicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards, 0);

+        final Thread testThread = Thread.currentThread();
         final boolean fatal = randomBoolean();
-        final AtomicBoolean primaryFailed = new AtomicBoolean();
+        final PlainActionFuture<Void> primaryFailedFuture = new PlainActionFuture<>();
         final ReplicationOperation.Primary<Request, Request, TestPrimary.Result> primary = new TestPrimary(
             primaryRouting,
             () -> initialReplicationGroup,
@@ -520,7 +521,10 @@ public void testPrimaryFailureHandlingReplicaResponse() throws Exception {

             @Override
             public void failShard(String message, Exception exception) {
-                primaryFailed.set(true);
+                assertNotSame(testThread, Thread.currentThread());
+                assertThat(Thread.currentThread().getName(), containsString('[' + ThreadPool.Names.WRITE + ']'));
+                assertTrue(fatal);
+                primaryFailedFuture.onResponse(null);
             }

             @Override
@@ -543,7 +547,10 @@ public void updateLocalCheckpointForShard(String allocationId, long checkpoint)
         TestReplicationOperation operation = new TestReplicationOperation(request, primary, listener, replicas, primaryTerm);
         operation.execute();

-        assertThat(primaryFailed.get(), equalTo(fatal));
+        if (fatal) {
+            primaryFailedFuture.get(10, TimeUnit.SECONDS);
+        }
+
         final ShardInfo shardInfo = listener.actionGet().getShardInfo();
         assertThat(shardInfo.getFailed(), equalTo(0));
         assertThat(shardInfo.getFailures(), arrayWithSize(0));