Skip to content

Commit b5f129b

Browse files
dnhatn authored and kcm committed
Replicate max seq_no of updates to replicas (#33967)
We started tracking max_seq_no_of_updates on the primary in #33842. This commit replicates that value from a primary to its replicas in replication requests or in the translog phase of peer recovery. With this change, we guarantee that the value of max_seq_no_of_updates on a replica when any index/delete operation is performed is at least the value of max_seq_no_of_updates on the primary when that operation was executed. Relates #33656
1 parent 726674c commit b5f129b

File tree

24 files changed

+387
-126
lines changed

24 files changed

+387
-126
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.routing.ShardRouting;
3232
import org.elasticsearch.common.Nullable;
3333
import org.elasticsearch.common.io.stream.StreamInput;
34+
import org.elasticsearch.index.seqno.SequenceNumbers;
3435
import org.elasticsearch.index.shard.ReplicationGroup;
3536
import org.elasticsearch.index.shard.ShardId;
3637
import org.elasticsearch.rest.RestStatus;
@@ -114,9 +115,13 @@ public void execute() throws Exception {
114115
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
115116
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
116117
final long globalCheckpoint = primary.globalCheckpoint();
118+
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
119+
// max_seq_no_of_updates on the replica when this request is executed is at least the value on the primary when it was executed there.
120+
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
121+
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
117122
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
118123
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
119-
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
124+
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
120125
}
121126

122127
successfulShards.incrementAndGet(); // mark primary as successful
@@ -136,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
136141
}
137142

138143
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
139-
final ReplicationGroup replicationGroup) {
144+
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
140145
// for total stats, add number of unassigned shards and
141146
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
142147
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
@@ -145,19 +150,20 @@ private void performOnReplicas(final ReplicaRequest replicaRequest, final long g
145150

146151
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
147152
if (shard.isSameAllocation(primaryRouting) == false) {
148-
performOnReplica(shard, replicaRequest, globalCheckpoint);
153+
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
149154
}
150155
}
151156
}
152157

153-
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
158+
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
159+
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
154160
if (logger.isTraceEnabled()) {
155161
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
156162
}
157163

158164
totalShards.incrementAndGet();
159165
pendingActions.incrementAndGet();
160-
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
166+
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
161167
@Override
162168
public void onResponse(ReplicaResponse response) {
163169
successfulShards.incrementAndGet();
@@ -322,6 +328,12 @@ public interface Primary<
322328
*/
323329
long globalCheckpoint();
324330

331+
/**
332+
* Returns the maximum seq_no of updates (index operations that overwrite existing documents in Lucene) or deletes on the primary.
333+
* This value must be captured after the execution of a replication request on the primary is completed.
334+
*/
335+
long maxSeqNoOfUpdatesOrDeletes();
336+
325337
/**
326338
* Returns the current replication group on the primary shard
327339
*
@@ -338,12 +350,15 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
338350
/**
339351
* Performs the specified request on the specified replica.
340352
*
341-
* @param replica the shard this request should be executed on
342-
* @param replicaRequest the operation to perform
343-
* @param globalCheckpoint the global checkpoint on the primary
344-
* @param listener callback for handling the response or failure
353+
* @param replica the shard this request should be executed on
354+
* @param replicaRequest the operation to perform
355+
* @param globalCheckpoint the global checkpoint on the primary
356+
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
357+
* after this replication was executed on it.
358+
* @param listener callback for handling the response or failure
345359
*/
346-
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener<ReplicaResponse> listener);
360+
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
361+
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);
347362

348363
/**
349364
* Fail the specified shard if needed, removing it from the current set

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima
200200

201201
/**
202202
* Synchronously execute the specified replica operation. This is done under a permit from
203-
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
203+
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
204204
*
205205
* @param shardRequest the request to the replica shard
206206
* @param replica the replica shard to perform the operation on
@@ -489,6 +489,7 @@ public void messageReceived(
489489
replicaRequest.getTargetAllocationID(),
490490
replicaRequest.getPrimaryTerm(),
491491
replicaRequest.getGlobalCheckpoint(),
492+
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
492493
channel,
493494
(ReplicationTask) task).run();
494495
}
@@ -513,6 +514,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
513514
private final String targetAllocationID;
514515
private final long primaryTerm;
515516
private final long globalCheckpoint;
517+
private final long maxSeqNoOfUpdatesOrDeletes;
516518
private final TransportChannel channel;
517519
private final IndexShard replica;
518520
/**
@@ -528,6 +530,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
528530
String targetAllocationID,
529531
long primaryTerm,
530532
long globalCheckpoint,
533+
long maxSeqNoOfUpdatesOrDeletes,
531534
TransportChannel channel,
532535
ReplicationTask task) {
533536
this.request = request;
@@ -536,6 +539,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
536539
this.targetAllocationID = targetAllocationID;
537540
this.primaryTerm = primaryTerm;
538541
this.globalCheckpoint = globalCheckpoint;
542+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
539543
final ShardId shardId = request.shardId();
540544
assert shardId != null : "request shardId must be set";
541545
this.replica = getIndexShard(shardId);
@@ -575,7 +579,8 @@ public void onNewClusterState(ClusterState state) {
575579
new TransportChannelResponseHandler<>(logger, channel, extraMessage,
576580
() -> TransportResponse.Empty.INSTANCE);
577581
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
578-
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint),
582+
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
583+
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
579584
handler);
580585
}
581586

@@ -613,7 +618,7 @@ protected void doRun() throws Exception {
613618
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
614619
actualAllocationId);
615620
}
616-
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
621+
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
617622
}
618623

619624
/**
@@ -1023,6 +1028,11 @@ public long globalCheckpoint() {
10231028
return indexShard.getGlobalCheckpoint();
10241029
}
10251030

1031+
@Override
1032+
public long maxSeqNoOfUpdatesOrDeletes() {
1033+
return indexShard.getMaxSeqNoOfUpdatesOrDeletes();
1034+
}
1035+
10261036
@Override
10271037
public ReplicationGroup getReplicationGroup() {
10281038
return indexShard.getReplicationGroup();
@@ -1107,15 +1117,16 @@ public void performOn(
11071117
final ShardRouting replica,
11081118
final ReplicaRequest request,
11091119
final long globalCheckpoint,
1120+
final long maxSeqNoOfUpdatesOrDeletes,
11101121
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
11111122
String nodeId = replica.currentNodeId();
11121123
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
11131124
if (node == null) {
11141125
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
11151126
return;
11161127
}
1117-
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
1118-
new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
1128+
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
1129+
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
11191130
sendReplicaRequest(replicaRequest, node, listener);
11201131
}
11211132

@@ -1263,15 +1274,17 @@ public String toString() {
12631274
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
12641275

12651276
private long globalCheckpoint;
1277+
private long maxSeqNoOfUpdatesOrDeletes;
12661278

12671279
public ConcreteReplicaRequest(final Supplier<R> requestSupplier) {
12681280
super(requestSupplier);
12691281
}
12701282

12711283
public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
1272-
final long globalCheckpoint) {
1284+
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
12731285
super(request, targetAllocationID, primaryTerm);
12741286
this.globalCheckpoint = globalCheckpoint;
1287+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
12751288
}
12761289

12771290
@Override
@@ -1282,6 +1295,13 @@ public void readFrom(StreamInput in) throws IOException {
12821295
} else {
12831296
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
12841297
}
1298+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
1299+
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
1300+
} else {
1301+
// UNASSIGNED_SEQ_NO (-2) means uninitialized, and replicas will disable
1302+
// optimization using seq_no if its max_seq_no_of_updates is still uninitialized
1303+
maxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
1304+
}
12851305
}
12861306

12871307
@Override
@@ -1290,19 +1310,27 @@ public void writeTo(StreamOutput out) throws IOException {
12901310
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
12911311
out.writeZLong(globalCheckpoint);
12921312
}
1313+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
1314+
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
1315+
}
12931316
}
12941317

12951318
public long getGlobalCheckpoint() {
12961319
return globalCheckpoint;
12971320
}
12981321

1322+
public long getMaxSeqNoOfUpdatesOrDeletes() {
1323+
return maxSeqNoOfUpdatesOrDeletes;
1324+
}
1325+
12991326
@Override
13001327
public String toString() {
13011328
return "ConcreteReplicaRequest{" +
13021329
"targetAllocationID='" + getTargetAllocationID() + '\'' +
13031330
", primaryTerm='" + getPrimaryTerm() + '\'' +
13041331
", request=" + getRequest() +
13051332
", globalCheckpoint=" + globalCheckpoint +
1333+
", maxSeqNoOfUpdatesOrDeletes=" + maxSeqNoOfUpdatesOrDeletes +
13061334
'}';
13071335
}
13081336
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.lucene.util.InfoStream;
4848
import org.elasticsearch.Assertions;
4949
import org.elasticsearch.ExceptionsHelper;
50+
import org.elasticsearch.Version;
5051
import org.elasticsearch.action.index.IndexRequest;
5152
import org.elasticsearch.common.Nullable;
5253
import org.elasticsearch.common.SuppressForbidden;
@@ -976,6 +977,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
976977
if (plan.addStaleOpToLucene) {
977978
addStaleDocs(index.docs(), indexWriter);
978979
} else if (plan.useLuceneUpdateDocument) {
980+
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), plan.seqNoForIndexing, true, true);
979981
updateDocs(index.uid(), index.docs(), indexWriter);
980982
} else {
981983
// document does not exists, we can optimize for create, but double check if assertions are running
@@ -1275,8 +1277,8 @@ protected final DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOE
12751277
return plan;
12761278
}
12771279

1278-
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan)
1279-
throws IOException {
1280+
private DeleteResult deleteInLucene(Delete delete, DeletionStrategy plan) throws IOException {
1281+
assert assertMaxSeqNoOfUpdatesIsAdvanced(delete.uid(), plan.seqNoOfDeletion, false, false);
12801282
try {
12811283
if (softDeleteEnabled) {
12821284
final ParsedDocument tombstone = engineConfig.getTombstoneDocSupplier().newDeleteTombstoneDoc(delete.type(), delete.id());
@@ -2556,6 +2558,29 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
25562558
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
25572559
}
25582560

2561+
private boolean assertMaxSeqNoOfUpdatesIsAdvanced(Term id, long seqNo, boolean allowDeleted, boolean relaxIfGapInSeqNo) {
2562+
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
2563+
// If the primary is on an old version which does not replicate msu, we need to relax this assertion for that.
2564+
if (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO) {
2565+
assert config().getIndexSettings().getIndexVersionCreated().before(Version.V_7_0_0_alpha1);
2566+
return true;
2567+
}
2568+
// We treat a delete on the tombstones on replicas as a regular document, then use updateDocument (not addDocument).
2569+
if (allowDeleted) {
2570+
final VersionValue versionValue = versionMap.getVersionForAssert(id.bytes());
2571+
if (versionValue != null && versionValue.isDelete()) {
2572+
return true;
2573+
}
2574+
}
2575+
// Operations can be processed on a replica in a different order than on the primary. If the order on the primary is index-1,
2576+
// delete-2, index-3, and the order on a replica is index-1, index-3, delete-2, then the msu of index-3 on the replica is 2
2577+
// even though it is an update (overwrites index-1). We should relax this assertion if there is a pending gap in the seq_no.
2578+
if (relaxIfGapInSeqNo && getLocalCheckpoint() < maxSeqNoOfUpdates) {
2579+
return true;
2580+
}
2581+
assert seqNo <= maxSeqNoOfUpdates : "id=" + id + " seq_no=" + seqNo + " msu=" + maxSeqNoOfUpdates;
2582+
return true;
2583+
}
25592584

25602585
@Override
25612586
public void initializeMaxSeqNoOfUpdatesOrDeletes() {

0 commit comments

Comments
 (0)