@@ -67,6 +67,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
67
67
68
68
private static final Logger LOGGER = Loggers .getLogger (ShardFollowNodeTask .class );
69
69
70
+ private final String leaderIndex ;
70
71
private final ShardFollowTask params ;
71
72
private final TimeValue retryTimeout ;
72
73
private final TimeValue idleShardChangesRequestDelay ;
@@ -90,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
90
91
private long numberOfSuccessfulBulkOperations = 0 ;
91
92
private long numberOfFailedBulkOperations = 0 ;
92
93
private long numberOfOperationsIndexed = 0 ;
94
+ private long lastFetchTime = -1 ;
93
95
private final Queue <Translog .Operation > buffer = new PriorityQueue <>(Comparator .comparing (Translog .Operation ::seqNo ));
94
96
private final LinkedHashMap <Long , ElasticsearchException > fetchExceptions ;
95
97
@@ -112,6 +114,12 @@ protected boolean removeEldestEntry(final Map.Entry<Long, ElasticsearchException
112
114
return size () > params .getMaxConcurrentReadBatches ();
113
115
}
114
116
};
117
+
118
+ if (params .getLeaderClusterAlias () != null ) {
119
+ leaderIndex = params .getLeaderClusterAlias () + ":" + params .getLeaderShardId ().getIndexName ();
120
+ } else {
121
+ leaderIndex = params .getLeaderShardId ().getIndexName ();
122
+ }
115
123
}
116
124
117
125
void start (
@@ -235,6 +243,9 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
235
243
236
244
private void sendShardChangesRequest (long from , int maxOperationCount , long maxRequiredSeqNo , AtomicInteger retryCounter ) {
237
245
final long startTime = relativeTimeProvider .getAsLong ();
246
+ synchronized (this ) {
247
+ lastFetchTime = startTime ;
248
+ }
238
249
innerSendShardChangesRequest (from , maxOperationCount ,
239
250
response -> {
240
251
synchronized (ShardFollowNodeTask .this ) {
@@ -411,7 +422,15 @@ public ShardId getFollowShardId() {
411
422
412
423
@ Override
413
424
public synchronized Status getStatus () {
425
+ final long timeSinceLastFetchMillis ;
426
+ if (lastFetchTime != -1 ) {
427
+ timeSinceLastFetchMillis = TimeUnit .NANOSECONDS .toMillis (relativeTimeProvider .getAsLong () - lastFetchTime );
428
+ } else {
429
+ // To avoid confusion when ccr didn't yet execute a fetch:
430
+ timeSinceLastFetchMillis = -1 ;
431
+ }
414
432
return new Status (
433
+ leaderIndex ,
415
434
getFollowShardId ().getId (),
416
435
leaderGlobalCheckpoint ,
417
436
leaderMaxSeqNo ,
@@ -431,13 +450,15 @@ public synchronized Status getStatus() {
431
450
numberOfSuccessfulBulkOperations ,
432
451
numberOfFailedBulkOperations ,
433
452
numberOfOperationsIndexed ,
434
- new TreeMap <>(fetchExceptions ));
453
+ new TreeMap <>(fetchExceptions ),
454
+ timeSinceLastFetchMillis );
435
455
}
436
456
437
457
public static class Status implements Task .Status {
438
458
439
459
public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status" ;
440
460
461
+ static final ParseField LEADER_INDEX = new ParseField ("leader_index" );
441
462
static final ParseField SHARD_ID = new ParseField ("shard_id" );
442
463
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField ("leader_global_checkpoint" );
443
464
static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField ("leader_max_seq_no" );
@@ -458,20 +479,21 @@ public static class Status implements Task.Status {
458
479
static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField ("number_of_failed_bulk_operations" );
459
480
static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField ("number_of_operations_indexed" );
460
481
static final ParseField FETCH_EXCEPTIONS = new ParseField ("fetch_exceptions" );
482
+ static final ParseField TIME_SINCE_LAST_FETCH_MILLIS_FIELD = new ParseField ("time_since_last_fetch_millis" );
461
483
462
484
@ SuppressWarnings ("unchecked" )
463
485
static final ConstructingObjectParser <Status , Void > STATUS_PARSER = new ConstructingObjectParser <>(STATUS_PARSER_NAME ,
464
486
args -> new Status (
465
- (int ) args [0 ],
466
- (long ) args [1 ],
487
+ (String ) args [0 ],
488
+ (int ) args [1 ],
467
489
(long ) args [2 ],
468
490
(long ) args [3 ],
469
491
(long ) args [4 ],
470
492
(long ) args [5 ],
471
- (int ) args [6 ],
493
+ (long ) args [6 ],
472
494
(int ) args [7 ],
473
495
(int ) args [8 ],
474
- (long ) args [9 ],
496
+ (int ) args [9 ],
475
497
(long ) args [10 ],
476
498
(long ) args [11 ],
477
499
(long ) args [12 ],
@@ -481,10 +503,12 @@ public static class Status implements Task.Status {
481
503
(long ) args [16 ],
482
504
(long ) args [17 ],
483
505
(long ) args [18 ],
506
+ (long ) args [19 ],
484
507
new TreeMap <>(
485
- ((List <Map .Entry <Long , ElasticsearchException >>) args [19 ])
508
+ ((List <Map .Entry <Long , ElasticsearchException >>) args [20 ])
486
509
.stream ()
487
- .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue )))));
510
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue ))),
511
+ (long ) args [21 ]));
488
512
489
513
public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry" ;
490
514
@@ -494,6 +518,7 @@ public static class Status implements Task.Status {
494
518
args -> new AbstractMap .SimpleEntry <>((long ) args [0 ], (ElasticsearchException ) args [1 ]));
495
519
496
520
static {
521
+ STATUS_PARSER .declareString (ConstructingObjectParser .constructorArg (), LEADER_INDEX );
497
522
STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), SHARD_ID );
498
523
STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_GLOBAL_CHECKPOINT_FIELD );
499
524
STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_MAX_SEQ_NO_FIELD );
@@ -514,6 +539,7 @@ public static class Status implements Task.Status {
514
539
STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD );
515
540
STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_OPERATIONS_INDEXED_FIELD );
516
541
STATUS_PARSER .declareObjectArray (ConstructingObjectParser .constructorArg (), FETCH_EXCEPTIONS_ENTRY_PARSER , FETCH_EXCEPTIONS );
542
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TIME_SINCE_LAST_FETCH_MILLIS_FIELD );
517
543
}
518
544
519
545
static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField ("from_seq_no" );
@@ -527,6 +553,12 @@ public static class Status implements Task.Status {
527
553
FETCH_EXCEPTIONS_ENTRY_EXCEPTION );
528
554
}
529
555
556
+ private final String leaderIndex ;
557
+
558
+ public String leaderIndex () {
559
+ return leaderIndex ;
560
+ }
561
+
530
562
private final int shardId ;
531
563
532
564
public int getShardId () {
@@ -647,7 +679,14 @@ public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
647
679
return fetchExceptions ;
648
680
}
649
681
682
+ private final long timeSinceLastFetchMillis ;
683
+
684
+ public long timeSinceLastFetchMillis () {
685
+ return timeSinceLastFetchMillis ;
686
+ }
687
+
650
688
Status (
689
+ final String leaderIndex ,
651
690
final int shardId ,
652
691
final long leaderGlobalCheckpoint ,
653
692
final long leaderMaxSeqNo ,
@@ -667,7 +706,9 @@ public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
667
706
final long numberOfSuccessfulBulkOperations ,
668
707
final long numberOfFailedBulkOperations ,
669
708
final long numberOfOperationsIndexed ,
670
- final NavigableMap <Long , ElasticsearchException > fetchExceptions ) {
709
+ final NavigableMap <Long , ElasticsearchException > fetchExceptions ,
710
+ final long timeSinceLastFetchMillis ) {
711
+ this .leaderIndex = leaderIndex ;
671
712
this .shardId = shardId ;
672
713
this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
673
714
this .leaderMaxSeqNo = leaderMaxSeqNo ;
@@ -688,9 +729,11 @@ public NavigableMap<Long, ElasticsearchException> fetchExceptions() {
688
729
this .numberOfFailedBulkOperations = numberOfFailedBulkOperations ;
689
730
this .numberOfOperationsIndexed = numberOfOperationsIndexed ;
690
731
this .fetchExceptions = Objects .requireNonNull (fetchExceptions );
732
+ this .timeSinceLastFetchMillis = timeSinceLastFetchMillis ;
691
733
}
692
734
693
735
public Status (final StreamInput in ) throws IOException {
736
+ this .leaderIndex = in .readString ();
694
737
this .shardId = in .readVInt ();
695
738
this .leaderGlobalCheckpoint = in .readZLong ();
696
739
this .leaderMaxSeqNo = in .readZLong ();
@@ -711,6 +754,7 @@ public Status(final StreamInput in) throws IOException {
711
754
this .numberOfFailedBulkOperations = in .readVLong ();
712
755
this .numberOfOperationsIndexed = in .readVLong ();
713
756
this .fetchExceptions = new TreeMap <>(in .readMap (StreamInput ::readVLong , StreamInput ::readException ));
757
+ this .timeSinceLastFetchMillis = in .readZLong ();
714
758
}
715
759
716
760
@ Override
@@ -720,6 +764,7 @@ public String getWriteableName() {
720
764
721
765
@ Override
722
766
public void writeTo (final StreamOutput out ) throws IOException {
767
+ out .writeString (leaderIndex );
723
768
out .writeVInt (shardId );
724
769
out .writeZLong (leaderGlobalCheckpoint );
725
770
out .writeZLong (leaderMaxSeqNo );
@@ -740,12 +785,14 @@ public void writeTo(final StreamOutput out) throws IOException {
740
785
out .writeVLong (numberOfFailedBulkOperations );
741
786
out .writeVLong (numberOfOperationsIndexed );
742
787
out .writeMap (fetchExceptions , StreamOutput ::writeVLong , StreamOutput ::writeException );
788
+ out .writeZLong (timeSinceLastFetchMillis );
743
789
}
744
790
745
791
@ Override
746
792
public XContentBuilder toXContent (final XContentBuilder builder , final Params params ) throws IOException {
747
793
builder .startObject ();
748
794
{
795
+ builder .field (LEADER_INDEX .getPreferredName (), leaderIndex );
749
796
builder .field (SHARD_ID .getPreferredName (), shardId );
750
797
builder .field (LEADER_GLOBAL_CHECKPOINT_FIELD .getPreferredName (), leaderGlobalCheckpoint );
751
798
builder .field (LEADER_MAX_SEQ_NO_FIELD .getPreferredName (), leaderMaxSeqNo );
@@ -791,6 +838,10 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
791
838
}
792
839
}
793
840
builder .endArray ();
841
+ builder .humanReadableField (
842
+ TIME_SINCE_LAST_FETCH_MILLIS_FIELD .getPreferredName (),
843
+ "time_since_last_fetch" ,
844
+ new TimeValue (timeSinceLastFetchMillis , TimeUnit .MILLISECONDS ));
794
845
}
795
846
builder .endObject ();
796
847
return builder ;
@@ -805,7 +856,8 @@ public boolean equals(final Object o) {
805
856
if (this == o ) return true ;
806
857
if (o == null || getClass () != o .getClass ()) return false ;
807
858
final Status that = (Status ) o ;
808
- return shardId == that .shardId &&
859
+ return leaderIndex .equals (that .leaderIndex ) &&
860
+ shardId == that .shardId &&
809
861
leaderGlobalCheckpoint == that .leaderGlobalCheckpoint &&
810
862
leaderMaxSeqNo == that .leaderMaxSeqNo &&
811
863
followerGlobalCheckpoint == that .followerGlobalCheckpoint &&
@@ -829,12 +881,14 @@ public boolean equals(final Object o) {
829
881
* keys.
830
882
*/
831
883
fetchExceptions .keySet ().equals (that .fetchExceptions .keySet ()) &&
832
- getFetchExceptionMessages (this ).equals (getFetchExceptionMessages (that ));
884
+ getFetchExceptionMessages (this ).equals (getFetchExceptionMessages (that )) &&
885
+ timeSinceLastFetchMillis == that .timeSinceLastFetchMillis ;
833
886
}
834
887
835
888
@ Override
836
889
public int hashCode () {
837
890
return Objects .hash (
891
+ leaderIndex ,
838
892
shardId ,
839
893
leaderGlobalCheckpoint ,
840
894
leaderMaxSeqNo ,
@@ -858,7 +912,8 @@ public int hashCode() {
858
912
* messages. Note that we are relying on the fact that the fetch exceptions are ordered by keys.
859
913
*/
860
914
fetchExceptions .keySet (),
861
- getFetchExceptionMessages (this ));
915
+ getFetchExceptionMessages (this ),
916
+ timeSinceLastFetchMillis );
862
917
}
863
918
864
919
private static List <String > getFetchExceptionMessages (final Status status ) {
0 commit comments