Skip to content

Commit 0504cb6

Browse files
dnhatnkcm
authored andcommitted
CCR: replicates max seq_no of updates to follower (#34051)
This commit replicates the max_seq_no_of_updates on the leading index to the primaries of the following index via ShardFollowNodeTask. The max_seq_of_updates is then transmitted to the replicas of the follower via replication requests (that's BulkShardOperationsRequest). Relates #33656
1 parent b5b0d82 commit 0504cb6

11 files changed

+132
-28
lines changed

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardChangesAction.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,12 @@ public long getMaxSeqNo() {
207207
return maxSeqNo;
208208
}
209209

210+
private long maxSeqNoOfUpdatesOrDeletes;
211+
212+
public long getMaxSeqNoOfUpdatesOrDeletes() {
213+
return maxSeqNoOfUpdatesOrDeletes;
214+
}
215+
210216
private Translog.Operation[] operations;
211217

212218
public Translog.Operation[] getOperations() {
@@ -220,11 +226,13 @@ public Translog.Operation[] getOperations() {
220226
final long mappingVersion,
221227
final long globalCheckpoint,
222228
final long maxSeqNo,
229+
final long maxSeqNoOfUpdatesOrDeletes,
223230
final Translog.Operation[] operations) {
224231

225232
this.mappingVersion = mappingVersion;
226233
this.globalCheckpoint = globalCheckpoint;
227234
this.maxSeqNo = maxSeqNo;
235+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
228236
this.operations = operations;
229237
}
230238

@@ -234,6 +242,7 @@ public void readFrom(final StreamInput in) throws IOException {
234242
mappingVersion = in.readVLong();
235243
globalCheckpoint = in.readZLong();
236244
maxSeqNo = in.readZLong();
245+
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
237246
operations = in.readArray(Translog.Operation::readOperation, Translog.Operation[]::new);
238247
}
239248

@@ -243,6 +252,7 @@ public void writeTo(final StreamOutput out) throws IOException {
243252
out.writeVLong(mappingVersion);
244253
out.writeZLong(globalCheckpoint);
245254
out.writeZLong(maxSeqNo);
255+
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
246256
out.writeArray(Translog.Operation::writeOperation, operations);
247257
}
248258

@@ -254,12 +264,13 @@ public boolean equals(final Object o) {
254264
return mappingVersion == that.mappingVersion &&
255265
globalCheckpoint == that.globalCheckpoint &&
256266
maxSeqNo == that.maxSeqNo &&
267+
maxSeqNoOfUpdatesOrDeletes == that.maxSeqNoOfUpdatesOrDeletes &&
257268
Arrays.equals(operations, that.operations);
258269
}
259270

260271
@Override
261272
public int hashCode() {
262-
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, Arrays.hashCode(operations));
273+
return Objects.hash(mappingVersion, globalCheckpoint, maxSeqNo, maxSeqNoOfUpdatesOrDeletes, Arrays.hashCode(operations));
263274
}
264275
}
265276

@@ -294,7 +305,9 @@ protected Response shardOperation(Request request, ShardId shardId) throws IOExc
294305
request.getMaxOperationCount(),
295306
request.getExpectedHistoryUUID(),
296307
request.getMaxOperationSizeInBytes());
297-
return getResponse(mappingVersion, seqNoStats, operations);
308+
// must capture after after snapshotting operations to ensure this MUS is at least the highest MUS of any of these operations.
309+
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
310+
return getResponse(mappingVersion, seqNoStats, maxSeqNoOfUpdatesOrDeletes, operations);
298311
}
299312

300313
@Override
@@ -358,7 +371,8 @@ private void globalCheckpointAdvancementFailure(
358371
final long mappingVersion =
359372
clusterService.state().metaData().index(shardId.getIndex()).getMappingVersion();
360373
final SeqNoStats latestSeqNoStats = indexShard.seqNoStats();
361-
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, EMPTY_OPERATIONS_ARRAY));
374+
final long maxSeqNoOfUpdatesOrDeletes = indexShard.getMaxSeqNoOfUpdatesOrDeletes();
375+
listener.onResponse(getResponse(mappingVersion, latestSeqNoStats, maxSeqNoOfUpdatesOrDeletes, EMPTY_OPERATIONS_ARRAY));
362376
} catch (final Exception caught) {
363377
caught.addSuppressed(e);
364378
listener.onFailure(caught);
@@ -433,8 +447,9 @@ static Translog.Operation[] getOperations(IndexShard indexShard,
433447
return operations.toArray(EMPTY_OPERATIONS_ARRAY);
434448
}
435449

436-
static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats, final Translog.Operation[] operations) {
437-
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), operations);
450+
static Response getResponse(final long mappingVersion, final SeqNoStats seqNoStats,
451+
final long maxSeqNoOfUpdates, final Translog.Operation[] operations) {
452+
return new Response(mappingVersion, seqNoStats.getGlobalCheckpoint(), seqNoStats.getMaxSeqNo(), maxSeqNoOfUpdates, operations);
438453
}
439454

440455
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowNodeTask.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.common.logging.Loggers;
1717
import org.elasticsearch.common.transport.NetworkExceptionHelper;
1818
import org.elasticsearch.common.unit.TimeValue;
19+
import org.elasticsearch.index.seqno.SequenceNumbers;
1920
import org.elasticsearch.index.shard.ShardId;
2021
import org.elasticsearch.index.translog.Translog;
2122
import org.elasticsearch.persistent.AllocatedPersistentTask;
@@ -56,6 +57,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
5657

5758
private long leaderGlobalCheckpoint;
5859
private long leaderMaxSeqNo;
60+
private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.UNASSIGNED_SEQ_NO;
5961
private long lastRequestedSeqNo;
6062
private long followerGlobalCheckpoint = 0;
6163
private long followerMaxSeqNo = 0;
@@ -201,7 +203,7 @@ private synchronized void coordinateWrites() {
201203
numConcurrentWrites++;
202204
LOGGER.trace("{}[{}] write [{}/{}] [{}]", params.getFollowShardId(), numConcurrentWrites, ops.get(0).seqNo(),
203205
ops.get(ops.size() - 1).seqNo(), ops.size());
204-
sendBulkShardOperationsRequest(ops);
206+
sendBulkShardOperationsRequest(ops, leaderMaxSeqNoOfUpdatesOrDeletes, new AtomicInteger(0));
205207
}
206208
}
207209

@@ -262,6 +264,7 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
262264
onOperationsFetched(response.getOperations());
263265
leaderGlobalCheckpoint = Math.max(leaderGlobalCheckpoint, response.getGlobalCheckpoint());
264266
leaderMaxSeqNo = Math.max(leaderMaxSeqNo, response.getMaxSeqNo());
267+
leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers.max(leaderMaxSeqNoOfUpdatesOrDeletes, response.getMaxSeqNoOfUpdatesOrDeletes());
265268
final long newFromSeqNo;
266269
if (response.getOperations().length == 0) {
267270
newFromSeqNo = from;
@@ -291,13 +294,11 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
291294
}
292295
}
293296

294-
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations) {
295-
sendBulkShardOperationsRequest(operations, new AtomicInteger(0));
296-
}
297-
298-
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, AtomicInteger retryCounter) {
297+
private void sendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
298+
AtomicInteger retryCounter) {
299+
assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "mus is not replicated";
299300
final long startTime = relativeTimeProvider.getAsLong();
300-
innerSendBulkShardOperationsRequest(operations,
301+
innerSendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes,
301302
response -> {
302303
synchronized (ShardFollowNodeTask.this) {
303304
totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
@@ -311,7 +312,8 @@ private void sendBulkShardOperationsRequest(List<Translog.Operation> operations,
311312
totalIndexTimeMillis += TimeUnit.NANOSECONDS.toMillis(relativeTimeProvider.getAsLong() - startTime);
312313
numberOfFailedBulkOperations++;
313314
}
314-
handleFailure(e, retryCounter, () -> sendBulkShardOperationsRequest(operations, retryCounter));
315+
handleFailure(e, retryCounter,
316+
() -> sendBulkShardOperationsRequest(operations, leaderMaxSeqNoOfUpdatesOrDeletes, retryCounter));
315317
}
316318
);
317319
}
@@ -383,8 +385,8 @@ private static boolean shouldRetry(Exception e) {
383385
// These methods are protected for testing purposes:
384386
protected abstract void innerUpdateMapping(LongConsumer handler, Consumer<Exception> errorHandler);
385387

386-
protected abstract void innerSendBulkShardOperationsRequest(
387-
List<Translog.Operation> operations, Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
388+
protected abstract void innerSendBulkShardOperationsRequest(List<Translog.Operation> operations, long leaderMaxSeqNoOfUpdatesOrDeletes,
389+
Consumer<BulkShardOperationsResponse> handler, Consumer<Exception> errorHandler);
388390

389391
protected abstract void innerSendShardChangesRequest(long from, int maxOperationCount, Consumer<ShardChangesAction.Response> handler,
390392
Consumer<Exception> errorHandler);

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/ShardFollowTasksExecutor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,9 +133,11 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
133133
@Override
134134
protected void innerSendBulkShardOperationsRequest(
135135
final List<Translog.Operation> operations,
136+
final long maxSeqNoOfUpdatesOrDeletes,
136137
final Consumer<BulkShardOperationsResponse> handler,
137138
final Consumer<Exception> errorHandler) {
138-
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(params.getFollowShardId(), operations);
139+
final BulkShardOperationsRequest request = new BulkShardOperationsRequest(
140+
params.getFollowShardId(), operations, maxSeqNoOfUpdatesOrDeletes);
139141
followerClient.execute(BulkShardOperationsAction.INSTANCE, request,
140142
ActionListener.wrap(response -> handler.accept(response), errorHandler));
141143
}

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/BulkShardOperationsRequest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,37 @@
1717
public final class BulkShardOperationsRequest extends ReplicatedWriteRequest<BulkShardOperationsRequest> {
1818

1919
private List<Translog.Operation> operations;
20+
private long maxSeqNoOfUpdatesOrDeletes;
2021

2122
public BulkShardOperationsRequest() {
2223
}
2324

24-
public BulkShardOperationsRequest(final ShardId shardId, final List<Translog.Operation> operations) {
25+
public BulkShardOperationsRequest(ShardId shardId, List<Translog.Operation> operations, long maxSeqNoOfUpdatesOrDeletes) {
2526
super(shardId);
2627
setRefreshPolicy(RefreshPolicy.NONE);
2728
this.operations = operations;
29+
this.maxSeqNoOfUpdatesOrDeletes = maxSeqNoOfUpdatesOrDeletes;
2830
}
2931

3032
public List<Translog.Operation> getOperations() {
3133
return operations;
3234
}
3335

36+
public long getMaxSeqNoOfUpdatesOrDeletes() {
37+
return maxSeqNoOfUpdatesOrDeletes;
38+
}
39+
3440
@Override
3541
public void readFrom(final StreamInput in) throws IOException {
3642
super.readFrom(in);
43+
maxSeqNoOfUpdatesOrDeletes = in.readZLong();
3744
operations = in.readList(Translog.Operation::readOperation);
3845
}
3946

4047
@Override
4148
public void writeTo(final StreamOutput out) throws IOException {
4249
super.writeTo(out);
50+
out.writeZLong(maxSeqNoOfUpdatesOrDeletes);
4351
out.writeVInt(operations.size());
4452
for (Translog.Operation operation : operations) {
4553
Translog.Operation.writeOperation(out, operation);
@@ -50,6 +58,7 @@ public void writeTo(final StreamOutput out) throws IOException {
5058
public String toString() {
5159
return "BulkShardOperationsRequest{" +
5260
"operations=" + operations.size()+
61+
", maxSeqNoUpdates=" + maxSeqNoOfUpdatesOrDeletes +
5362
", shardId=" + shardId +
5463
", timeout=" + timeout +
5564
", index='" + index + '\'' +

x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.elasticsearch.common.settings.Settings;
1919
import org.elasticsearch.index.engine.Engine;
2020
import org.elasticsearch.index.seqno.SeqNoStats;
21+
import org.elasticsearch.index.seqno.SequenceNumbers;
2122
import org.elasticsearch.index.shard.IndexShard;
2223
import org.elasticsearch.index.shard.ShardId;
2324
import org.elasticsearch.index.translog.Translog;
@@ -60,13 +61,15 @@ public TransportBulkShardOperationsAction(
6061
@Override
6162
protected WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
6263
final BulkShardOperationsRequest request, final IndexShard primary) throws Exception {
63-
return shardOperationOnPrimary(request.shardId(), request.getOperations(), primary, logger);
64+
return shardOperationOnPrimary(
65+
request.shardId(), request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger);
6466
}
6567

6668
// public for testing purposes only
6769
public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperationsResponse> shardOperationOnPrimary(
6870
final ShardId shardId,
6971
final List<Translog.Operation> sourceOperations,
72+
final long maxSeqNoOfUpdatesOrDeletes,
7073
final IndexShard primary,
7174
final Logger logger) throws IOException {
7275
final List<Translog.Operation> targetOperations = sourceOperations.stream().map(operation -> {
@@ -103,16 +106,19 @@ public static WritePrimaryResult<BulkShardOperationsRequest, BulkShardOperations
103106
}
104107
return operationWithPrimaryTerm;
105108
}).collect(Collectors.toList());
106-
// TODO: Replace this artificial value by the actual max_seq_no_updates from the leader
107-
targetOperations.stream().mapToLong(Translog.Operation::seqNo).max().ifPresent(primary::advanceMaxSeqNoOfUpdatesOrDeletes);
109+
assert maxSeqNoOfUpdatesOrDeletes >= SequenceNumbers.NO_OPS_PERFORMED : "invalid msu [" + maxSeqNoOfUpdatesOrDeletes + "]";
110+
primary.advanceMaxSeqNoOfUpdatesOrDeletes(maxSeqNoOfUpdatesOrDeletes);
108111
final Translog.Location location = applyTranslogOperations(targetOperations, primary, Engine.Operation.Origin.PRIMARY);
109-
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(shardId, targetOperations);
112+
final BulkShardOperationsRequest replicaRequest = new BulkShardOperationsRequest(
113+
shardId, targetOperations, maxSeqNoOfUpdatesOrDeletes);
110114
return new CcrWritePrimaryResult(replicaRequest, location, primary, logger);
111115
}
112116

113117
@Override
114118
protected WriteReplicaResult<BulkShardOperationsRequest> shardOperationOnReplica(
115119
final BulkShardOperationsRequest request, final IndexShard replica) throws Exception {
120+
assert replica.getMaxSeqNoOfUpdatesOrDeletes() >= request.getMaxSeqNoOfUpdatesOrDeletes() :
121+
"mus on replica [" + replica + "] < mus of request [" + request.getMaxSeqNoOfUpdatesOrDeletes() + "]";
116122
final Translog.Location location = applyTranslogOperations(request.getOperations(), replica, Engine.Operation.Origin.REPLICA);
117123
return new WriteReplicaResult<>(request, location, null, replica, logger);
118124
}

0 commit comments

Comments
 (0)