42
42
import org .elasticsearch .cluster .routing .RoutingTable ;
43
43
import org .elasticsearch .common .Nullable ;
44
44
import org .elasticsearch .common .Priority ;
45
- import org .elasticsearch .common .Strings ;
46
45
import org .elasticsearch .common .component .AbstractLifecycleComponent ;
47
46
import org .elasticsearch .common .logging .ESLogger ;
48
47
import org .elasticsearch .common .logging .Loggers ;
@@ -512,33 +511,37 @@ public String source() {
512
511
513
512
<T > void runTasksForExecutor (ClusterStateTaskExecutor <T > executor ) {
514
513
final ArrayList <UpdateTask <T >> toExecute = new ArrayList <>();
515
- final ArrayList <String > sources = new ArrayList <>();
514
+ final Map <String , ArrayList < T >> processTasksBySource = new HashMap <>();
516
515
synchronized (updateTasksPerExecutor ) {
517
516
List <UpdateTask > pending = updateTasksPerExecutor .remove (executor );
518
517
if (pending != null ) {
519
518
for (UpdateTask <T > task : pending ) {
520
519
if (task .processed .getAndSet (true ) == false ) {
521
- logger .trace ("will process [{}] " , task .source );
520
+ logger .trace ("will process [{}[{}]] " , task .source , task . task );
522
521
toExecute .add (task );
523
- sources . add (task .source );
522
+ processTasksBySource . computeIfAbsent (task .source , s -> new ArrayList <>()). add ( task . task );
524
523
} else {
525
- logger .trace ("skipping [{}] , already processed" , task .source );
524
+ logger .trace ("skipping [{}[{}]] , already processed" , task .source , task . task );
526
525
}
527
526
}
528
527
}
529
528
}
530
529
if (toExecute .isEmpty ()) {
531
530
return ;
532
531
}
533
- final String source = Strings .collectionToCommaDelimitedString (sources );
532
+ final String tasksSummary = processTasksBySource .entrySet ().stream ().map (entry -> {
533
+ String tasks = executor .describeTasks (entry .getValue ());
534
+ return tasks .isEmpty () ? entry .getKey () : entry .getKey () + "[" + tasks + "]" ;
535
+ }).reduce ((s1 , s2 ) -> s1 + ", " + s2 ).orElse ("" );
536
+
534
537
if (!lifecycle .started ()) {
535
- logger .debug ("processing [{}]: ignoring, cluster_service not started" , source );
538
+ logger .debug ("processing [{}]: ignoring, cluster_service not started" , tasksSummary );
536
539
return ;
537
540
}
538
- logger .debug ("processing [{}]: execute" , source );
541
+ logger .debug ("processing [{}]: execute" , tasksSummary );
539
542
ClusterState previousClusterState = clusterState ;
540
543
if (!previousClusterState .nodes ().isLocalNodeElectedMaster () && executor .runOnlyOnMaster ()) {
541
- logger .debug ("failing [{}]: local node is no longer master" , source );
544
+ logger .debug ("failing [{}]: local node is no longer master" , tasksSummary );
542
545
toExecute .stream ().forEach (task -> task .listener .onNoLongerMaster (task .source ));
543
546
return ;
544
547
}
@@ -551,10 +554,10 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
551
554
TimeValue executionTime = TimeValue .timeValueMillis (Math .max (0 , TimeValue .nsecToMSec (currentTimeInNanos () - startTimeNS )));
552
555
if (logger .isTraceEnabled ()) {
553
556
logger .trace ("failed to execute cluster state update in [{}], state:\n version [{}], source [{}]\n {}{}{}" , e , executionTime ,
554
- previousClusterState .version (), source , previousClusterState .nodes ().prettyPrint (),
557
+ previousClusterState .version (), tasksSummary , previousClusterState .nodes ().prettyPrint (),
555
558
previousClusterState .routingTable ().prettyPrint (), previousClusterState .getRoutingNodes ().prettyPrint ());
556
559
}
557
- warnAboutSlowTaskIfNeeded (executionTime , source );
560
+ warnAboutSlowTaskIfNeeded (executionTime , tasksSummary );
558
561
batchResult = ClusterStateTaskExecutor .BatchResult .<T >builder ()
559
562
.failures (toExecute .stream ().map (updateTask -> updateTask .task )::iterator , e )
560
563
.build (previousClusterState );
@@ -597,8 +600,8 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
597
600
task .listener .clusterStateProcessed (task .source , previousClusterState , newClusterState );
598
601
}
599
602
TimeValue executionTime = TimeValue .timeValueMillis (Math .max (0 , TimeValue .nsecToMSec (currentTimeInNanos () - startTimeNS )));
600
- logger .debug ("processing [{}]: took [{}] no change in cluster_state" , source , executionTime );
601
- warnAboutSlowTaskIfNeeded (executionTime , source );
603
+ logger .debug ("processing [{}]: took [{}] no change in cluster_state" , tasksSummary , executionTime );
604
+ warnAboutSlowTaskIfNeeded (executionTime , tasksSummary );
602
605
return ;
603
606
}
604
607
@@ -640,18 +643,18 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
640
643
newClusterState .status (ClusterState .ClusterStateStatus .BEING_APPLIED );
641
644
642
645
if (logger .isTraceEnabled ()) {
643
- logger .trace ("cluster state updated, source [{}]\n {}" , source , newClusterState .prettyPrint ());
646
+ logger .trace ("cluster state updated, source [{}]\n {}" , tasksSummary , newClusterState .prettyPrint ());
644
647
} else if (logger .isDebugEnabled ()) {
645
- logger .debug ("cluster state updated, version [{}], source [{}]" , newClusterState .version (), source );
648
+ logger .debug ("cluster state updated, version [{}], source [{}]" , newClusterState .version (), tasksSummary );
646
649
}
647
650
648
- ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent (source , newClusterState , previousClusterState );
651
+ ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent (tasksSummary , newClusterState , previousClusterState );
649
652
// new cluster state, notify all listeners
650
653
final DiscoveryNodes .Delta nodesDelta = clusterChangedEvent .nodesDelta ();
651
654
if (nodesDelta .hasChanges () && logger .isInfoEnabled ()) {
652
655
String summary = nodesDelta .shortSummary ();
653
656
if (summary .length () > 0 ) {
654
- logger .info ("{}, reason: {}" , summary , source );
657
+ logger .info ("{}, reason: {}" , summary , tasksSummary );
655
658
}
656
659
}
657
660
@@ -665,7 +668,7 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
665
668
try {
666
669
clusterStatePublisher .accept (clusterChangedEvent , ackListener );
667
670
} catch (Discovery .FailedToCommitClusterStateException t ) {
668
- logger .warn ("failing [{}]: failed to commit cluster state version [{}]" , t , source , newClusterState .version ());
671
+ logger .warn ("failing [{}]: failed to commit cluster state version [{}]" , t , tasksSummary , newClusterState .version ());
669
672
proccessedListeners .forEach (task -> task .listener .onFailure (task .source , t ));
670
673
return ;
671
674
}
@@ -719,17 +722,17 @@ <T> void runTasksForExecutor(ClusterStateTaskExecutor<T> executor) {
719
722
try {
720
723
executor .clusterStatePublished (clusterChangedEvent );
721
724
} catch (Exception e ) {
722
- logger .error ("exception thrown while notifying executor of new cluster state publication [{}]" , e , source );
725
+ logger .error ("exception thrown while notifying executor of new cluster state publication [{}]" , e , tasksSummary );
723
726
}
724
727
725
728
TimeValue executionTime = TimeValue .timeValueMillis (Math .max (0 , TimeValue .nsecToMSec (currentTimeInNanos () - startTimeNS )));
726
- logger .debug ("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})" , source , executionTime ,
727
- newClusterState .version (), newClusterState .stateUUID ());
728
- warnAboutSlowTaskIfNeeded (executionTime , source );
729
+ logger .debug ("processing [{}]: took [{}] done applying updated cluster_state (version: {}, uuid: {})" , tasksSummary ,
730
+ executionTime , newClusterState .version (), newClusterState .stateUUID ());
731
+ warnAboutSlowTaskIfNeeded (executionTime , tasksSummary );
729
732
} catch (Exception e ) {
730
733
TimeValue executionTime = TimeValue .timeValueMillis (Math .max (0 , TimeValue .nsecToMSec (currentTimeInNanos () - startTimeNS )));
731
734
logger .warn ("failed to apply updated cluster state in [{}]:\n version [{}], uuid [{}], source [{}]\n {}" , e , executionTime ,
732
- newClusterState .version (), newClusterState .stateUUID (), source , newClusterState .prettyPrint ());
735
+ newClusterState .version (), newClusterState .stateUUID (), tasksSummary , newClusterState .prettyPrint ());
733
736
// TODO: do we want to call updateTask.onFailure here?
734
737
}
735
738
0 commit comments