30
30
import org .elasticsearch .xpack .ccr .action .bulk .BulkShardOperationsResponse ;
31
31
32
32
import java .io .IOException ;
33
+ import java .util .AbstractMap ;
33
34
import java .util .ArrayList ;
34
35
import java .util .Arrays ;
35
36
import java .util .Comparator ;
37
+ import java .util .LinkedHashMap ;
36
38
import java .util .List ;
37
39
import java .util .Map ;
40
+ import java .util .NavigableMap ;
38
41
import java .util .Objects ;
39
42
import java .util .PriorityQueue ;
40
43
import java .util .Queue ;
44
+ import java .util .TreeMap ;
41
45
import java .util .concurrent .TimeUnit ;
42
46
import java .util .concurrent .atomic .AtomicInteger ;
43
47
import java .util .function .BiConsumer ;
44
48
import java .util .function .Consumer ;
45
49
import java .util .function .LongConsumer ;
46
50
import java .util .function .LongSupplier ;
51
+ import java .util .stream .Collectors ;
47
52
48
53
/**
49
54
* The node task that fetches the write operations from a leader shard and
@@ -86,6 +91,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
86
91
private long numberOfFailedBulkOperations = 0 ;
87
92
private long numberOfOperationsIndexed = 0 ;
88
93
private final Queue <Translog .Operation > buffer = new PriorityQueue <>(Comparator .comparing (Translog .Operation ::seqNo ));
94
+ private final LinkedHashMap <Long , ElasticsearchException > fetchExceptions ;
89
95
90
96
ShardFollowNodeTask (long id , String type , String action , String description , TaskId parentTask , Map <String , String > headers ,
91
97
ShardFollowTask params , BiConsumer <TimeValue , Runnable > scheduler , final LongSupplier relativeTimeProvider ) {
@@ -95,6 +101,17 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
95
101
this .relativeTimeProvider = relativeTimeProvider ;
96
102
this .retryTimeout = params .getRetryTimeout ();
97
103
this .idleShardChangesRequestDelay = params .getIdleShardRetryDelay ();
104
+ /*
105
+ * We keep track of the most recent fetch exceptions, with the number of exceptions that we track equal to the maximum number of
106
+ * concurrent fetches. For each failed fetch, we track the from sequence number associated with the request, and we clear the entry
107
+ * when the fetch task associated with that from sequence number succeeds.
108
+ */
109
+ this .fetchExceptions = new LinkedHashMap <Long , ElasticsearchException >() {
110
+ @ Override
111
+ protected boolean removeEldestEntry (final Map .Entry <Long , ElasticsearchException > eldest ) {
112
+ return size () > params .getMaxConcurrentReadBatches ();
113
+ }
114
+ };
98
115
}
99
116
100
117
void start (
@@ -224,6 +241,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
224
241
synchronized (ShardFollowNodeTask .this ) {
225
242
totalFetchTimeMillis += TimeUnit .NANOSECONDS .toMillis (relativeTimeProvider .getAsLong () - startTime );
226
243
numberOfSuccessfulFetches ++;
244
+ fetchExceptions .remove (from );
227
245
operationsReceived += response .getOperations ().length ;
228
246
totalTransferredBytes += Arrays .stream (response .getOperations ()).mapToLong (Translog .Operation ::estimateSize ).sum ();
229
247
}
@@ -233,6 +251,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
233
251
synchronized (ShardFollowNodeTask .this ) {
234
252
totalFetchTimeMillis += TimeUnit .NANOSECONDS .toMillis (relativeTimeProvider .getAsLong () - startTime );
235
253
numberOfFailedFetches ++;
254
+ fetchExceptions .put (from , new ElasticsearchException (e ));
236
255
}
237
256
handleFailure (e , retryCounter , () -> sendShardChangesRequest (from , maxOperationCount , maxRequiredSeqNo , retryCounter ));
238
257
});
@@ -412,12 +431,13 @@ public synchronized Status getStatus() {
412
431
totalIndexTimeMillis ,
413
432
numberOfSuccessfulBulkOperations ,
414
433
numberOfFailedBulkOperations ,
415
- numberOfOperationsIndexed );
434
+ numberOfOperationsIndexed ,
435
+ new TreeMap <>(fetchExceptions ));
416
436
}
417
437
418
438
public static class Status implements Task .Status {
419
439
420
- public static final String NAME = "shard-follow-node-task-status" ;
440
+ public static final String STATUS_PARSER_NAME = "shard-follow-node-task-status" ;
421
441
422
442
static final ParseField SHARD_ID = new ParseField ("shard_id" );
423
443
static final ParseField LEADER_GLOBAL_CHECKPOINT_FIELD = new ParseField ("leader_global_checkpoint" );
@@ -438,8 +458,10 @@ public static class Status implements Task.Status {
438
458
static final ParseField NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD = new ParseField ("number_of_successful_bulk_operations" );
439
459
static final ParseField NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD = new ParseField ("number_of_failed_bulk_operations" );
440
460
static final ParseField NUMBER_OF_OPERATIONS_INDEXED_FIELD = new ParseField ("number_of_operations_indexed" );
461
+ static final ParseField FETCH_EXCEPTIONS = new ParseField ("fetch_exceptions" );
441
462
442
- static final ConstructingObjectParser <Status , Void > PARSER = new ConstructingObjectParser <>(NAME ,
463
+ @ SuppressWarnings ("unchecked" )
464
+ static final ConstructingObjectParser <Status , Void > STATUS_PARSER = new ConstructingObjectParser <>(STATUS_PARSER_NAME ,
443
465
args -> new Status (
444
466
(int ) args [0 ],
445
467
(long ) args [1 ],
@@ -459,28 +481,51 @@ public static class Status implements Task.Status {
459
481
(long ) args [15 ],
460
482
(long ) args [16 ],
461
483
(long ) args [17 ],
462
- (long ) args [18 ]));
484
+ (long ) args [18 ],
485
+ new TreeMap <>(
486
+ ((List <Map .Entry <Long , ElasticsearchException >>) args [19 ])
487
+ .stream ()
488
+ .collect (Collectors .toMap (Map .Entry ::getKey , Map .Entry ::getValue )))));
489
+
490
+ public static final String FETCH_EXCEPTIONS_ENTRY_PARSER_NAME = "shard-follow-node-task-status-fetch-exceptions-entry" ;
491
+
492
+ static final ConstructingObjectParser <Map .Entry <Long , ElasticsearchException >, Void > FETCH_EXCEPTIONS_ENTRY_PARSER =
493
+ new ConstructingObjectParser <>(
494
+ FETCH_EXCEPTIONS_ENTRY_PARSER_NAME ,
495
+ args -> new AbstractMap .SimpleEntry <>((long ) args [0 ], (ElasticsearchException ) args [1 ]));
496
+
497
+ static {
498
+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), SHARD_ID );
499
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_GLOBAL_CHECKPOINT_FIELD );
500
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LEADER_MAX_SEQ_NO_FIELD );
501
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_GLOBAL_CHECKPOINT_FIELD );
502
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), FOLLOWER_MAX_SEQ_NO_FIELD );
503
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), LAST_REQUESTED_SEQ_NO_FIELD );
504
+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_READS_FIELD );
505
+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_WRITES_FIELD );
506
+ STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_QUEUED_WRITES_FIELD );
507
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), INDEX_METADATA_VERSION_FIELD );
508
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_FETCH_TIME_MILLIS_FIELD );
509
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD );
510
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_FETCHES_FIELD );
511
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), OPERATIONS_RECEIVED_FIELD );
512
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_TRANSFERRED_BYTES );
513
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_INDEX_TIME_MILLIS_FIELD );
514
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD );
515
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD );
516
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_OPERATIONS_INDEXED_FIELD );
517
+ STATUS_PARSER .declareObjectArray (ConstructingObjectParser .constructorArg (), FETCH_EXCEPTIONS_ENTRY_PARSER , FETCH_EXCEPTIONS );
518
+ }
519
+
520
+ static final ParseField FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO = new ParseField ("from_seq_no" );
521
+ static final ParseField FETCH_EXCEPTIONS_ENTRY_EXCEPTION = new ParseField ("exception" );
463
522
464
523
static {
    /*
     * Registers the two constructor arguments of FETCH_EXCEPTIONS_ENTRY_PARSER, in the order its builder lambda
     * reads them: the long at args[0] first, then the exception at args[1].
     */
    FETCH_EXCEPTIONS_ENTRY_PARSER.declareLong(ConstructingObjectParser.constructorArg(), FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO);
    FETCH_EXCEPTIONS_ENTRY_PARSER.declareObject(
        ConstructingObjectParser.constructorArg(),
        // the parse context is unused; the exception is rebuilt directly from the XContent parser
        (xContentParser, context) -> ElasticsearchException.fromXContent(xContentParser),
        FETCH_EXCEPTIONS_ENTRY_EXCEPTION);
}
485
530
486
531
private final int shardId ;
@@ -597,6 +642,12 @@ public long numberOfOperationsIndexed() {
597
642
return numberOfOperationsIndexed ;
598
643
}
599
644
645
+ private final NavigableMap <Long , ElasticsearchException > fetchExceptions ;
646
+
647
+ public NavigableMap <Long , ElasticsearchException > fetchExceptions () {
648
+ return fetchExceptions ;
649
+ }
650
+
600
651
Status (
601
652
final int shardId ,
602
653
final long leaderGlobalCheckpoint ,
@@ -616,7 +667,8 @@ public long numberOfOperationsIndexed() {
616
667
final long totalIndexTimeMillis ,
617
668
final long numberOfSuccessfulBulkOperations ,
618
669
final long numberOfFailedBulkOperations ,
619
- final long numberOfOperationsIndexed ) {
670
+ final long numberOfOperationsIndexed ,
671
+ final NavigableMap <Long , ElasticsearchException > fetchExceptions ) {
620
672
this .shardId = shardId ;
621
673
this .leaderGlobalCheckpoint = leaderGlobalCheckpoint ;
622
674
this .leaderMaxSeqNo = leaderMaxSeqNo ;
@@ -636,6 +688,7 @@ public long numberOfOperationsIndexed() {
636
688
this .numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations ;
637
689
this .numberOfFailedBulkOperations = numberOfFailedBulkOperations ;
638
690
this .numberOfOperationsIndexed = numberOfOperationsIndexed ;
691
+ this .fetchExceptions = fetchExceptions ;
639
692
}
640
693
641
694
public Status (final StreamInput in ) throws IOException {
@@ -658,11 +711,12 @@ public Status(final StreamInput in) throws IOException {
658
711
this .numberOfSuccessfulBulkOperations = in .readVLong ();
659
712
this .numberOfFailedBulkOperations = in .readVLong ();
660
713
this .numberOfOperationsIndexed = in .readVLong ();
714
+ this .fetchExceptions = new TreeMap <>(in .readMap (StreamInput ::readVLong , StreamInput ::readException ));
661
715
}
662
716
663
717
@ Override
664
718
public String getWriteableName () {
665
- return NAME ;
719
+ return STATUS_PARSER_NAME ;
666
720
}
667
721
668
722
@ Override
@@ -686,6 +740,7 @@ public void writeTo(final StreamOutput out) throws IOException {
686
740
out .writeVLong (numberOfSuccessfulBulkOperations );
687
741
out .writeVLong (numberOfFailedBulkOperations );
688
742
out .writeVLong (numberOfOperationsIndexed );
743
+ out .writeMap (fetchExceptions , StreamOutput ::writeVLong , StreamOutput ::writeException );
689
744
}
690
745
691
746
@ Override
@@ -720,13 +775,30 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
720
775
builder .field (NUMBER_OF_SUCCESSFUL_BULK_OPERATIONS_FIELD .getPreferredName (), numberOfSuccessfulBulkOperations );
721
776
builder .field (NUMBER_OF_FAILED_BULK_OPERATIONS_FIELD .getPreferredName (), numberOfFailedBulkOperations );
722
777
builder .field (NUMBER_OF_OPERATIONS_INDEXED_FIELD .getPreferredName (), numberOfOperationsIndexed );
778
+ builder .startArray (FETCH_EXCEPTIONS .getPreferredName ());
779
+ {
780
+ for (final Map .Entry <Long , ElasticsearchException > entry : fetchExceptions .entrySet ()) {
781
+ builder .startObject ();
782
+ {
783
+ builder .field (FETCH_EXCEPTIONS_ENTRY_FROM_SEQ_NO .getPreferredName (), entry .getKey ());
784
+ builder .field (FETCH_EXCEPTIONS_ENTRY_EXCEPTION .getPreferredName ());
785
+ builder .startObject ();
786
+ {
787
+ ElasticsearchException .generateThrowableXContent (builder , params , entry .getValue ());
788
+ }
789
+ builder .endObject ();
790
+ }
791
+ builder .endObject ();
792
+ }
793
+ }
794
+ builder .endArray ();
723
795
}
724
796
builder .endObject ();
725
797
return builder ;
726
798
}
727
799
728
800
public static Status fromXContent (final XContentParser parser ) {
729
- return PARSER .apply (parser , null );
801
+ return STATUS_PARSER .apply (parser , null );
730
802
}
731
803
732
804
@ Override
0 commit comments