Skip to content

Commit fbb92f5

Browse files
committed
Replicate write actions before fsyncing them (#49746)
This commit fixes a number of issues with data replication: - Local and global checkpoints are not updated after the new operations have been fsynced, but might capture a state before the fsync. The reason why this probably went undetected for so long is that AsyncIOProcessor is synchronous if you index one item at a time, and hence working as intended unless you have a high enough level of concurrent indexing. As we rely in other places on the assumption that we have an up-to-date local checkpoint in case of synchronous translog durability, there's a risk for the local and global checkpoints not to be up-to-date after replication completes, and that this won't be corrected by the periodic global checkpoint sync. - AsyncIOProcessor also has another "bad" side effect here: if you index one bulk at a time, the bulk is always first fsynced on the primary before being sent to the replica. Further, if one thread is tasked by AsyncIOProcessor to drain the processing queue and fsync, other threads can easily pile more bulk requests on top of that thread. Things are not very fair here, and the thread might continue doing a lot more fsyncs before returning (as the other threads pile more and more on top), which blocks it from returning as a replication request (e.g. if this thread is on the primary, it blocks the replication requests to the replicas from going out, and delaying checkpoint advancement). This commit fixes all these issues, and also simplifies the code that coordinates all the after write actions.
1 parent 6e751f5 commit fbb92f5

File tree

13 files changed

+240
-238
lines changed

13 files changed

+240
-238
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

+43-13
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.Locale;
4747
import java.util.concurrent.atomic.AtomicBoolean;
4848
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.function.LongSupplier;
4950

5051
public class ReplicationOperation<
5152
Request extends ReplicationRequest<Request>,
@@ -110,8 +111,6 @@ public void execute() throws Exception {
110111

111112
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
112113
this.primaryResult = primaryResult;
113-
primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
114-
primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
115114
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
116115
if (replicaRequest != null) {
117116
if (logger.isTraceEnabled()) {
@@ -134,8 +133,26 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
134133
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
135134
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
136135
}
137-
successfulShards.incrementAndGet(); // mark primary as successful
138-
decPendingAndFinishIfNeeded();
136+
primaryResult.runPostReplicationActions(new ActionListener<Void>() {
137+
138+
@Override
139+
public void onResponse(Void aVoid) {
140+
successfulShards.incrementAndGet();
141+
try {
142+
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
143+
} finally {
144+
decPendingAndFinishIfNeeded();
145+
}
146+
}
147+
148+
@Override
149+
public void onFailure(Exception e) {
150+
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
151+
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
152+
// go out of sync with the primary
153+
finishAsFailed(e);
154+
}
155+
});
139156
}
140157

141158
private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
@@ -176,16 +193,10 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep
176193
public void onResponse(ReplicaResponse response) {
177194
successfulShards.incrementAndGet();
178195
try {
179-
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
180-
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
181-
} catch (final AlreadyClosedException e) {
182-
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
183-
} catch (final Exception e) {
184-
// fail the primary but fall through and let the rest of operation processing complete
185-
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
186-
primary.failShard(message, e);
196+
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
197+
} finally {
198+
decPendingAndFinishIfNeeded();
187199
}
188-
decPendingAndFinishIfNeeded();
189200
}
190201

191202
@Override
@@ -211,6 +222,19 @@ public String toString() {
211222
});
212223
}
213224

225+
private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
226+
try {
227+
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
228+
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
229+
} catch (final AlreadyClosedException e) {
230+
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
231+
} catch (final Exception e) {
232+
// fail the primary but fall through and let the rest of operation processing complete
233+
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
234+
primary.failShard(message, e);
235+
}
236+
}
237+
214238
private void onNoLongerPrimary(Exception failure) {
215239
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
216240
final boolean nodeIsClosing = cause instanceof NodeClosedException;
@@ -464,5 +488,11 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
464488
@Nullable RequestT replicaRequest();
465489

466490
void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
491+
492+
/**
493+
* Run actions to be triggered post replication
494+
* @param listener calllback that is invoked after post replication actions have completed
495+
* */
496+
void runPostReplicationActions(ActionListener<Void> listener);
467497
}
468498
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

+47-54
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@
7474
import org.elasticsearch.transport.TransportException;
7575
import org.elasticsearch.transport.TransportRequest;
7676
import org.elasticsearch.transport.TransportRequestOptions;
77-
import org.elasticsearch.transport.TransportResponse;
78-
import org.elasticsearch.transport.TransportResponse.Empty;
7977
import org.elasticsearch.transport.TransportResponseHandler;
8078
import org.elasticsearch.transport.TransportService;
8179

@@ -347,33 +345,32 @@ public void handleException(TransportException exp) {
347345
} else {
348346
setPhase(replicationTask, "primary");
349347

350-
final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
351-
primaryShardReference.close(); // release shard operation lock before responding to caller
352-
setPhase(replicationTask, "finished");
353-
onCompletionListener.onResponse(response);
354-
}, e -> handleException(primaryShardReference, e));
348+
final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
349+
adaptResponse(response, primaryShardReference.indexShard);
355350

356-
final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
357351
if (syncGlobalCheckpointAfterOperation) {
358-
final IndexShard shard = primaryShardReference.indexShard;
359352
try {
360-
shard.maybeSyncGlobalCheckpoint("post-operation");
353+
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
361354
} catch (final Exception e) {
362355
// only log non-closed exceptions
363356
if (ExceptionsHelper.unwrap(
364357
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
365358
// intentionally swallow, a missed global checkpoint sync should not fail this operation
366359
logger.info(
367360
new ParameterizedMessage(
368-
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
361+
"{} failed to execute post-operation global checkpoint sync",
362+
primaryShardReference.indexShard.shardId()), e);
369363
}
370364
}
371365
}
372-
referenceClosingListener.onResponse(response);
373-
}, referenceClosingListener::onFailure);
366+
367+
primaryShardReference.close(); // release shard operation lock before responding to caller
368+
setPhase(replicationTask, "finished");
369+
onCompletionListener.onResponse(response);
370+
}, e -> handleException(primaryShardReference, e));
374371

375372
new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
376-
ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
373+
ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
377374
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
378375
}
379376
} catch (Exception e) {
@@ -394,10 +391,19 @@ public void onFailure(Exception e) {
394391

395392
}
396393

394+
// allows subclasses to adapt the response
395+
protected void adaptResponse(Response response, IndexShard indexShard) {
396+
397+
}
398+
399+
protected ActionListener<Response> wrapResponseActionListener(ActionListener<Response> listener, IndexShard shard) {
400+
return listener;
401+
}
402+
397403
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
398404
Response extends ReplicationResponse>
399405
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
400-
final ReplicaRequest replicaRequest;
406+
protected final ReplicaRequest replicaRequest;
401407
public final Response finalResponseIfSuccessful;
402408
public final Exception finalFailure;
403409

@@ -430,11 +436,12 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
430436
}
431437
}
432438

433-
public void respond(ActionListener<Response> listener) {
434-
if (finalResponseIfSuccessful != null) {
435-
listener.onResponse(finalResponseIfSuccessful);
436-
} else {
439+
@Override
440+
public void runPostReplicationActions(ActionListener<Void> listener) {
441+
if (finalFailure != null) {
437442
listener.onFailure(finalFailure);
443+
} else {
444+
listener.onResponse(null);
438445
}
439446
}
440447
}
@@ -450,11 +457,11 @@ public ReplicaResult() {
450457
this(null);
451458
}
452459

453-
public void respond(ActionListener<TransportResponse.Empty> listener) {
454-
if (finalFailure == null) {
455-
listener.onResponse(TransportResponse.Empty.INSTANCE);
456-
} else {
460+
public void runPostReplicaActions(ActionListener<Void> listener) {
461+
if (finalFailure != null) {
457462
listener.onFailure(finalFailure);
463+
} else {
464+
listener.onResponse(null);
458465
}
459466
}
460467
}
@@ -504,10 +511,23 @@ public void onResponse(Releasable releasable) {
504511
try {
505512
assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
506513
final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
507-
releasable.close(); // release shard operation lock before responding to caller
508-
final TransportReplicationAction.ReplicaResponse response =
509-
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
510-
replicaResult.respond(new ResponseListener(response));
514+
replicaResult.runPostReplicaActions(
515+
ActionListener.wrap(r -> {
516+
final TransportReplicationAction.ReplicaResponse response =
517+
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
518+
releasable.close(); // release shard operation lock before responding to caller
519+
if (logger.isTraceEnabled()) {
520+
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
521+
replicaRequest.getRequest().shardId(),
522+
replicaRequest.getRequest());
523+
}
524+
setPhase(task, "finished");
525+
onCompletionListener.onResponse(response);
526+
}, e -> {
527+
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
528+
this.responseWithFailure(e);
529+
})
530+
);
511531
} catch (final Exception e) {
512532
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
513533
AsyncReplicaAction.this.onFailure(e);
@@ -565,33 +585,6 @@ protected void doRun() throws Exception {
565585
acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
566586
replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
567587
}
568-
569-
/**
570-
* Listens for the response on the replica and sends the response back to the primary.
571-
*/
572-
private class ResponseListener implements ActionListener<TransportResponse.Empty> {
573-
private final ReplicaResponse replicaResponse;
574-
575-
ResponseListener(ReplicaResponse replicaResponse) {
576-
this.replicaResponse = replicaResponse;
577-
}
578-
579-
@Override
580-
public void onResponse(Empty response) {
581-
if (logger.isTraceEnabled()) {
582-
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
583-
replicaRequest.getRequest().shardId(),
584-
replicaRequest.getRequest());
585-
}
586-
setPhase(task, "finished");
587-
onCompletionListener.onResponse(replicaResponse);
588-
}
589-
590-
@Override
591-
public void onFailure(Exception e) {
592-
responseWithFailure(e);
593-
}
594-
}
595588
}
596589

597590
private IndexShard getIndexShard(final ShardId shardId) {

0 commit comments

Comments
 (0)