16
16
import org .elasticsearch .common .io .stream .StreamOutput ;
17
17
import org .elasticsearch .common .logging .Loggers ;
18
18
import org .elasticsearch .common .transport .NetworkExceptionHelper ;
19
+ import org .elasticsearch .common .unit .ByteSizeUnit ;
20
+ import org .elasticsearch .common .unit .ByteSizeValue ;
19
21
import org .elasticsearch .common .unit .TimeValue ;
20
22
import org .elasticsearch .common .xcontent .ConstructingObjectParser ;
21
23
import org .elasticsearch .common .xcontent .XContentBuilder ;
36
38
import java .util .Objects ;
37
39
import java .util .PriorityQueue ;
38
40
import java .util .Queue ;
41
+ import java .util .concurrent .TimeUnit ;
39
42
import java .util .concurrent .atomic .AtomicInteger ;
40
43
import java .util .function .BiConsumer ;
41
44
import java .util .function .Consumer ;
@@ -380,6 +383,7 @@ public ShardId getFollowShardId() {
380
383
@ Override
381
384
public synchronized Status getStatus () {
382
385
return new Status (
386
+ getFollowShardId ().getId (),
383
387
leaderGlobalCheckpoint ,
384
388
leaderMaxSeqNo ,
385
389
followerGlobalCheckpoint ,
@@ -404,6 +408,7 @@ public static class Status implements Task.Status {
404
408
405
409
public static final String NAME = "shard-follow-node-task-status" ;
406
410
411
+ static final ParseField SHARD_ID = new ParseField ("shard_id" );
407
412
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField ("leader_global_checkpoint" );
408
413
static final ParseField LEADER_MAX_SEQ_NO_FIELD = new ParseField ("leader_max_seq_no" );
409
414
static final ParseField FOLLOWER_GLOBAL_CHECKPOINT_FIELD = new ParseField ("follower_global_checkpoint" );
@@ -425,15 +430,15 @@ public static class Status implements Task.Status {
425
430
426
431
static final ConstructingObjectParser <Status , Void > PARSER = new ConstructingObjectParser <>(NAME ,
427
432
args -> new Status (
428
- (long ) args [0 ],
433
+ (int ) args [0 ],
429
434
(long ) args [1 ],
430
435
(long ) args [2 ],
431
436
(long ) args [3 ],
432
437
(long ) args [4 ],
433
- (int ) args [5 ],
438
+ (long ) args [5 ],
434
439
(int ) args [6 ],
435
440
(int ) args [7 ],
436
- (long ) args [8 ],
441
+ (int ) args [8 ],
437
442
(long ) args [9 ],
438
443
(long ) args [10 ],
439
444
(long ) args [11 ],
@@ -442,9 +447,11 @@ public static class Status implements Task.Status {
442
447
(long ) args [14 ],
443
448
(long ) args [15 ],
444
449
(long ) args [16 ],
445
- (long ) args [17 ]));
450
+ (long ) args [17 ],
451
+ (long ) args [18 ]));
446
452
447
453
static {
454
+ PARSER .declareInt (ConstructingObjectParser .constructorArg (), SHARD_ID );
448
455
PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_GLOBAL_CHECKPOINT_FIELD );
449
456
PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_MAX_SEQ_NO_FIELD );
450
457
PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_GLOBAL_CHECKPOINT_FIELD );
@@ -465,6 +472,12 @@ public static class Status implements Task.Status {
465
472
PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_OPERATIONS_INDEXED_FIELD );
466
473
}
467
474
475
+ private final int shardId ;
476
+
477
+ public int getShardId () {
478
+ return shardId ;
479
+ }
480
+
468
481
private final long leaderGlobalCheckpoint ;
469
482
470
483
public long leaderGlobalCheckpoint () {
@@ -574,6 +587,7 @@ public long numberOfOperationsIndexed() {
574
587
}
575
588
576
589
Status (
590
+ final int shardId ,
577
591
final long leaderGlobalCheckpoint ,
578
592
final long leaderMaxSeqNo ,
579
593
final long followerGlobalCheckpoint ,
@@ -592,6 +606,7 @@ public long numberOfOperationsIndexed() {
592
606
final long numberOfSuccessfulBulkOperations ,
593
607
final long numberOfFailedBulkOperations ,
594
608
final long numberOfOperationsIndexed ) {
609
+ this .shardId = shardId ;
595
610
this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
596
611
this .leaderMaxSeqNo = leaderMaxSeqNo ;
597
612
this .followerGlobalCheckpoint = followerGlobalCheckpoint ;
@@ -613,6 +628,7 @@ public long numberOfOperationsIndexed() {
613
628
}
614
629
615
630
public Status (final StreamInput in ) throws IOException {
631
+ this .shardId = in .readVInt ();
616
632
this .leaderGlobalCheckpoint = in .readZLong ();
617
633
this .leaderMaxSeqNo = in .readZLong ();
618
634
this .followerGlobalCheckpoint = in .readZLong ();
@@ -640,6 +656,7 @@ public String getWriteableName() {
640
656
641
657
@ Override
642
658
public void writeTo (final StreamOutput out ) throws IOException {
659
+ out .writeVInt (shardId );
643
660
out .writeZLong (leaderGlobalCheckpoint );
644
661
out .writeZLong (leaderMaxSeqNo );
645
662
out .writeZLong (followerGlobalCheckpoint );
@@ -664,6 +681,7 @@ public void writeTo(final StreamOutput out) throws IOException {
664
681
public XContentBuilder toXContent (final XContentBuilder builder , final Params params ) throws IOException {
665
682
builder .startObject ();
666
683
{
684
+ builder .field (SHARD_ID .getPreferredName (), shardId );
667
685
builder .field (LEADER_GLOBAL_CHECKPOINT_FIELD .getPreferredName (), leaderGlobalCheckpoint );
668
686
builder .field (LEADER_MAX_SEQ_NO_FIELD .getPreferredName (), leaderMaxSeqNo );
669
687
builder .field (FOLLOWER_GLOBAL_CHECKPOINT_FIELD .getPreferredName (), followerGlobalCheckpoint );
@@ -673,12 +691,21 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
673
691
builder .field (NUMBER_OF_CONCURRENT_WRITES_FIELD .getPreferredName (), numberOfConcurrentWrites );
674
692
builder .field (NUMBER_OF_QUEUED_WRITES_FIELD .getPreferredName (), numberOfQueuedWrites );
675
693
builder .field (INDEX_METADATA_VERSION_FIELD .getPreferredName (), indexMetadataVersion );
676
- builder .field (TOTAL_FETCH_TIME_NANOS_FIELD .getPreferredName (), totalFetchTimeNanos );
694
+ builder .humanReadableField (
695
+ TOTAL_FETCH_TIME_NANOS_FIELD .getPreferredName (),
696
+ "total_fetch_time" ,
697
+ new TimeValue (totalFetchTimeNanos , TimeUnit .NANOSECONDS ));
677
698
builder .field (NUMBER_OF_SUCCESSFUL_FETCHES_FIELD .getPreferredName (), numberOfSuccessfulFetches );
678
699
builder .field (NUMBER_OF_FAILED_FETCHES_FIELD .getPreferredName (), numberOfFailedFetches );
679
700
builder .field (OPERATIONS_RECEIVED_FIELD .getPreferredName (), operationsReceived );
680
- builder .field (TOTAL_TRANSFERRED_BYTES .getPreferredName (), totalTransferredBytes );
681
- builder .field (TOTAL_INDEX_TIME_NANOS_FIELD .getPreferredName (), totalIndexTimeNanos );
701
+ builder .humanReadableField (
702
+ TOTAL_TRANSFERRED_BYTES .getPreferredName (),
703
+ "total_transferred" ,
704
+ new ByteSizeValue (totalTransferredBytes , ByteSizeUnit .BYTES ));
705
+ builder .humanReadableField (
706
+ TOTAL_INDEX_TIME_NANOS_FIELD .getPreferredName (),
707
+ "total_index_time" ,
708
+ new TimeValue (totalIndexTimeNanos , TimeUnit .NANOSECONDS ));
682
709
builder .field (NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD .getPreferredName (), numberOfSuccessfulBulkOperations );
683
710
builder .field (NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD .getPreferredName (), numberOfFailedBulkOperations );
684
711
builder .field (NUMBER_OF_OPERATIONS_INDEXED_FIELD .getPreferredName (), numberOfOperationsIndexed );
@@ -696,7 +723,8 @@ public boolean equals(final Object o) {
696
723
if (this == o ) return true ;
697
724
if (o == null || getClass () != o .getClass ()) return false ;
698
725
final Status that = (Status ) o ;
699
- return leaderGlobalCheckpoint == that .leaderGlobalCheckpoint &&
726
+ return shardId == that .shardId &&
727
+ leaderGlobalCheckpoint == that .leaderGlobalCheckpoint &&
700
728
leaderMaxSeqNo == that .leaderMaxSeqNo &&
701
729
followerGlobalCheckpoint == that .followerGlobalCheckpoint &&
702
730
followerMaxSeqNo == that .followerMaxSeqNo &&
@@ -717,6 +745,7 @@ public boolean equals(final Object o) {
717
745
@ Override
718
746
public int hashCode () {
719
747
return Objects .hash (
748
+ shardId ,
720
749
leaderGlobalCheckpoint ,
721
750
leaderMaxSeqNo ,
722
751
followerGlobalCheckpoint ,
0 commit comments