Skip to content

Advance checkpoints only after persisting ops #43205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 48 commits into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
3b52416
Only advance local checkpoint when fsynced ops
ywelsch Jun 11, 2019
24666d4
Use persisted global checkpoint
ywelsch Jun 11, 2019
b500abf
fix tests
ywelsch Jun 11, 2019
8595991
fix more tests
ywelsch Jun 11, 2019
0b42111
fix more tests
ywelsch Jun 11, 2019
1d859a7
distinguish between persisted and non-persisted checkpoint
ywelsch Jun 12, 2019
502c5c9
rename and fixes
ywelsch Jun 12, 2019
ef396b8
Disable async durability for index close tests
ywelsch Jun 12, 2019
0c48432
disable async fsync
ywelsch Jun 12, 2019
f81a5c4
Distinguish between persisted and computed global checkpoint in Repli…
ywelsch Jun 12, 2019
e061760
more minor fixes
ywelsch Jun 12, 2019
898df9d
fix more tests
ywelsch Jun 12, 2019
85e8bfd
2 phase closing
ywelsch Jun 12, 2019
5958bb8
reenable tests
ywelsch Jun 12, 2019
78b9120
checkstyle
ywelsch Jun 12, 2019
6a4e568
add BWC for verifiy before close
ywelsch Jun 12, 2019
4848693
checkstyle
ywelsch Jun 12, 2019
07e8dec
fix test
ywelsch Jun 12, 2019
eb4181e
use async durability for extensive testing
ywelsch Jun 13, 2019
56fe3d8
Use request level durability for corruption tests
ywelsch Jun 13, 2019
1050e5d
when one flush is not enough
ywelsch Jun 13, 2019
6532b15
simplify ReplicationTracker
ywelsch Jun 13, 2019
3b669b4
checkstyle
ywelsch Jun 13, 2019
7757231
rename and test fix
ywelsch Jun 13, 2019
7dff7f0
checkstyle and more
ywelsch Jun 13, 2019
abd063d
randomize durability again
ywelsch Jun 13, 2019
9edaf79
Add tests for TranslogWriter
ywelsch Jun 13, 2019
2e60635
Add test that shows gcp is safe
ywelsch Jun 13, 2019
c556bfe
Merge remote-tracking branch 'elastic/master' into fsync-local-checkp…
ywelsch Jun 13, 2019
54e492a
Nhat's feedback
ywelsch Jun 14, 2019
7d274bc
fix test
ywelsch Jun 15, 2019
798a05e
Use translog.sync to advance persisted checkpoint
ywelsch Jun 15, 2019
6c93e2d
Tanguy's feedback
ywelsch Jun 15, 2019
34424cd
Merge remote-tracking branch 'elastic/master' into fsync-local-checkp…
ywelsch Jun 17, 2019
94cd109
fix testTranslogReplayWithFailure
ywelsch Jun 17, 2019
df82d5c
can't bump on a conditional sync
ywelsch Jun 17, 2019
6556795
Revert "can't bump on a conditional sync"
ywelsch Jun 17, 2019
66bbe7d
Revert "Use translog.sync to advance persisted checkpoint"
ywelsch Jun 17, 2019
6d96874
Revert "Nhat's feedback"
ywelsch Jun 17, 2019
fb233d3
separate processed from persisted
ywelsch Jun 17, 2019
6bcca56
Merge remote-tracking branch 'elastic/master' into fsync-local-checkp…
ywelsch Jun 17, 2019
5818b26
Nhat's feedback
ywelsch Jun 18, 2019
3bb4963
Merge remote-tracking branch 'elastic/master' into fsync-local-checkp…
ywelsch Jun 18, 2019
07303bb
Henning's comments
ywelsch Jun 18, 2019
f3b0166
Tanguy's comments
ywelsch Jun 18, 2019
9d2c452
Increase initial capacity
ywelsch Jun 18, 2019
810c986
checkstyle
ywelsch Jun 19, 2019
dc01e04
Merge remote-tracking branch 'elastic/master' into fsync-local-checkp…
ywelsch Jun 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
Expand Down Expand Up @@ -94,12 +95,12 @@ protected void shardOperationOnPrimary(final ShardRequest shardRequest, final In
}

@Override
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) {
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws IOException {
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
}

private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) throws IOException {
final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) {
throw new IllegalStateException("Index shard " + shardId + " is not blocking all operations during closing");
Expand All @@ -109,9 +110,19 @@ private void executeShardOperation(final ShardRequest request, final IndexShard
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
}
indexShard.verifyShardBeforeIndexClosing();
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
logger.trace("{} shard is ready for closing", shardId);
if (request.isPhase1()) {
// in order to advance the global checkpoint to the maximum sequence number, the (persisted) local checkpoint needs to be
// advanced first, which, when using async translog syncing, does not automatically hold at the time where we have acquired
// all operation permits. Instead, this requires and explicit sync, which communicates the updated (persisted) local checkpoint
// to the primary (we call this phase1), and phase2 can then use the fact that the global checkpoint has moved to the maximum
// sequence number to pass the verifyShardBeforeIndexClosing check and create a safe commit where the maximum sequence number
// is equal to the global checkpoint.
indexShard.sync();
} else {
indexShard.verifyShardBeforeIndexClosing();
indexShard.flush(new FlushRequest().force(true).waitIfOngoing(true));
logger.trace("{} shard is ready for closing", shardId);
}
}

@Override
Expand All @@ -136,14 +147,22 @@ public static class ShardRequest extends ReplicationRequest<ShardRequest> {

private final ClusterBlock clusterBlock;

private final boolean phase1;

ShardRequest(StreamInput in) throws IOException {
super(in);
clusterBlock = new ClusterBlock(in);
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
phase1 = in.readBoolean();
} else {
phase1 = false;
}
}

public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final boolean phase1, final TaskId parentTaskId) {
super(shardId);
this.clusterBlock = Objects.requireNonNull(clusterBlock);
this.phase1 = phase1;
setParentTask(parentTaskId);
}

Expand All @@ -161,10 +180,17 @@ public void readFrom(final StreamInput in) {
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
clusterBlock.writeTo(out);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
out.writeBoolean(phase1);
}
}

public ClusterBlock clusterBlock() {
return clusterBlock;
}

public boolean isPhase1() {
return phase1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public void execute() throws Exception {
private void handlePrimaryResult(final PrimaryResultT primaryResult) {
this.primaryResult = primaryResult;
primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint());
primary.updateGlobalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.globalCheckpoint());
final ReplicaRequest replicaRequest = primaryResult.replicaRequest();
if (replicaRequest != null) {
if (logger.isTraceEnabled()) {
Expand All @@ -123,7 +124,7 @@ private void handlePrimaryResult(final PrimaryResultT primaryResult) {
// is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
final long globalCheckpoint = primary.globalCheckpoint();
final long globalCheckpoint = primary.computedGlobalCheckpoint();
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
// max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed
// on.
Expand Down Expand Up @@ -341,16 +342,23 @@ public interface Primary<
void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint);

/**
* Returns the local checkpoint on the primary shard.
* Returns the persisted local checkpoint on the primary shard.
*
* @return the local checkpoint
*/
long localCheckpoint();

/**
* Returns the global checkpoint on the primary shard.
* Returns the global checkpoint computed on the primary shard.
*
* @return the global checkpoint
* @return the computed global checkpoint
*/
long computedGlobalCheckpoint();

/**
* Returns the persisted global checkpoint on the primary shard.
*
* @return the persisted global checkpoint
*/
long globalCheckpoint();

Expand Down Expand Up @@ -419,16 +427,16 @@ void performOn(ShardRouting replica, RequestT replicaRequest,
public interface ReplicaResponse {

/**
* The local checkpoint for the shard.
* The persisted local checkpoint for the shard.
*
* @return the local checkpoint
* @return the persisted local checkpoint
**/
long localCheckpoint();

/**
* The global checkpoint for the shard.
* The persisted global checkpoint for the shard.
*
* @return the global checkpoint
* @return the persisted global checkpoint
**/
long globalCheckpoint();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ public void onResponse(Releasable releasable) {
final ReplicaResult replicaResult = shardOperationOnReplica(replicaRequest.getRequest(), replica);
releasable.close(); // release shard operation lock before responding to caller
final TransportReplicationAction.ReplicaResponse response =
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getGlobalCheckpoint());
new ReplicaResponse(replica.getLocalCheckpoint(), replica.getLastSyncedGlobalCheckpoint());
replicaResult.respond(new ResponseListener(response));
} catch (final Exception e) {
Releasables.closeWhileHandlingException(releasable); // release shard operation lock before responding to caller
Expand Down Expand Up @@ -892,10 +892,6 @@ public void close() {
operationLock.close();
}

public long getLocalCheckpoint() {
return indexShard.getLocalCheckpoint();
}

public ShardRouting routingEntry() {
return indexShard.routingEntry();
}
Expand Down Expand Up @@ -943,7 +939,12 @@ public long localCheckpoint() {

@Override
public long globalCheckpoint() {
return indexShard.getGlobalCheckpoint();
return indexShard.getLastSyncedGlobalCheckpoint();
}

@Override
public long computedGlobalCheckpoint() {
return indexShard.getLastKnownGlobalCheckpoint();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -389,11 +389,26 @@ private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shar
}
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, parentTaskId);
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, true, parentTaskId);
if (request.ackTimeout() != null) {
shardRequest.timeout(request.ackTimeout());
}
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);
transportVerifyShardBeforeCloseAction.execute(shardRequest, new ActionListener<>() {
@Override
public void onResponse(ReplicationResponse replicationResponse) {
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, false, parentTaskId);
if (request.ackTimeout() != null) {
shardRequest.timeout(request.ackTimeout());
}
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);
}

@Override
public void onFailure(Exception e) {
listener.onFailure(e);
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,9 +793,9 @@ public final CommitStats commitStats() {
}

/**
* @return the local checkpoint for this Engine
* @return the persisted local checkpoint for this Engine
*/
public abstract long getLocalCheckpoint();
public abstract long getPersistedLocalCheckpoint();

/**
* @return a {@link SeqNoStats} object, using local state and the supplied global checkpoint
Expand Down
Loading