@@ -407,7 +407,7 @@ public <T> void submitStateUpdateTasks(final String source,
407
407
// convert to an identity map to check for dups based on update tasks semantics of using identity instead of equal
408
408
final IdentityHashMap <T , ClusterStateTaskListener > tasksIdentity = new IdentityHashMap <>(tasks );
409
409
final List <UpdateTask <T >> updateTasks = tasksIdentity .entrySet ().stream ().map (
410
- entry -> new UpdateTask <>(source , entry .getKey (), config , executor , safe (entry .getValue (), logger ))
410
+ entry -> new UpdateTask <>(source , entry .getKey (), config . priority () , executor , safe (entry .getValue (), logger ))
411
411
).collect (Collectors .toList ());
412
412
413
413
synchronized (updateTasksPerExecutor ) {
@@ -546,11 +546,11 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
546
546
if (pending != null ) {
547
547
for (UpdateTask <T > task : pending ) {
548
548
if (task .processed .getAndSet (true ) == false ) {
549
- logger .trace ("will process {}" , task . toString ( executor ) );
549
+ logger .trace ("will process {}" , task );
550
550
toExecute .add (task );
551
551
processTasksBySource .computeIfAbsent (task .source , s -> new ArrayList <>()).add (task .task );
552
552
} else {
553
- logger .trace ("skipping {}, already processed" , task . toString ( executor ) );
553
+ logger .trace ("skipping {}, already processed" , task );
554
554
}
555
555
}
556
556
}
@@ -608,23 +608,23 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
608
608
if (assertsEnabled ) {
609
609
for (UpdateTask <T > updateTask : toExecute ) {
610
610
assert batchResult .executionResults .containsKey (updateTask .task ) :
611
- "missing task result for " + updateTask . toString ( executor ) ;
611
+ "missing task result for " + updateTask ;
612
612
}
613
613
}
614
614
615
615
ClusterState newClusterState = batchResult .resultingState ;
616
616
final ArrayList <UpdateTask <T >> proccessedListeners = new ArrayList <>();
617
617
// fail all tasks that have failed and extract those that are waiting for results
618
618
for (UpdateTask <T > updateTask : toExecute ) {
619
- assert batchResult .executionResults .containsKey (updateTask .task ) : "missing " + updateTask . toString ( executor ) ;
619
+ assert batchResult .executionResults .containsKey (updateTask .task ) : "missing " + updateTask ;
620
620
final ClusterStateTaskExecutor .TaskResult executionResult =
621
621
batchResult .executionResults .get (updateTask .task );
622
622
executionResult .handle (
623
623
() -> proccessedListeners .add (updateTask ),
624
624
ex -> {
625
625
logger .debug (
626
626
(Supplier <?>)
627
- () -> new ParameterizedMessage ("cluster state update task {} failed" , updateTask . toString ( executor ) ), ex );
627
+ () -> new ParameterizedMessage ("cluster state update task {} failed" , updateTask ), ex );
628
628
updateTask .listener .onFailure (updateTask .source , ex );
629
629
}
630
630
);
@@ -901,16 +901,13 @@ public TimeValue ackTimeout() {
901
901
class UpdateTask <T > extends SourcePrioritizedRunnable {
902
902
903
903
public final T task ;
904
- public final ClusterStateTaskConfig config ;
905
- public final ClusterStateTaskExecutor <T > executor ;
906
904
public final ClusterStateTaskListener listener ;
905
+ private final ClusterStateTaskExecutor <T > executor ;
907
906
public final AtomicBoolean processed = new AtomicBoolean ();
908
907
909
- UpdateTask (String source , T task , ClusterStateTaskConfig config , ClusterStateTaskExecutor <T > executor ,
910
- ClusterStateTaskListener listener ) {
911
- super (config .priority (), source );
908
+ UpdateTask (String source , T task , Priority priority , ClusterStateTaskExecutor <T > executor , ClusterStateTaskListener listener ) {
909
+ super (priority , source );
912
910
this .task = task ;
913
- this .config = config ;
914
911
this .executor = executor ;
915
912
this .listener = listener ;
916
913
}
@@ -924,7 +921,8 @@ public void run() {
924
921
}
925
922
}
926
923
927
- public String toString (ClusterStateTaskExecutor <T > executor ) {
924
+ @ Override
925
+ public String toString () {
928
926
String taskDescription = executor .describeTasks (Collections .singletonList (task ));
929
927
if (taskDescription .isEmpty ()) {
930
928
return "[" + source + "]" ;
0 commit comments