@@ -155,11 +155,12 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
155
155
@ Override
156
156
public void onResponse (Void aVoid ) {
157
157
successfulShards .incrementAndGet ();
158
- try {
159
- updateCheckPoints (primary .routingEntry (), primary ::localCheckpoint , primary ::globalCheckpoint );
160
- } finally {
161
- decPendingAndFinishIfNeeded ();
162
- }
158
+ updateCheckPoints (
159
+ primary .routingEntry (),
160
+ primary ::localCheckpoint ,
161
+ primary ::globalCheckpoint ,
162
+ () -> decPendingAndFinishIfNeeded ()
163
+ );
163
164
}
164
165
165
166
@ Override
@@ -221,11 +222,7 @@ private void performOnReplica(
221
222
@ Override
222
223
public void onResponse (ReplicaResponse response ) {
223
224
successfulShards .incrementAndGet ();
224
- try {
225
- updateCheckPoints (shard , response ::localCheckpoint , response ::globalCheckpoint );
226
- } finally {
227
- decPendingAndFinishIfNeeded ();
228
- }
225
+ updateCheckPoints (shard , response ::localCheckpoint , response ::globalCheckpoint , () -> decPendingAndFinishIfNeeded ());
229
226
}
230
227
231
228
@ Override
@@ -302,16 +299,46 @@ public boolean shouldRetry(Exception e) {
302
299
replicationAction .run ();
303
300
}
304
301
305
- private void updateCheckPoints (ShardRouting shard , LongSupplier localCheckpointSupplier , LongSupplier globalCheckpointSupplier ) {
302
+ private void updateCheckPoints (
303
+ ShardRouting shard ,
304
+ LongSupplier localCheckpointSupplier ,
305
+ LongSupplier globalCheckpointSupplier ,
306
+ Runnable onCompletion
307
+ ) {
308
+ boolean forked = false ;
306
309
try {
307
310
primary .updateLocalCheckpointForShard (shard .allocationId ().getId (), localCheckpointSupplier .getAsLong ());
308
311
primary .updateGlobalCheckpointForShard (shard .allocationId ().getId (), globalCheckpointSupplier .getAsLong ());
309
312
} catch (final AlreadyClosedException e ) {
310
313
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
311
314
} catch (final Exception e ) {
312
- // fail the primary but fall through and let the rest of operation processing complete
313
- final String message = String .format (Locale .ROOT , "primary failed updating local checkpoint for replica %s" , shard );
314
- primary .failShard (message , e );
315
+ threadPool .executor (ThreadPool .Names .WRITE ).execute (new AbstractRunnable () {
316
+ @ Override
317
+ public void onFailure (Exception e ) {
318
+ assert false : e ;
319
+ }
320
+
321
+ @ Override
322
+ public boolean isForceExecution () {
323
+ return true ;
324
+ }
325
+
326
+ @ Override
327
+ protected void doRun () {
328
+ // fail the primary but fall through and let the rest of operation processing complete
329
+ primary .failShard (String .format (Locale .ROOT , "primary failed updating local checkpoint for replica %s" , shard ), e );
330
+ }
331
+
332
+ @ Override
333
+ public void onAfter () {
334
+ onCompletion .run ();
335
+ }
336
+ });
337
+ forked = true ;
338
+ } finally {
339
+ if (forked == false ) {
340
+ onCompletion .run ();
341
+ }
315
342
}
316
343
}
317
344
0 commit comments