@@ -57,7 +57,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
57
57
58
58
private long leaderGlobalCheckpoint ;
59
59
private long leaderMaxSeqNo ;
60
- private volatile long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers .UNASSIGNED_SEQ_NO ;
60
+ private long leaderMaxSeqNoOfUpdatesOrDeletes = SequenceNumbers .UNASSIGNED_SEQ_NO ;
61
61
private long lastRequestedSeqNo ;
62
62
private long followerGlobalCheckpoint = 0 ;
63
63
private long followerMaxSeqNo = 0 ;
@@ -203,7 +203,7 @@ private synchronized void coordinateWrites() {
203
203
numConcurrentWrites ++;
204
204
LOGGER .trace ("{}[{}] write [{}/{}] [{}]" , params .getFollowShardId (), numConcurrentWrites , ops .get (0 ).seqNo (),
205
205
ops .get (ops .size () - 1 ).seqNo (), ops .size ());
206
- sendBulkShardOperationsRequest (ops );
206
+ sendBulkShardOperationsRequest (ops , leaderMaxSeqNoOfUpdatesOrDeletes , new AtomicInteger ( 0 ) );
207
207
}
208
208
}
209
209
@@ -294,11 +294,8 @@ synchronized void innerHandleReadResponse(long from, long maxRequiredSeqNo, Shar
294
294
}
295
295
}
296
296
297
- private void sendBulkShardOperationsRequest (List <Translog .Operation > operations ) {
298
- sendBulkShardOperationsRequest (operations , new AtomicInteger (0 ));
299
- }
300
-
301
- private void sendBulkShardOperationsRequest (List <Translog .Operation > operations , AtomicInteger retryCounter ) {
297
+ private void sendBulkShardOperationsRequest (List <Translog .Operation > operations , long leaderMaxSeqNoOfUpdatesOrDeletes ,
298
+ AtomicInteger retryCounter ) {
302
299
assert leaderMaxSeqNoOfUpdatesOrDeletes != SequenceNumbers .UNASSIGNED_SEQ_NO : "mus is not replicated" ;
303
300
final long startTime = relativeTimeProvider .getAsLong ();
304
301
innerSendBulkShardOperationsRequest (operations , leaderMaxSeqNoOfUpdatesOrDeletes ,
@@ -315,7 +312,8 @@ private void sendBulkShardOperationsRequest(List<Translog.Operation> operations,
315
312
totalIndexTimeMillis += TimeUnit .NANOSECONDS .toMillis (relativeTimeProvider .getAsLong () - startTime );
316
313
numberOfFailedBulkOperations ++;
317
314
}
318
- handleFailure (e , retryCounter , () -> sendBulkShardOperationsRequest (operations , retryCounter ));
315
+ handleFailure (e , retryCounter ,
316
+ () -> sendBulkShardOperationsRequest (operations , leaderMaxSeqNoOfUpdatesOrDeletes , retryCounter ));
319
317
}
320
318
);
321
319
}
0 commit comments