Skip to content

Replicate write actions before fsyncing them #49746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Dec 3, 2019
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Locale;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class ReplicationOperation<
Request extends ReplicationRequest<Request>,
Expand Down Expand Up @@ -110,8 +111,6 @@ public void execute() throws Exception {

private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
Expand All @@ -134,8 +133,21 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
}
successfulShards.incrementAndGet(); // mark primary as successful
decPendingAndFinishIfNeeded();
primaryResult.runPostReplicationActions(new ActionListener<>() {

@Override
public void onResponse(Void aVoid) {
updateCheckPoints(primary.routingEntry(), primary::localCheckpoint, primary::globalCheckpoint);
}

@Override
public void onFailure(Exception e) {
logger.trace("[{}] op [{}] post replication actions failed for [{}]", primary.routingEntry().shardId(), opType, request);
// TODO: fail shard? This will otherwise have the local / global checkpoint info lagging, or possibly have replicas
// go out of sync with the primary
finishAsFailed(e);
}
});
}

private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) {
Expand Down Expand Up @@ -174,18 +186,7 @@ private void performOnReplica(final ShardRouting shard, final ReplicaRequest rep
new ActionListener<>() {
@Override
public void onResponse(ReplicaResponse response) {
successfulShards.incrementAndGet();
try {
primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
} catch (final AlreadyClosedException e) {
// the index was deleted or this shard was never activated after a relocation; fall through and finish normally
} catch (final Exception e) {
// fail the primary but fall through and let the rest of operation processing complete
final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
primary.failShard(message, e);
}
decPendingAndFinishIfNeeded();
updateCheckPoints(shard, response::localCheckpoint, response::globalCheckpoint);
}

@Override
Expand All @@ -211,6 +212,22 @@ public String toString() {
});
}

/**
 * Records a successful shard copy and reports that copy's checkpoints to the primary.
 *
 * The success counter is bumped first, then the local and global checkpoints read from the
 * two suppliers are handed to the primary under the shard's allocation id. Whatever happens
 * while updating the checkpoints, {@code decPendingAndFinishIfNeeded()} is always invoked
 * afterwards.
 *
 * @param shard                    routing entry of the shard copy whose checkpoints are reported
 * @param localCheckpointSupplier  supplies the local checkpoint of that copy
 * @param globalCheckpointSupplier supplies the global checkpoint of that copy
 */
private void updateCheckPoints(ShardRouting shard, LongSupplier localCheckpointSupplier, LongSupplier globalCheckpointSupplier) {
    successfulShards.incrementAndGet();
    try {
        // resolved inside the try so that a failure here is handled like any other update failure
        final String allocationId = shard.allocationId().getId();
        primary.updateLocalCheckpointForShard(allocationId, localCheckpointSupplier.getAsLong());
        primary.updateGlobalCheckpointForShard(allocationId, globalCheckpointSupplier.getAsLong());
    } catch (final AlreadyClosedException ignored) {
        // the index was deleted or this shard was never activated after a relocation; nothing to record, finish normally
    } catch (final Exception e) {
        // fail the primary, but still let the remainder of the operation processing run to completion
        primary.failShard(String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard), e);
    } finally {
        decPendingAndFinishIfNeeded();
    }
}

private void onNoLongerPrimary(Exception failure) {
final Throwable cause = ExceptionsHelper.unwrapCause(failure);
final boolean nodeIsClosing = cause instanceof NodeClosedException;
Expand Down Expand Up @@ -464,5 +481,11 @@ public interface PrimaryResult<RequestT extends ReplicationRequest<RequestT>> {
@Nullable RequestT replicaRequest();

void setShardInfo(ReplicationResponse.ShardInfo shardInfo);

/**
* Run actions to be triggered post replication
* @param listener callback that is invoked after post replication actions have completed
* */
void runPostReplicationActions(ActionListener<Void> listener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,6 @@
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportResponse.Empty;
import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -352,27 +350,11 @@ public void handleException(TransportException exp) {
onCompletionListener.onResponse(response);
}, e -> handleException(primaryShardReference, e));

final ActionListener<Response> globalCheckpointSyncingListener = ActionListener.wrap(response -> {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
}
}
}
referenceClosingListener.onResponse(response);
}, referenceClosingListener::onFailure);
final ActionListener<Response> responseListener = getResponseActionListener(referenceClosingListener,
primaryShardReference.indexShard);

new ReplicationOperation<>(primaryRequest.getRequest(), primaryShardReference,
ActionListener.wrap(result -> result.respond(globalCheckpointSyncingListener), referenceClosingListener::onFailure),
ActionListener.map(responseListener, result -> result.finalResponseIfSuccessful),
newReplicasProxy(), logger, actionName, primaryRequest.getPrimaryTerm()).execute();
}
} catch (Exception e) {
Expand All @@ -393,10 +375,31 @@ public void onFailure(Exception e) {

}

/**
 * Builds the listener that receives the final response of a replication operation.
 *
 * On a response, and only when {@code syncGlobalCheckpointAfterOperation} is set, the listener
 * first asks the shard to maybe sync its global checkpoint. A failure of that sync never fails
 * the operation: it is logged at info level unless it was caused by the shard or index being
 * closed, in which case it is dropped silently. The response is then forwarded to
 * {@code referenceClosingListener}. Failures are passed straight through to
 * {@code referenceClosingListener}.
 *
 * @param referenceClosingListener listener that receives the response or failure afterwards
 * @param shard                    shard on which the post-operation global checkpoint sync is attempted
 * @return the wrapping listener
 */
protected ActionListener<Response> getResponseActionListener(ActionListener<Response> referenceClosingListener, IndexShard shard) {
    return ActionListener.wrap(response -> {
        if (syncGlobalCheckpointAfterOperation) {
            try {
                shard.maybeSyncGlobalCheckpoint("post-operation");
            } catch (final Exception e) {
                final boolean causedByClosedShard =
                    ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) != null;
                // closed-shard failures are expected and stay silent; anything else is logged but
                // intentionally swallowed, a missed global checkpoint sync should not fail this operation
                if (causedByClosedShard == false) {
                    logger.info(
                        new ParameterizedMessage("{} failed to execute post-operation global checkpoint sync", shard.shardId()),
                        e);
                }
            }
        }
        referenceClosingListener.onResponse(response);
    }, referenceClosingListener::onFailure);
}

public static class PrimaryResult<ReplicaRequest extends ReplicationRequest<ReplicaRequest>,
Response extends ReplicationResponse>
implements ReplicationOperation.PrimaryResult<ReplicaRequest> {
final ReplicaRequest replicaRequest;
protected final ReplicaRequest replicaRequest;
public final Response finalResponseIfSuccessful;
public final Exception finalFailure;

Expand Down Expand Up @@ -429,11 +432,12 @@ public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
}
}

public void respond(ActionListener<Response> listener) {
if (finalResponseIfSuccessful != null) {
listener.onResponse(finalResponseIfSuccessful);
} else {
@Override
public void runPostReplicationActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
listener.onResponse(null);
}
}
}
Expand All @@ -449,11 +453,11 @@ public ReplicaResult() {
this(null);
}

public void respond(ActionListener<TransportResponse.Empty> listener) {
if (finalFailure == null) {
listener.onResponse(TransportResponse.Empty.INSTANCE);
} else {
public void runPostReplicaActions(ActionListener<Void> listener) {
if (finalFailure != null) {
listener.onFailure(finalFailure);
} else {
listener.onResponse(null);
}
}
}
Expand Down Expand Up @@ -503,10 +507,23 @@ public void onResponse(Releasable releasable) {
try {
assert replica.getActiveOperationsCount() != 0 : "must perform shard operation under a permit";
final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
replicaResult.respond(new ResponseListener(response));
replicaResult.runPostReplicaActions(
ActionListener.wrap(r -> {
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
releasable.close(); // release shard operation lock before responding to caller
if (logger.isTraceEnabled()) {
logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction,
replicaRequest.getRequest().shardId(),
replicaRequest.getRequest());
}
setPhase(task, "finished");
onCompletionListener.onResponse(response);
}, e -> {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
this.responseWithFailure(e);
})
);
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
AsyncReplicaAction.this.onFailure(e);
Expand Down Expand Up @@ -564,33 +581,6 @@ protected void doRun() throws Exception {
acquireReplicaOperationPermit(replica, replicaRequest.getRequest(), this, replicaRequest.getPrimaryTerm(),
replicaRequest.getGlobalCheckpoint(), replicaRequest.getMaxSeqNoOfUpdatesOrDeletes());
}

/**
 * Listener invoked once the replica-side work has finished. On success it hands the
 * pre-built {@link ReplicaResponse} to the completion listener; on failure it delegates
 * to {@code responseWithFailure}.
 */
private class ResponseListener implements ActionListener<TransportResponse.Empty> {
    /** Response handed to the completion listener once the replica operation succeeds. */
    private final ReplicaResponse pendingResponse;

    ResponseListener(ReplicaResponse pendingResponse) {
        this.pendingResponse = pendingResponse;
    }

    @Override
    public void onResponse(Empty ignored) {
        // the empty transport response carries no data; only the fact of completion matters
        if (logger.isTraceEnabled()) {
            logger.trace(
                "action [{}] completed on shard [{}] for request [{}]",
                transportReplicaAction,
                replicaRequest.getRequest().shardId(),
                replicaRequest.getRequest());
        }
        setPhase(task, "finished");
        onCompletionListener.onResponse(pendingResponse);
    }

    @Override
    public void onFailure(Exception failure) {
        responseWithFailure(failure);
    }
}
}

private IndexShard getIndexShard(final ShardId shardId) {
Expand Down
Loading