Skip to content

Commit 80d6e78

Browse files
ywelsch authored and SivagurunathanV committed
Replicate write actions before fsyncing them (elastic#49746)
This commit fixes a number of issues with data replication:
- Local and global checkpoints are not updated after the new operations have been fsynced, and so might capture a state from before the fsync. The reason this probably went undetected for so long is that AsyncIOProcessor is synchronous if you index one item at a time, and hence works as intended unless you have a high enough level of concurrent indexing. As we rely in other places on the assumption that we have an up-to-date local checkpoint in the case of synchronous translog durability, there is a risk that the local and global checkpoints are not up-to-date after replication completes, and that this won't be corrected by the periodic global checkpoint sync.
- AsyncIOProcessor also has another "bad" side effect here: if you index one bulk at a time, the bulk is always first fsynced on the primary before being sent to the replica. Further, if one thread is tasked by AsyncIOProcessor to drain the processing queue and fsync, other threads can easily pile more bulk requests on top of that thread. Things are not very fair here, and the thread might continue doing many more fsyncs before returning (as the other threads pile more and more on top), which blocks it from returning to its replication request (e.g. if this thread is on the primary, it blocks the replication requests to the replicas from going out, and delays checkpoint advancement).
This commit fixes all these issues, and also simplifies the code that coordinates all the post-write actions.
1 parent 7733366 commit 80d6e78

File tree

13 files changed

+241
-238
lines changed

13 files changed

+241
-238
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 43 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@
4646
import java.util.Locale;
4747
import java.util.concurrent.atomic.AtomicBoolean;
4848
import java.util.concurrent.atomic.AtomicInteger;
49+
import java.util.function.LongSupplier;
4950

5051
public class ReplicationOperation<
5152
Request extends ReplicationRequest<Request>,
@@ -110,8 +111,6 @@ public void execute() throws Exception {
110111

111112
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
112113
this.primaryResult = primaryResult;
113-
primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
114-
primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
115114
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
116115
if (replicaRequest != null) {
117116
if (logger.isTraceEnabled()) {
@@ -134,8 +133,26 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
134133
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
135134
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
136135
}
137-
successfulShards.incrementAndGet(); // mark primary as successful
138-
decPendingAndFinishIfNeeded();
136+
primaryResult.runPostReplicationActions(new ActionListener<>() {
137+
138+
@Override
139+
public void onResponse(Void aVoid) {
140+
successfulShards.incrementAndGet();
141+
try {
142+
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
143+
} finally {
144+
decPendingAndFinishIfNeeded();
145+
}
146+
}
147+
148+
@Override
149+
public void onFailure(Exception e) {
150+
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
151+
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
152+
// go out of sync with the primary
153+
finishAsFailed(e);
154+
}
155+
});
139156
}
140157

141158
private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
@@ -176,16 +193,10 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep
176193
public void onResponse(ReplicaResponse response) {
177194
successfulShards.incrementAndGet();
178195
try {
179-
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
180-
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
181-
} catch (final AlreadyClosedException e) {
182-
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
183-
} catch (final Exception e) {
184-
// fail the primary but fall through and let the rest of operation processing complete
185-
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
186-
primary.failShard(message, e);
196+
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
197+
} finally {
198+
decPendingAndFinishIfNeeded();
187199
}
188-
decPendingAndFinishIfNeeded();
189200
}
190201

191202
@Override
@@ -211,6 +222,19 @@ public String toString() {
211222
});
212223
}
213224

225+
private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
226+
try {
227+
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), localCheckpointSupplier.getAsLong());
228+
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), globalCheckpointSupplier.getAsLong());
229+
} catch (final AlreadyClosedException e) {
230+
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
231+
} catch (final Exception e) {
232+
// fail the primary but fall through and let the rest of operation processing complete
233+
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
234+
primary.failShard(message, e);
235+
}
236+
}
237+
214238
private void onNoLongerPrimary(Exception failure) {
215239
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
216240
final boolean nodeIsClosing = cause instanceof NodeClosedException;
@@ -464,5 +488,11 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
464488
@Nullable RequestT replicaRequest();
465489

466490
void setShardInfo(ReplicationResponse.ShardInfo shardInfo);
491+
492+
/**
493+
* Run actions to be triggered post replication
494+
* @param listener callback that is invoked after post replication actions have completed
495+
* */
496+
void runPostReplicationActions(ActionListener<Void> listener);
467497
}
468498
}

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 47 additions & 54 deletions
Original file line number | Diff line number | Diff line change
@@ -73,8 +73,6 @@
7373
import org.elasticsearch.transport.TransportException;
7474
import org.elasticsearch.transport.TransportRequest;
7575
import org.elasticsearch.transport.TransportRequestOptions;
76-
import org.elasticsearch.transport.TransportResponse;
77-
import org.elasticsearch.transport.TransportResponse.Empty;
7876
import org.elasticsearch.transport.TransportResponseHandler;
7977
import org.elasticsearch.transport.TransportService;
8078

@@ -346,33 +344,32 @@ public void handleException(TransportException exp) {
346344
} else {
347345
setPhase(replicationTask, "primary");
348346

349-
final ActionListener<Response> referenceClosingListener = ActionListener.wrap(response -> {
350-
primaryShardReference.close(); // release shard operation lock before responding to caller
351-
setPhase(replicationTask, "finished");
352-
onCompletionListener.onResponse(response);
353-
}, e -> handleException(primaryShardReference, e));
347+
final ActionListener<Response> responseListener = ActionListener.wrap(response -> {
348+
adaptResponse(response, primaryShardReference.indexShard);
354349

355-
final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
356350
if (syncGlobalCheckpointAfterOperation) {
357-
final IndexShard shard = primaryShardReference.indexShard;
358351
try {
359-
shard.maybeSyncGlobalCheckpoint("post-operation");
352+
primaryShardReference.indexShard.maybeSyncGlobalCheckpoint("post-operation");
360353
} catch (final Exception e) {
361354
// only log non-closed exceptions
362355
if (ExceptionsHelper.unwrap(
363356
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
364357
// intentionally swallow, a missed global checkpoint sync should not fail this operation
365358
logger.info(
366359
new ParameterizedMessage(
367-
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
360+
"{} failed to execute post-operation global checkpoint sync",
361+
primaryShardReference.indexShard.shardId()), e);
368362
}
369363
}
370364
}
371-
referenceClosingListener.onResponse(response);
372-
}, referenceClosingListener::onFailure);
365+
366+
primaryShardReference.close(); // release shard operation lock before responding to caller
367+
setPhase(replicationTask, "finished");
368+
onCompletionListener.onResponse(response);
369+
}, e -> handleException(primaryShardReference, e));
373370

374371
new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
375-
ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
372+
ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
376373
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
377374
}
378375
} catch (Exception e) {
@@ -393,10 +390,19 @@ public void onFailure(Exception e) {
393390

394391
}
395392

393+
// allows subclasses to adapt the response
394+
protected void adaptResponse(Response response, IndexShard indexShard) {
395+
396+
}
397+
398+
protected ActionListener<Response> wrapResponseActionListener(ActionListener<Response> listener, IndexShard shard) {
399+
return listener;
400+
}
401+
396402
public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
397403
Response extends ReplicationResponse>
398404
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
399-
final ReplicaRequest replicaRequest;
405+
protected final ReplicaRequest replicaRequest;
400406
public final Response finalResponseIfSuccessful;
401407
public final Exception finalFailure;
402408

@@ -429,11 +435,12 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
429435
}
430436
}
431437

432-
public void respond(ActionListener<Response> listener) {
433-
if (finalResponseIfSuccessful != null) {
434-
listener.onResponse(finalResponseIfSuccessful);
435-
} else {
438+
@Override
439+
public void runPostReplicationActions(ActionListener<Void> listener) {
440+
if (finalFailure != null) {
436441
listener.onFailure(finalFailure);
442+
} else {
443+
listener.onResponse(null);
437444
}
438445
}
439446
}
@@ -449,11 +456,11 @@ public ReplicaResult() {
449456
this(null);
450457
}
451458

452-
public void respond(ActionListener<TransportResponse.Empty> listener) {
453-
if (finalFailure == null) {
454-
listener.onResponse(TransportResponse.Empty.INSTANCE);
455-
} else {
459+
public void runPostReplicaActions(ActionListener<Void> listener) {
460+
if (finalFailure != null) {
456461
listener.onFailure(finalFailure);
462+
} else {
463+
listener.onResponse(null);
457464
}
458465
}
459466
}
@@ -503,10 +510,23 @@ public void onResponse(Releasable releasable) {
503510
try {
504511
assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
505512
final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
506-
releasable.close(); // release shard operation lock before responding to caller
507-
final TransportReplicationAction.ReplicaResponse response =
508-
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
509-
replicaResult.respond(new ResponseListener(response));
513+
replicaResult.runPostReplicaActions(
514+
ActionListener.wrap(r -> {
515+
final TransportReplicationAction.ReplicaResponse response =
516+
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
517+
releasable.close(); // release shard operation lock before responding to caller
518+
if (logger.isTraceEnabled()) {
519+
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
520+
replicaRequest.getRequest().shardId(),
521+
replicaRequest.getRequest());
522+
}
523+
setPhase(task, "finished");
524+
onCompletionListener.onResponse(response);
525+
}, e -> {
526+
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
527+
this.responseWithFailure(e);
528+
})
529+
);
510530
} catch (final Exception e) {
511531
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
512532
AsyncReplicaAction.this.onFailure(e);
@@ -564,33 +584,6 @@ protected void doRun() throws Exception {
564584
acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
565585
replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
566586
}
567-
568-
/**
569-
* Listens for the response on the replica and sends the response back to the primary.
570-
*/
571-
private class ResponseListener implements ActionListener<TransportResponse.Empty> {
572-
private final ReplicaResponse replicaResponse;
573-
574-
ResponseListener(ReplicaResponse replicaResponse) {
575-
this.replicaResponse = replicaResponse;
576-
}
577-
578-
@Override
579-
public void onResponse(Empty response) {
580-
if (logger.isTraceEnabled()) {
581-
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
582-
replicaRequest.getRequest().shardId(),
583-
replicaRequest.getRequest());
584-
}
585-
setPhase(task, "finished");
586-
onCompletionListener.onResponse(replicaResponse);
587-
}
588-
589-
@Override
590-
public void onFailure(Exception e) {
591-
responseWithFailure(e);
592-
}
593-
}
594587
}
595588

596589
private IndexShard getIndexShard(final ShardId shardId) {

0 commit comments

Comments
 (0)