@@ -81,7 +81,7 @@ public abstract class ShardFollowNodeTask extends AllocatedPersistentTask {
81
81
private long followerMaxSeqNo = 0 ;
82
82
private int numConcurrentReads = 0 ;
83
83
private int numConcurrentWrites = 0 ;
84
- private long currentIndexMetadataVersion = 0 ;
84
+ private long currentMappingVersion = 0 ;
85
85
private long totalFetchTimeMillis = 0 ;
86
86
private long numberOfSuccessfulFetches = 0 ;
87
87
private long numberOfFailedFetches = 0 ;
@@ -139,14 +139,13 @@ void start(
139
139
this .lastRequestedSeqNo = followerGlobalCheckpoint ;
140
140
}
141
141
142
- // Forcefully updates follower mapping, this gets us the leader imd version and
143
- // makes sure that leader and follower mapping are identical.
144
- updateMapping (imdVersion -> {
142
+ // updates follower mapping, this gets us the leader mapping version and makes sure that leader and follower mapping are identical
143
+ updateMapping (mappingVersion -> {
145
144
synchronized (ShardFollowNodeTask .this ) {
146
- currentIndexMetadataVersion = imdVersion ;
145
+ currentMappingVersion = mappingVersion ;
147
146
}
148
- LOGGER .info ("{} Started to follow leader shard {}, followGlobalCheckPoint={}, indexMetaDataVersion ={}" ,
149
- params .getFollowShardId (), params .getLeaderShardId (), followerGlobalCheckpoint , imdVersion );
147
+ LOGGER .info ("{} Started to follow leader shard {}, followGlobalCheckPoint={}, mappingVersion ={}" ,
148
+ params .getFollowShardId (), params .getLeaderShardId (), followerGlobalCheckpoint , mappingVersion );
150
149
coordinateReads ();
151
150
});
152
151
}
@@ -269,7 +268,7 @@ private void sendShardChangesRequest(long from, int maxOperationCount, long maxR
269
268
}
270
269
271
270
void handleReadResponse (long from , long maxRequiredSeqNo , ShardChangesAction .Response response ) {
272
- maybeUpdateMapping (response .getIndexMetadataVersion (), () -> innerHandleReadResponse (from , maxRequiredSeqNo , response ));
271
+ maybeUpdateMapping (response .getMappingVersion (), () -> innerHandleReadResponse (from , maxRequiredSeqNo , response ));
273
272
}
274
273
275
274
/** Called when some operations are fetched from the leading */
@@ -355,16 +354,16 @@ private synchronized void handleWriteResponse(final BulkShardOperationsResponse
355
354
coordinateReads ();
356
355
}
357
356
358
- private synchronized void maybeUpdateMapping (Long minimumRequiredIndexMetadataVersion , Runnable task ) {
359
- if (currentIndexMetadataVersion >= minimumRequiredIndexMetadataVersion ) {
360
- LOGGER .trace ("{} index metadata version [{}] is higher or equal than minimum required index metadata version [{}]" ,
361
- params .getFollowShardId (), currentIndexMetadataVersion , minimumRequiredIndexMetadataVersion );
357
+ private synchronized void maybeUpdateMapping (Long minimumRequiredMappingVersion , Runnable task ) {
358
+ if (currentMappingVersion >= minimumRequiredMappingVersion ) {
359
+ LOGGER .trace ("{} mapping version [{}] is higher or equal than minimum required mapping version [{}]" ,
360
+ params .getFollowShardId (), currentMappingVersion , minimumRequiredMappingVersion );
362
361
task .run ();
363
362
} else {
364
- LOGGER .trace ("{} updating mapping, index metadata version [{}] is lower than minimum required index metadata version [{}]" ,
365
- params .getFollowShardId (), currentIndexMetadataVersion , minimumRequiredIndexMetadataVersion );
366
- updateMapping (imdVersion -> {
367
- currentIndexMetadataVersion = imdVersion ;
363
+ LOGGER .trace ("{} updating mapping, mapping version [{}] is lower than minimum required mapping version [{}]" ,
364
+ params .getFollowShardId (), currentMappingVersion , minimumRequiredMappingVersion );
365
+ updateMapping (mappingVersion -> {
366
+ currentMappingVersion = mappingVersion ;
368
367
task .run ();
369
368
});
370
369
}
@@ -441,7 +440,7 @@ public synchronized Status getStatus() {
441
440
numConcurrentReads ,
442
441
numConcurrentWrites ,
443
442
buffer .size (),
444
- currentIndexMetadataVersion ,
443
+ currentMappingVersion ,
445
444
totalFetchTimeMillis ,
446
445
numberOfSuccessfulFetches ,
447
446
numberOfFailedFetches ,
@@ -469,7 +468,7 @@ public static class Status implements Task.Status {
469
468
static final ParseField NUMBER_OF_CONCURRENT_READS_FIELD = new ParseField ("number_of_concurrent_reads" );
470
469
static final ParseField NUMBER_OF_CONCURRENT_WRITES_FIELD = new ParseField ("number_of_concurrent_writes" );
471
470
static final ParseField NUMBER_OF_QUEUED_WRITES_FIELD = new ParseField ("number_of_queued_writes" );
472
- static final ParseField INDEX_METADATA_VERSION_FIELD = new ParseField ("index_metadata_version " );
471
+ static final ParseField MAPPING_VERSION_FIELD = new ParseField ("mapping_version " );
473
472
static final ParseField TOTAL_FETCH_TIME_MILLIS_FIELD = new ParseField ("total_fetch_time_millis" );
474
473
static final ParseField NUMBER_OF_SUCCESSFUL_FETCHES_FIELD = new ParseField ("number_of_successful_fetches" );
475
474
static final ParseField NUMBER_OF_FAILED_FETCHES_FIELD = new ParseField ("number_of_failed_fetches" );
@@ -529,7 +528,7 @@ public static class Status implements Task.Status {
529
528
STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_READS_FIELD );
530
529
STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_CONCURRENT_WRITES_FIELD );
531
530
STATUS_PARSER .declareInt (ConstructingObjectParser .constructorArg (), NUMBER_OF_QUEUED_WRITES_FIELD );
532
- STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), INDEX_METADATA_VERSION_FIELD );
531
+ STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), MAPPING_VERSION_FIELD );
533
532
STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), TOTAL_FETCH_TIME_MILLIS_FIELD );
534
533
STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_SUCCESSFUL_FETCHES_FIELD );
535
534
STATUS_PARSER .declareLong (ConstructingObjectParser .constructorArg (), NUMBER_OF_FAILED_FETCHES_FIELD );
@@ -614,10 +613,10 @@ public int numberOfQueuedWrites() {
614
613
return numberOfQueuedWrites ;
615
614
}
616
615
617
- private final long indexMetadataVersion ;
616
+ private final long mappingVersion ;
618
617
619
- public long indexMetadataVersion () {
620
- return indexMetadataVersion ;
618
+ public long mappingVersion () {
619
+ return mappingVersion ;
621
620
}
622
621
623
622
private final long totalFetchTimeMillis ;
@@ -697,7 +696,7 @@ public long timeSinceLastFetchMillis() {
697
696
final int numberOfConcurrentReads ,
698
697
final int numberOfConcurrentWrites ,
699
698
final int numberOfQueuedWrites ,
700
- final long indexMetadataVersion ,
699
+ final long mappingVersion ,
701
700
final long totalFetchTimeMillis ,
702
701
final long numberOfSuccessfulFetches ,
703
702
final long numberOfFailedFetches ,
@@ -719,7 +718,7 @@ public long timeSinceLastFetchMillis() {
719
718
this .numberOfConcurrentReads = numberOfConcurrentReads ;
720
719
this .numberOfConcurrentWrites = numberOfConcurrentWrites ;
721
720
this .numberOfQueuedWrites = numberOfQueuedWrites ;
722
- this .indexMetadataVersion = indexMetadataVersion ;
721
+ this .mappingVersion = mappingVersion ;
723
722
this .totalFetchTimeMillis = totalFetchTimeMillis ;
724
723
this .numberOfSuccessfulFetches = numberOfSuccessfulFetches ;
725
724
this .numberOfFailedFetches = numberOfFailedFetches ;
@@ -729,7 +728,7 @@ public long timeSinceLastFetchMillis() {
729
728
this .numberOfSuccessfulBulkOperations = numberOfSuccessfulBulkOperations ;
730
729
this .numberOfFailedBulkOperations = numberOfFailedBulkOperations ;
731
730
this .numberOfOperationsIndexed = numberOfOperationsIndexed ;
732
- this .fetchExceptions = fetchExceptions ;
731
+ this .fetchExceptions = Objects . requireNonNull ( fetchExceptions ) ;
733
732
this .timeSinceLastFetchMillis = timeSinceLastFetchMillis ;
734
733
}
735
734
@@ -744,7 +743,7 @@ public Status(final StreamInput in) throws IOException {
744
743
this .numberOfConcurrentReads = in .readVInt ();
745
744
this .numberOfConcurrentWrites = in .readVInt ();
746
745
this .numberOfQueuedWrites = in .readVInt ();
747
- this .indexMetadataVersion = in .readVLong ();
746
+ this .mappingVersion = in .readVLong ();
748
747
this .totalFetchTimeMillis = in .readVLong ();
749
748
this .numberOfSuccessfulFetches = in .readVLong ();
750
749
this .numberOfFailedFetches = in .readVLong ();
@@ -775,7 +774,7 @@ public void writeTo(final StreamOutput out) throws IOException {
775
774
out .writeVInt (numberOfConcurrentReads );
776
775
out .writeVInt (numberOfConcurrentWrites );
777
776
out .writeVInt (numberOfQueuedWrites );
778
- out .writeVLong (indexMetadataVersion );
777
+ out .writeVLong (mappingVersion );
779
778
out .writeVLong (totalFetchTimeMillis );
780
779
out .writeVLong (numberOfSuccessfulFetches );
781
780
out .writeVLong (numberOfFailedFetches );
@@ -803,7 +802,7 @@ public XContentBuilder toXContent(final XContentBuilder builder, final Params pa
803
802
builder .field (NUMBER_OF_CONCURRENT_READS_FIELD .getPreferredName (), numberOfConcurrentReads );
804
803
builder .field (NUMBER_OF_CONCURRENT_WRITES_FIELD .getPreferredName (), numberOfConcurrentWrites );
805
804
builder .field (NUMBER_OF_QUEUED_WRITES_FIELD .getPreferredName (), numberOfQueuedWrites );
806
- builder .field (INDEX_METADATA_VERSION_FIELD .getPreferredName (), indexMetadataVersion );
805
+ builder .field (MAPPING_VERSION_FIELD .getPreferredName (), mappingVersion );
807
806
builder .humanReadableField (
808
807
TOTAL_FETCH_TIME_MILLIS_FIELD .getPreferredName (),
809
808
"total_fetch_time" ,
@@ -867,14 +866,22 @@ public boolean equals(final Object o) {
867
866
numberOfConcurrentReads == that .numberOfConcurrentReads &&
868
867
numberOfConcurrentWrites == that .numberOfConcurrentWrites &&
869
868
numberOfQueuedWrites == that .numberOfQueuedWrites &&
870
- indexMetadataVersion == that .indexMetadataVersion &&
869
+ mappingVersion == that .mappingVersion &&
871
870
totalFetchTimeMillis == that .totalFetchTimeMillis &&
872
871
numberOfSuccessfulFetches == that .numberOfSuccessfulFetches &&
873
872
numberOfFailedFetches == that .numberOfFailedFetches &&
874
873
operationsReceived == that .operationsReceived &&
875
874
totalTransferredBytes == that .totalTransferredBytes &&
876
875
numberOfSuccessfulBulkOperations == that .numberOfSuccessfulBulkOperations &&
877
876
numberOfFailedBulkOperations == that .numberOfFailedBulkOperations &&
877
+ numberOfOperationsIndexed == that .numberOfOperationsIndexed &&
878
+ /*
879
+ * ElasticsearchException does not implement equals so we will assume the fetch exceptions are equal if they are equal
880
+ * up to the key set and their messages. Note that we are relying on the fact that the fetch exceptions are ordered by
881
+ * keys.
882
+ */
883
+ fetchExceptions .keySet ().equals (that .fetchExceptions .keySet ()) &&
884
+ getFetchExceptionMessages (this ).equals (getFetchExceptionMessages (that )) &&
878
885
timeSinceLastFetchMillis == that .timeSinceLastFetchMillis ;
879
886
}
880
887
@@ -891,16 +898,26 @@ public int hashCode() {
891
898
numberOfConcurrentReads ,
892
899
numberOfConcurrentWrites ,
893
900
numberOfQueuedWrites ,
894
- indexMetadataVersion ,
901
+ mappingVersion ,
895
902
totalFetchTimeMillis ,
896
903
numberOfSuccessfulFetches ,
897
904
numberOfFailedFetches ,
898
905
operationsReceived ,
899
906
totalTransferredBytes ,
900
907
numberOfSuccessfulBulkOperations ,
901
908
numberOfFailedBulkOperations ,
909
+ numberOfOperationsIndexed ,
910
+ /*
911
+ * ElasticsearchException does not implement hash code so we will compute the hash code based on the key set and the
912
+ * messages. Note that we are relying on the fact that the fetch exceptions are ordered by keys.
913
+ */
914
+ fetchExceptions .keySet (),
915
+ getFetchExceptionMessages (this ),
902
916
timeSinceLastFetchMillis );
917
+ }
903
918
919
+ private static List <String > getFetchExceptionMessages (final Status status ) {
920
+ return status .fetchExceptions ().values ().stream ().map (ElasticsearchException ::getMessage ).collect (Collectors .toList ());
904
921
}
905
922
906
923
public String toString () {
0 commit comments