@@ -67,7 +67,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
67
67
private volatile int numConcurrentWrites = 0 ;
68
68
private volatile long followerGlobalCheckpoint = 0 ;
69
69
private volatile long currentIndexMetadataVersion = 0 ;
70
- private final Queue <Translog .Operation > buffer = new PriorityQueue <>(Comparator .comparing (Translog .Operation ::seqNo ). reversed () );
70
+ private final Queue <Translog .Operation > buffer = new PriorityQueue <>(Comparator .comparing (Translog .Operation ::seqNo ));
71
71
72
72
ShardFollowNodeTask (long id , String type , String action , String description , TaskId parentTask , Map <String , String > headers ,
73
73
ShardFollowTask params , BiConsumer <TimeValue , Runnable > scheduler ) {
@@ -78,10 +78,10 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
78
78
this .idleShardChangesRequestDelay = params .getIdleShardRetryDelay ();
79
79
}
80
80
81
- void start (long followerGlobalCheckpoint ) {
81
+ void start (long leaderGlobalCheckpoint , long followerGlobalCheckpoint ) {
82
82
this .lastRequestedSeqno = followerGlobalCheckpoint ;
83
83
this .followerGlobalCheckpoint = followerGlobalCheckpoint ;
84
- this .leaderGlobalCheckpoint = followerGlobalCheckpoint ;
84
+ this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
85
85
86
86
// Forcefully updates follower mapping, this gets us the leader imd version and
87
87
// makes sure that leader and follower mapping are identical.
@@ -93,7 +93,7 @@ void start(long followerGlobalCheckpoint) {
93
93
});
94
94
}
95
95
96
- private synchronized void coordinateReads () {
96
+ synchronized void coordinateReads () {
97
97
if (isStopped ()) {
98
98
LOGGER .info ("{} shard follow task has been stopped" , params .getFollowShardId ());
99
99
return ;
@@ -105,7 +105,8 @@ private synchronized void coordinateReads() {
105
105
while (hasReadBudget () && lastRequestedSeqno < leaderGlobalCheckpoint ) {
106
106
numConcurrentReads ++;
107
107
long from = lastRequestedSeqno + 1 ;
108
- long maxRequiredSeqno = Math .min (leaderGlobalCheckpoint , from + maxBatchOperationCount );
108
+ // -1 is needed, because maxRequiredSeqno is inclusive
109
+ long maxRequiredSeqno = Math .min (leaderGlobalCheckpoint , (from + maxBatchOperationCount ) - 1 );
109
110
LOGGER .trace ("{}[{}] read [{}/{}]" , params .getFollowShardId (), numConcurrentReads , maxRequiredSeqno , maxBatchOperationCount );
110
111
sendShardChangesRequest (from , maxBatchOperationCount , maxRequiredSeqno );
111
112
lastRequestedSeqno = maxRequiredSeqno ;
@@ -137,6 +138,11 @@ private boolean hasReadBudget() {
137
138
}
138
139
139
140
private synchronized void coordinateWrites () {
141
+ if (isStopped ()) {
142
+ LOGGER .info ("{} shard follow task has been stopped" , params .getFollowShardId ());
143
+ return ;
144
+ }
145
+
140
146
while (hasWriteBudget () && buffer .isEmpty () == false ) {
141
147
long sumEstimatedSize = 0L ;
142
148
int length = Math .min (params .getMaxBatchOperationCount (), buffer .size ());
@@ -176,48 +182,48 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
176
182
e -> handleFailure (e , retryCounter , () -> sendShardChangesRequest (from , maxOperationCount , maxRequiredSeqNo , retryCounter )));
177
183
}
178
184
179
- private void handleReadResponse (long from , long maxRequiredSeqNo , ShardChangesAction .Response response ) {
180
- maybeUpdateMapping (response .getIndexMetadataVersion (), () -> {
181
- synchronized (ShardFollowNodeTask .this ) {
182
- leaderGlobalCheckpoint = Math .max (leaderGlobalCheckpoint , response .getGlobalCheckpoint ());
183
- final long newMinRequiredSeqNo ;
184
- if (response .getOperations ().length == 0 ) {
185
- newMinRequiredSeqNo = from ;
186
- } else {
187
- assert response .getOperations ()[0 ].seqNo () == from :
188
- "first operation is not what we asked for. From is [" + from + "], got " + response .getOperations ()[0 ];
189
- buffer .addAll (Arrays .asList (response .getOperations ()));
190
- final long maxSeqNo = response .getOperations ()[response .getOperations ().length - 1 ].seqNo ();
191
- assert maxSeqNo ==
192
- Arrays .stream (response .getOperations ()).mapToLong (Translog .Operation ::seqNo ).max ().getAsLong ();
193
- newMinRequiredSeqNo = maxSeqNo + 1 ;
194
- // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
195
- lastRequestedSeqno = Math .max (lastRequestedSeqno , maxSeqNo );
196
- assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
197
- "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]" ;
198
- coordinateWrites ();
199
- }
185
+ void handleReadResponse (long from , long maxRequiredSeqNo , ShardChangesAction .Response response ) {
186
+ maybeUpdateMapping (response .getIndexMetadataVersion (), () -> innerHandleReadResponse (from , maxRequiredSeqNo , response ));
187
+ }
200
188
201
- if (newMinRequiredSeqNo < maxRequiredSeqNo ) {
202
- int newSize = (int ) (maxRequiredSeqNo - newMinRequiredSeqNo ) + 1 ;
203
- LOGGER .trace ("{} received [{}] ops, still missing [{}/{}], continuing to read..." ,
204
- params .getFollowShardId (), response .getOperations ().length , newMinRequiredSeqNo , maxRequiredSeqNo );
205
- sendShardChangesRequest (newMinRequiredSeqNo , newSize , maxRequiredSeqNo );
206
- } else {
207
- // read is completed, decrement
208
- numConcurrentReads --;
209
- if (response .getOperations ().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno ) {
210
- // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
211
- // future requests
212
- LOGGER .trace ("{} received no ops and no known ops to fetch, scheduling to coordinate reads" ,
213
- params .getFollowShardId ());
214
- scheduler .accept (idleShardChangesRequestDelay , this ::coordinateReads );
215
- } else {
216
- coordinateReads ();
217
- }
218
- }
189
+ synchronized void innerHandleReadResponse (long from , long maxRequiredSeqNo , ShardChangesAction .Response response ) {
190
+ leaderGlobalCheckpoint = Math .max (leaderGlobalCheckpoint , response .getGlobalCheckpoint ());
191
+ final long newMinRequiredSeqNo ;
192
+ if (response .getOperations ().length == 0 ) {
193
+ newMinRequiredSeqNo = from ;
194
+ } else {
195
+ assert response .getOperations ()[0 ].seqNo () == from :
196
+ "first operation is not what we asked for. From is [" + from + "], got " + response .getOperations ()[0 ];
197
+ buffer .addAll (Arrays .asList (response .getOperations ()));
198
+ final long maxSeqNo = response .getOperations ()[response .getOperations ().length - 1 ].seqNo ();
199
+ assert maxSeqNo ==
200
+ Arrays .stream (response .getOperations ()).mapToLong (Translog .Operation ::seqNo ).max ().getAsLong ();
201
+ newMinRequiredSeqNo = maxSeqNo + 1 ;
202
+ // update last requested seq no as we may have gotten more than we asked for and we don't want to ask it again.
203
+ lastRequestedSeqno = Math .max (lastRequestedSeqno , maxSeqNo );
204
+ assert lastRequestedSeqno <= leaderGlobalCheckpoint : "lastRequestedSeqno [" + lastRequestedSeqno +
205
+ "] is larger than the global checkpoint [" + leaderGlobalCheckpoint + "]" ;
206
+ coordinateWrites ();
207
+ }
208
+
209
+ if (newMinRequiredSeqNo < maxRequiredSeqNo && isStopped () == false ) {
210
+ int newSize = (int ) (maxRequiredSeqNo - newMinRequiredSeqNo ) + 1 ;
211
+ LOGGER .trace ("{} received [{}] ops, still missing [{}/{}], continuing to read..." ,
212
+ params .getFollowShardId (), response .getOperations ().length , newMinRequiredSeqNo , maxRequiredSeqNo );
213
+ sendShardChangesRequest (newMinRequiredSeqNo , newSize , maxRequiredSeqNo );
214
+ } else {
215
+ // read is completed, decrement
216
+ numConcurrentReads --;
217
+ if (response .getOperations ().length == 0 && leaderGlobalCheckpoint == lastRequestedSeqno ) {
218
+ // we got nothing and we have no reason to believe asking again well get us more, treat shard as idle and delay
219
+ // future requests
220
+ LOGGER .trace ("{} received no ops and no known ops to fetch, scheduling to coordinate reads" ,
221
+ params .getFollowShardId ());
222
+ scheduler .accept (idleShardChangesRequestDelay , this ::coordinateReads );
223
+ } else {
224
+ coordinateReads ();
219
225
}
220
- });
226
+ }
221
227
}
222
228
223
229
private void sendBulkShardOperationsRequest (List <Translog .Operation > operations ) {
@@ -306,7 +312,8 @@ protected boolean isStopped() {
306
312
307
313
@ Override
308
314
public Status getStatus () {
309
- return new Status (leaderGlobalCheckpoint , lastRequestedSeqno , followerGlobalCheckpoint , numConcurrentReads , numConcurrentWrites );
315
+ return new Status (leaderGlobalCheckpoint , lastRequestedSeqno , followerGlobalCheckpoint , numConcurrentReads , numConcurrentWrites ,
316
+ currentIndexMetadataVersion );
310
317
}
311
318
312
319
public static class Status implements Task .Status {
@@ -318,31 +325,35 @@ public static class Status implements Task.Status {
318
325
static final ParseField LAST_REQUESTED_SEQNO_FIELD = new ParseField ("last_requested_seqno" );
319
326
static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField ("number_of_concurrent_reads" );
320
327
static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField ("number_of_concurrent_writes" );
328
+ static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField ("index_metadata_version" );
321
329
322
330
static final ConstructingObjectParser <Status , Void > PARSER = new ConstructingObjectParser <>(NAME ,
323
- args -> new Status ((long ) args [0 ], (long ) args [1 ], (long ) args [2 ], (int ) args [3 ], (int ) args [4 ]));
331
+ args -> new Status ((long ) args [0 ], (long ) args [1 ], (long ) args [2 ], (int ) args [3 ], (int ) args [4 ], ( long ) args [ 5 ] ));
324
332
325
333
static {
326
334
PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_GLOBAL_CHECKPOINT_FIELD );
327
335
PARSER .declareLong (ConstructingObjectParser .constructorArg (), LAST_REQUESTED_SEQNO_FIELD );
328
336
PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_GLOBAL_CHECKPOINT_FIELD );
329
337
PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_READS_FIELD );
330
338
PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_WRITES_FIELD );
339
+ PARSER .declareLong (ConstructingObjectParser .constructorArg (), INDEX_METADATA_VERSION_FIELD );
331
340
}
332
341
333
342
private final long leaderGlobalCheckpoint ;
334
343
private final long lastRequestedSeqno ;
335
344
private final long followerGlobalCheckpoint ;
336
345
private final int numberOfConcurrentReads ;
337
346
private final int numberOfConcurrentWrites ;
347
+ private final long indexMetadataVersion ;
338
348
339
349
Status (long leaderGlobalCheckpoint , long lastRequestedSeqno , long followerGlobalCheckpoint ,
340
- int numberOfConcurrentReads , int numberOfConcurrentWrites ) {
350
+ int numberOfConcurrentReads , int numberOfConcurrentWrites , long indexMetadataVersion ) {
341
351
this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
342
352
this .lastRequestedSeqno = lastRequestedSeqno ;
343
353
this .followerGlobalCheckpoint = followerGlobalCheckpoint ;
344
354
this .numberOfConcurrentReads = numberOfConcurrentReads ;
345
355
this .numberOfConcurrentWrites = numberOfConcurrentWrites ;
356
+ this .indexMetadataVersion = indexMetadataVersion ;
346
357
}
347
358
348
359
public Status (StreamInput in ) throws IOException {
@@ -351,6 +362,7 @@ public Status(StreamInput in) throws IOException {
351
362
this .followerGlobalCheckpoint = in .readZLong ();
352
363
this .numberOfConcurrentReads = in .readVInt ();
353
364
this .numberOfConcurrentWrites = in .readVInt ();
365
+ this .indexMetadataVersion = in .readVLong ();
354
366
}
355
367
356
368
public long getLeaderGlobalCheckpoint () {
@@ -373,6 +385,10 @@ public int getNumberOfConcurrentWrites() {
373
385
return numberOfConcurrentWrites ;
374
386
}
375
387
388
+ public long getIndexMetadataVersion () {
389
+ return indexMetadataVersion ;
390
+ }
391
+
376
392
@ Override
377
393
public String getWriteableName () {
378
394
return NAME ;
@@ -385,6 +401,7 @@ public void writeTo(StreamOutput out) throws IOException {
385
401
out .writeZLong (followerGlobalCheckpoint );
386
402
out .writeVInt (numberOfConcurrentReads );
387
403
out .writeVInt (numberOfConcurrentWrites );
404
+ out .writeVLong (indexMetadataVersion );
388
405
}
389
406
390
407
@ Override
@@ -396,6 +413,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
396
413
builder .field (LAST_REQUESTED_SEQNO_FIELD .getPreferredName (), lastRequestedSeqno );
397
414
builder .field (NUMBER_OF_CONCURRENT_READS_FIELD .getPreferredName (), numberOfConcurrentReads );
398
415
builder .field (NUMBER_OF_CONCURRENT_WRITES_FIELD .getPreferredName (), numberOfConcurrentWrites );
416
+ builder .field (INDEX_METADATA_VERSION_FIELD .getPreferredName (), indexMetadataVersion );
399
417
}
400
418
builder .endObject ();
401
419
return builder ;
@@ -414,13 +432,14 @@ public boolean equals(Object o) {
414
432
lastRequestedSeqno == status .lastRequestedSeqno &&
415
433
followerGlobalCheckpoint == status .followerGlobalCheckpoint &&
416
434
numberOfConcurrentReads == status .numberOfConcurrentReads &&
417
- numberOfConcurrentWrites == status .numberOfConcurrentWrites ;
435
+ numberOfConcurrentWrites == status .numberOfConcurrentWrites &&
436
+ indexMetadataVersion == status .indexMetadataVersion ;
418
437
}
419
438
420
439
@ Override
421
440
public int hashCode () {
422
441
return Objects .hash (leaderGlobalCheckpoint , lastRequestedSeqno , followerGlobalCheckpoint , numberOfConcurrentReads ,
423
- numberOfConcurrentWrites );
442
+ numberOfConcurrentWrites , indexMetadataVersion );
424
443
}
425
444
426
445
public String toString () {
0 commit comments