@@ -451,7 +451,7 @@ public <T> void submitStateUpdateTasks(final String source,
451
451
// convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal
452
452
final IdentityHashMap <T , ClusterStateTaskListener > tasksIdentity = new IdentityHashMap <>(tasks );
453
453
final List <UpdateTask <T >> updateTasks = tasksIdentity .entrySet ().stream ().map (
454
- entry -> new UpdateTask <>(source , entry .getKey (), config , executor , safe (entry .getValue (), logger ))
454
+ entry -> new UpdateTask <>(source , entry .getKey (), config . priority () , executor , safe (entry .getValue (), logger ))
455
455
).collect (Collectors .toList ());
456
456
457
457
synchronized (updateTasksPerExecutor ) {
@@ -590,11 +590,11 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
590
590
if (pending != null ) {
591
591
for (UpdateTask <T > task : pending ) {
592
592
if (task .processed .getAndSet (true ) == false ) {
593
- logger .trace ("will process {}" , task . toString ( executor ) );
593
+ logger .trace ("will process {}" , task );
594
594
toExecute .add (task );
595
595
processTasksBySource .computeIfAbsent (task .source , s -> new ArrayList <>()).add (task .task );
596
596
} else {
597
- logger .trace ("skipping {}, already processed" , task . toString ( executor ) );
597
+ logger .trace ("skipping {}, already processed" , task );
598
598
}
599
599
}
600
600
}
@@ -652,23 +652,23 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
652
652
if (assertsEnabled ) {
653
653
for (UpdateTask <T > updateTask : toExecute ) {
654
654
assert batchResult .executionResults .containsKey (updateTask .task ) :
655
- "missing task result for " + updateTask . toString ( executor ) ;
655
+ "missing task result for " + updateTask ;
656
656
}
657
657
}
658
658
659
659
ClusterState newClusterState = batchResult .resultingState ;
660
660
final ArrayList <UpdateTask <T >> proccessedListeners = new ArrayList <>();
661
661
// fail all tasks that have failed and extract those that are waiting for results
662
662
for (UpdateTask <T > updateTask : toExecute ) {
663
- assert batchResult .executionResults .containsKey (updateTask .task ) : "missing " + updateTask . toString ( executor ) ;
663
+ assert batchResult .executionResults .containsKey (updateTask .task ) : "missing " + updateTask ;
664
664
final ClusterStateTaskExecutor .TaskResult executionResult =
665
665
batchResult .executionResults .get (updateTask .task );
666
666
executionResult .handle (
667
667
() -> proccessedListeners .add (updateTask ),
668
668
ex -> {
669
669
logger .debug (
670
670
(Supplier <?>)
671
- () -> new ParameterizedMessage ("cluster state update task {} failed" , updateTask . toString ( executor ) ), ex );
671
+ () -> new ParameterizedMessage ("cluster state update task {} failed" , updateTask ), ex );
672
672
updateTask .listener .onFailure (updateTask .source , ex );
673
673
}
674
674
);
@@ -944,16 +944,13 @@ public TimeValue ackTimeout() {
944
944
class UpdateTask <T > extends SourcePrioritizedRunnable {
945
945
946
946
public final T task ;
947
- public final ClusterStateTaskConfig config ;
948
- public final ClusterStateTaskExecutor <T > executor ;
949
947
public final ClusterStateTaskListener listener ;
948
+ private final ClusterStateTaskExecutor <T > executor ;
950
949
public final AtomicBoolean processed = new AtomicBoolean ();
951
950
952
- UpdateTask (String source , T task , ClusterStateTaskConfig config , ClusterStateTaskExecutor <T > executor ,
953
- ClusterStateTaskListener listener ) {
954
- super (config .priority (), source );
951
+ UpdateTask (String source , T task , Priority priority , ClusterStateTaskExecutor <T > executor , ClusterStateTaskListener listener ) {
952
+ super (priority , source );
955
953
this .task = task ;
956
- this .config = config ;
957
954
this .executor = executor ;
958
955
this .listener = listener ;
959
956
}
@@ -967,7 +964,8 @@ public void run() {
967
964
}
968
965
}
969
966
970
- public String toString (ClusterStateTaskExecutor <T > executor ) {
967
+ @ Override
968
+ public String toString () {
971
969
String taskDescription = executor .describeTasks (Collections .singletonList (task ));
972
970
if (taskDescription .isEmpty ()) {
973
971
return "[" + source + "]" ;
0 commit comments