Skip to content

Commit 6883c3f

Browse files
committed
Replicate max seq_no of updates to replicas
We started tracking max_seq_no_of_updates on the primary in elastic#33842. This commit replicates that value from a primary to its replicas in replication requests or in the translog phase of peer-recovery. With this change, we guarantee that the value of max_seq_no_of_updates on a replica when any index/delete operation is performed is at least the value of max_seq_no_of_updates on the primary when that operation was executed. Relates elastic#33656
1 parent 7944a0c commit 6883c3f

File tree

23 files changed

+383
-122
lines changed

23 files changed

+383
-122
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.elasticsearch.cluster.routing.ShardRouting;
3232
import org.elasticsearch.common.Nullable;
3333
import org.elasticsearch.common.io.stream.StreamInput;
34+
import org.elasticsearch.index.seqno.SequenceNumbers;
3435
import org.elasticsearch.index.shard.ReplicationGroup;
3536
import org.elasticsearch.index.shard.ShardId;
3637
import org.elasticsearch.rest.RestStatus;
@@ -114,9 +115,13 @@ public void execute() throws Exception {
114115
// of the sampled replication group, and advanced further than what the given replication group would allow it to.
115116
// This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint.
116117
final long globalCheckpoint = primary.globalCheckpoint();
118+
// we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of
119+
// max_seq_no_of_updates on a replica when this request is executed is at least the value on the primary when it was executed there.
120+
final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes();
121+
assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized";
117122
final ReplicationGroup replicationGroup = primary.getReplicationGroup();
118123
markUnavailableShardsAsStale(replicaRequest, replicationGroup);
119-
performOnReplicas(replicaRequest, globalCheckpoint, replicationGroup);
124+
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);
120125
}
121126

122127
successfulShards.incrementAndGet(); // mark primary as successful
@@ -136,7 +141,7 @@ private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, Replica
136141
}
137142

138143
private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
139-
final ReplicationGroup replicationGroup) {
144+
final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
140145
// for total stats, add number of unassigned shards and
141146
// number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
142147
totalShards.addAndGet(replicationGroup.getSkippedShards().size());
@@ -145,19 +150,20 @@ private void performOnReplicas(final ReplicaRequest replicaRequest, final long g
145150

146151
for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
147152
if (shard.isSameAllocation(primaryRouting) == false) {
148-
performOnReplica(shard, replicaRequest, globalCheckpoint);
153+
performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
149154
}
150155
}
151156
}
152157

153-
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest, final long globalCheckpoint) {
158+
private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
159+
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
154160
if (logger.isTraceEnabled()) {
155161
logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
156162
}
157163

158164
totalShards.incrementAndGet();
159165
pendingActions.incrementAndGet();
160-
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, new ActionListener<ReplicaResponse>() {
166+
replicasProxy.performOn(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, new ActionListener<ReplicaResponse>() {
161167
@Override
162168
public void onResponse(ReplicaResponse response) {
163169
successfulShards.incrementAndGet();
@@ -322,6 +328,12 @@ public interface Primary<
322328
*/
323329
long globalCheckpoint();
324330

331+
/**
332+
* Returns the maximum seq_no of updates (index operations overwriting Lucene) or deletes on the primary.
333+
* This value must be captured after the execution of a replication request on the primary is completed.
334+
*/
335+
long maxSeqNoOfUpdatesOrDeletes();
336+
325337
/**
326338
* Returns the current replication group on the primary shard
327339
*
@@ -338,12 +350,15 @@ public interface Replicas<RequestT extends ReplicationRequest<RequestT>> {
338350
/**
339351
* Performs the specified request on the specified replica.
340352
*
341-
* @param replica the shard this request should be executed on
342-
* @param replicaRequest the operation to perform
343-
* @param globalCheckpoint the global checkpoint on the primary
344-
* @param listener callback for handling the response or failure
353+
* @param replica the shard this request should be executed on
354+
* @param replicaRequest the operation to perform
355+
* @param globalCheckpoint the global checkpoint on the primary
356+
* @param maxSeqNoOfUpdatesOrDeletes the max seq_no of updates (index operations overwriting Lucene) or deletes on primary
357+
* after this replication was executed on it.
358+
* @param listener callback for handling the response or failure
345359
*/
346-
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint, ActionListener<ReplicaResponse> listener);
360+
void performOn(ShardRouting replica, RequestT replicaRequest, long globalCheckpoint,
361+
long maxSeqNoOfUpdatesOrDeletes, ActionListener<ReplicaResponse> listener);
347362

348363
/**
349364
* Fail the specified shard if needed, removing it from the current set

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ protected abstract PrimaryResult<ReplicaRequest, Response> shardOperationOnPrima
200200

201201
/**
202202
* Synchronously execute the specified replica operation. This is done under a permit from
203-
* {@link IndexShard#acquireReplicaOperationPermit(long, long, ActionListener, String, Object)}.
203+
* {@link IndexShard#acquireReplicaOperationPermit(long, long, long, ActionListener, String, Object)}.
204204
*
205205
* @param shardRequest the request to the replica shard
206206
* @param replica the replica shard to perform the operation on
@@ -489,6 +489,7 @@ public void messageReceived(
489489
replicaRequest.getTargetAllocationID(),
490490
replicaRequest.getPrimaryTerm(),
491491
replicaRequest.getGlobalCheckpoint(),
492+
replicaRequest.getMaxSeqNoOfUpdatesOrDeletes(),
492493
channel,
493494
(ReplicationTask) task).run();
494495
}
@@ -513,6 +514,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
513514
private final String targetAllocationID;
514515
private final long primaryTerm;
515516
private final long globalCheckpoint;
517+
private final long maxSeqNoOfUpdatesOrDeletes;
516518
private final TransportChannel channel;
517519
private final IndexShard replica;
518520
/**
@@ -528,6 +530,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
528530
String targetAllocationID,
529531
long primaryTerm,
530532
long globalCheckpoint,
533+
long maxSeqNoOfUpdatesOrDeletes,
531534
TransportChannel channel,
532535
ReplicationTask task) {
533536
this.request = request;
@@ -536,6 +539,7 @@ private final class AsyncReplicaAction extends AbstractRunnable implements Actio
536539
this.targetAllocationID = targetAllocationID;
537540
this.primaryTerm = primaryTerm;
538541
this.globalCheckpoint = globalCheckpoint;
542+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
539543
final ShardId shardId = request.shardId();
540544
assert shardId != null : "request shardId must be set";
541545
this.replica = getIndexShard(shardId);
@@ -575,7 +579,8 @@ public void onNewClusterState(ClusterState state) {
575579
new TransportChannelResponseHandler<>(logger, channel, extraMessage,
576580
() -> TransportResponse.Empty.INSTANCE);
577581
transportService.sendRequest(clusterService.localNode(), transportReplicaAction,
578-
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm, globalCheckpoint),
582+
new ConcreteReplicaRequest<>(request, targetAllocationID, primaryTerm,
583+
globalCheckpoint, maxSeqNoOfUpdatesOrDeletes),
579584
handler);
580585
}
581586

@@ -613,7 +618,7 @@ protected void doRun() throws Exception {
613618
throw new ShardNotFoundException(this.replica.shardId(), "expected aID [{}] but found [{}]", targetAllocationID,
614619
actualAllocationId);
615620
}
616-
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, this, executor, request);
621+
replica.acquireReplicaOperationPermit(primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, this, executor, request);
617622
}
618623

619624
/**
@@ -1023,6 +1028,11 @@ public long globalCheckpoint() {
10231028
return indexShard.getGlobalCheckpoint();
10241029
}
10251030

1031+
@Override
1032+
public long maxSeqNoOfUpdatesOrDeletes() {
1033+
return indexShard.getMaxSeqNoOfUpdatesOrDeletes();
1034+
}
1035+
10261036
@Override
10271037
public ReplicationGroup getReplicationGroup() {
10281038
return indexShard.getReplicationGroup();
@@ -1107,15 +1117,16 @@ public void performOn(
11071117
final ShardRouting replica,
11081118
final ReplicaRequest request,
11091119
final long globalCheckpoint,
1120+
final long maxSeqNoOfUpdatesOrDeletes,
11101121
final ActionListener<ReplicationOperation.ReplicaResponse> listener) {
11111122
String nodeId = replica.currentNodeId();
11121123
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
11131124
if (node == null) {
11141125
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
11151126
return;
11161127
}
1117-
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest =
1118-
new ConcreteReplicaRequest<>(request, replica.allocationId().getId(), primaryTerm, globalCheckpoint);
1128+
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest = new ConcreteReplicaRequest<>(
1129+
request, replica.allocationId().getId(), primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
11191130
sendReplicaRequest(replicaRequest, node, listener);
11201131
}
11211132

@@ -1263,15 +1274,17 @@ public String toString() {
12631274
protected static final class ConcreteReplicaRequest<R extends TransportRequest> extends ConcreteShardRequest<R> {
12641275

12651276
private long globalCheckpoint;
1277+
private long maxSeqNoOfUpdatesOrDeletes;
12661278

12671279
public ConcreteReplicaRequest(final Supplier<R> requestSupplier) {
12681280
super(requestSupplier);
12691281
}
12701282

12711283
public ConcreteReplicaRequest(final R request, final String targetAllocationID, final long primaryTerm,
1272-
final long globalCheckpoint) {
1284+
final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
12731285
super(request, targetAllocationID, primaryTerm);
12741286
this.globalCheckpoint = globalCheckpoint;
1287+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
12751288
}
12761289

12771290
@Override
@@ -1282,6 +1295,13 @@ public void readFrom(StreamInput in) throws IOException {
12821295
} else {
12831296
globalCheckpoint = SequenceNumbers.UNASSIGNED_SEQ_NO;
12841297
}
1298+
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
1299+
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
1300+
} else {
1301+
// UNASSIGNED_SEQ_NO (-2) means uninitialized, and replicas will disable
1302+
// the optimization using seq_no if their max_seq_no_of_updates is still uninitialized
1303+
maxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
1304+
}
12851305
}
12861306

12871307
@Override
@@ -1290,19 +1310,27 @@ public void writeTo(StreamOutput out) throws IOException {
12901310
if (out.getVersion().onOrAfter(Version.V_6_0_0_alpha1)) {
12911311
out.writeZLong(globalCheckpoint);
12921312
}
1313+
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
1314+
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
1315+
}
12931316
}
12941317

12951318
public long getGlobalCheckpoint() {
12961319
return globalCheckpoint;
12971320
}
12981321

1322+
public long getMaxSeqNoOfUpdatesOrDeletes() {
1323+
return maxSeqNoOfUpdatesOrDeletes;
1324+
}
1325+
12991326
@Override
13001327
public String toString() {
13011328
return "ConcreteReplicaRequest{" +
13021329
"targetAllocationID='" + getTargetAllocationID() + '\'' +
13031330
", primaryTerm='" + getPrimaryTerm() + '\'' +
13041331
", request=" + getRequest() +
13051332
", globalCheckpoint=" + globalCheckpoint +
1333+
", maxSeqNoOfUpdatesOrDeletes=" + maxSeqNoOfUpdatesOrDeletes +
13061334
'}';
13071335
}
13081336
}

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.lucene.util.InfoStream;
4848
import org.elasticsearch.Assertions;
4949
import org.elasticsearch.ExceptionsHelper;
50+
import org.elasticsearch.Version;
5051
import org.elasticsearch.action.index.IndexRequest;
5152
import org.elasticsearch.common.Nullable;
5253
import org.elasticsearch.common.SuppressForbidden;
@@ -905,6 +906,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
905906
}
906907
}
907908
}
909+
assert assertMaxSeqNoOfUpdatesIsPropagated(index, plan);
908910
return plan;
909911
}
910912

@@ -1213,6 +1215,7 @@ protected DeletionStrategy deletionStrategyForOperation(final Delete delete) thr
12131215

12141216
protected final DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
12151217
assertNonPrimaryOrigin(delete);
1218+
assert assertMaxSeqNoOfUpdatesIsPropagated(delete);
12161219
maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
12171220
assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated;" +
12181221
"max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
@@ -2556,6 +2559,27 @@ private void updateAutoIdTimestamp(long newTimestamp, boolean unsafe) {
25562559
assert maxUnsafeAutoIdTimestamp.get() <= maxSeenAutoIdTimestamp.get();
25572560
}
25582561

2562+
private boolean assertMaxSeqNoOfUpdatesIsPropagated(Delete delete) {
2563+
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
2564+
final Version indexVersion = config().getIndexSettings().getIndexVersionCreated();
2565+
assert delete.seqNo() <= maxSeqNoOfUpdates ||
2566+
(maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersion.before(Version.V_7_0_0_alpha1)) :
2567+
"id=" + delete.id() + " seq_no=" + delete.seqNo() + " max_seq_no_of_updates=" + maxSeqNoOfUpdates + " index_version=" + indexVersion;
2568+
return true;
2569+
}
2570+
2571+
private boolean assertMaxSeqNoOfUpdatesIsPropagated(Index index, IndexingStrategy plan) {
2572+
final long maxSeqNoOfUpdates = getMaxSeqNoOfUpdatesOrDeletes();
2573+
final Version indexVersion = config().getIndexSettings().getIndexVersionCreated();
2574+
assert plan.useLuceneUpdateDocument == false
2575+
|| index.seqNo() <= maxSeqNoOfUpdates // msu must be propagated
2576+
|| getLocalCheckpoint() < maxSeqNoOfUpdates // or gap in the sequence number
2577+
|| (maxSeqNoOfUpdates == SequenceNumbers.UNASSIGNED_SEQ_NO && indexVersion.before(Version.V_7_0_0_alpha1))
2578+
// we treat a deleted doc in the tombstone as a valid doc, then use updateDocument to overwrite it
2579+
|| (versionMap.getUnderLock(index.uid().bytes()) != null && versionMap.getUnderLock(index.uid().bytes()).isDelete()) :
2580+
"id=" + index.id() + " seq_no=" + index.seqNo() + " max_seq_no_of_updates=" + maxSeqNoOfUpdates + " index_version=" + indexVersion;
2581+
return true;
2582+
}
25592583

25602584
@Override
25612585
public void initializeMaxSeqNoOfUpdatesOrDeletes() {

0 commit comments

Comments
 (0)