11
11
import org .elasticsearch .cluster .ClusterState ;
12
12
import org .elasticsearch .cluster .metadata .Metadata ;
13
13
import org .elasticsearch .cluster .node .DiscoveryNode ;
14
+ import org .elasticsearch .cluster .node .DiscoveryNodeRole ;
14
15
import org .elasticsearch .cluster .node .DiscoveryNodes ;
15
16
import org .elasticsearch .common .Randomness ;
16
17
import org .elasticsearch .common .collect .MapBuilder ;
42
43
import java .util .HashMap ;
43
44
import java .util .List ;
44
45
import java .util .Map ;
46
+ import java .util .Set ;
45
47
import java .util .SortedMap ;
46
48
import java .util .TreeMap ;
47
49
@@ -59,6 +61,10 @@ public class JobNodeSelectorTests extends ESTestCase {
59
61
// To simplify the logic in this class all jobs have the same memory requirement
60
62
private static final long MAX_JOB_BYTES = ByteSizeValue .ofGb (1 ).getBytes ();
61
63
private static final ByteSizeValue JOB_MEMORY_REQUIREMENT = ByteSizeValue .ofMb (10 );
64
+ private static final Set <DiscoveryNodeRole > ROLES_WITH_ML =
65
+ Set .of (DiscoveryNodeRole .MASTER_ROLE , DiscoveryNodeRole .ML_ROLE , DiscoveryNodeRole .DATA_ROLE );
66
+ private static final Set <DiscoveryNodeRole > ROLES_WITHOUT_ML =
67
+ Set .of (DiscoveryNodeRole .MASTER_ROLE , DiscoveryNodeRole .DATA_ROLE );
62
68
63
69
private MlMemoryTracker memoryTracker ;
64
70
private boolean isMemoryTrackerRecentlyRefreshed ;
@@ -77,22 +83,22 @@ public void testNodeNameAndVersion() {
77
83
TransportAddress ta = new TransportAddress (InetAddress .getLoopbackAddress (), 9300 );
78
84
Map <String , String > attributes = new HashMap <>();
79
85
attributes .put ("unrelated" , "attribute" );
80
- DiscoveryNode node = new DiscoveryNode ("_node_name1" , "_node_id1" , ta , attributes , Collections . emptySet () , Version .CURRENT );
86
+ DiscoveryNode node = new DiscoveryNode ("_node_name1" , "_node_id1" , ta , attributes , ROLES_WITHOUT_ML , Version .CURRENT );
81
87
assertEquals ("{_node_name1}{version=" + node .getVersion () + "}" , JobNodeSelector .nodeNameAndVersion (node ));
82
88
}
83
89
84
90
public void testNodeNameAndMlAttributes () {
85
91
TransportAddress ta = new TransportAddress (InetAddress .getLoopbackAddress (), 9300 );
86
92
SortedMap <String , String > attributes = new TreeMap <>();
87
93
attributes .put ("unrelated" , "attribute" );
88
- DiscoveryNode node = new DiscoveryNode ("_node_name1" , "_node_id1" , ta , attributes , Collections . emptySet () , Version .CURRENT );
94
+ DiscoveryNode node = new DiscoveryNode ("_node_name1" , "_node_id1" , ta , attributes , ROLES_WITHOUT_ML , Version .CURRENT );
89
95
assertEquals ("{_node_name1}" , JobNodeSelector .nodeNameAndMlAttributes (node ));
90
96
91
97
attributes .put ("ml.machine_memory" , "5" );
92
- node = new DiscoveryNode ("_node_name1" , "_node_id1" , ta , attributes , Collections . emptySet () , Version .CURRENT );
98
+ node = new DiscoveryNode ("_node_name1" , "_node_id1" , ta , attributes , ROLES_WITH_ML , Version .CURRENT );
93
99
assertEquals ("{_node_name1}{ml.machine_memory=5}" , JobNodeSelector .nodeNameAndMlAttributes (node ));
94
100
95
- node = new DiscoveryNode (null , "_node_id1" , ta , attributes , Collections . emptySet () , Version .CURRENT );
101
+ node = new DiscoveryNode (null , "_node_id1" , ta , attributes , ROLES_WITH_ML , Version .CURRENT );
96
102
assertEquals ("{_node_id1}{ml.machine_memory=5}" , JobNodeSelector .nodeNameAndMlAttributes (node ));
97
103
}
98
104
@@ -321,9 +327,9 @@ public void testSelectLeastLoadedMlNodeForDataFrameAnalyticsJob_firstJobTooBigMe
321
327
public void testSelectLeastLoadedMlNode_noMlNodes () {
322
328
DiscoveryNodes nodes = DiscoveryNodes .builder ()
323
329
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
324
- Collections .emptyMap (), Collections . emptySet () , Version .CURRENT ))
330
+ Collections .emptyMap (), ROLES_WITHOUT_ML , Version .CURRENT ))
325
331
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
326
- Collections .emptyMap (), Collections . emptySet () , Version .CURRENT ))
332
+ Collections .emptyMap (), ROLES_WITHOUT_ML , Version .CURRENT ))
327
333
.build ();
328
334
329
335
PersistentTasksCustomMetadata .Builder tasksBuilder = PersistentTasksCustomMetadata .builder ();
@@ -358,11 +364,11 @@ public void testSelectLeastLoadedMlNode_maxConcurrentOpeningJobs() {
358
364
Map <String , String > nodeAttr = Map .of (MachineLearning .MACHINE_MEMORY_NODE_ATTR , "1000000000" );
359
365
DiscoveryNodes nodes = DiscoveryNodes .builder ()
360
366
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
361
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
367
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
362
368
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
363
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
369
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
364
370
.add (new DiscoveryNode ("_node_name3" , "_node_id3" , new TransportAddress (InetAddress .getLoopbackAddress (), 9302 ),
365
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
371
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
366
372
.build ();
367
373
368
374
PersistentTasksCustomMetadata .Builder tasksBuilder = PersistentTasksCustomMetadata .builder ();
@@ -465,11 +471,11 @@ public void testSelectLeastLoadedMlNode_concurrentOpeningJobsAndStaleFailedJob()
465
471
Map <String , String > nodeAttr = Map .of (MachineLearning .MACHINE_MEMORY_NODE_ATTR , "1000000000" );
466
472
DiscoveryNodes nodes = DiscoveryNodes .builder ()
467
473
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
468
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
474
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
469
475
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
470
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
476
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
471
477
.add (new DiscoveryNode ("_node_name3" , "_node_id3" , new TransportAddress (InetAddress .getLoopbackAddress (), 9302 ),
472
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
478
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
473
479
.build ();
474
480
475
481
PersistentTasksCustomMetadata .Builder tasksBuilder = PersistentTasksCustomMetadata .builder ();
@@ -532,9 +538,9 @@ public void testSelectLeastLoadedMlNode_noCompatibleJobTypeNodes() {
532
538
Map <String , String > nodeAttr = Map .of (MachineLearning .MACHINE_MEMORY_NODE_ATTR , "1000000000" );
533
539
DiscoveryNodes nodes = DiscoveryNodes .builder ()
534
540
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
535
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
541
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
536
542
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
537
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
543
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
538
544
.build ();
539
545
540
546
PersistentTasksCustomMetadata .Builder tasksBuilder = PersistentTasksCustomMetadata .builder ();
@@ -577,7 +583,7 @@ public void testSelectLeastLoadedMlNode_reasonsAreInDeterministicOrder() {
577
583
"_node_id1" ,
578
584
new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
579
585
nodeAttr ,
580
- Collections . emptySet () ,
586
+ ROLES_WITH_ML ,
581
587
Version .CURRENT
582
588
)
583
589
)
@@ -587,7 +593,7 @@ public void testSelectLeastLoadedMlNode_reasonsAreInDeterministicOrder() {
587
593
"_node_id2" ,
588
594
new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
589
595
nodeAttr ,
590
- Collections . emptySet () ,
596
+ ROLES_WITH_ML ,
591
597
Version .CURRENT
592
598
)
593
599
)
@@ -639,9 +645,9 @@ public void testSelectLeastLoadedMlNode_noNodesMatchingModelSnapshotMinVersion()
639
645
Map <String , String > nodeAttr = Map .of (MachineLearning .MACHINE_MEMORY_NODE_ATTR , "1000000000" );
640
646
DiscoveryNodes nodes = DiscoveryNodes .builder ()
641
647
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
642
- nodeAttr , Collections . emptySet () , Version .fromString ("6.2.0" )))
648
+ nodeAttr , ROLES_WITH_ML , Version .fromString ("6.2.0" )))
643
649
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
644
- nodeAttr , Collections . emptySet () , Version .fromString ("6.1.0" )))
650
+ nodeAttr , ROLES_WITH_ML , Version .fromString ("6.1.0" )))
645
651
.build ();
646
652
647
653
PersistentTasksCustomMetadata .Builder tasksBuilder = PersistentTasksCustomMetadata .builder ();
@@ -677,9 +683,9 @@ public void testSelectLeastLoadedMlNode_jobWithRules() {
677
683
Map <String , String > nodeAttr = Map .of (MachineLearning .MACHINE_MEMORY_NODE_ATTR , "1000000000" );
678
684
DiscoveryNodes nodes = DiscoveryNodes .builder ()
679
685
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
680
- nodeAttr , Collections . emptySet () , Version .fromString ("6.2.0" )))
686
+ nodeAttr , ROLES_WITH_ML , Version .fromString ("6.2.0" )))
681
687
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
682
- nodeAttr , Collections . emptySet () , Version .fromString ("6.4.0" )))
688
+ nodeAttr , ROLES_WITH_ML , Version .fromString ("6.4.0" )))
683
689
.build ();
684
690
685
691
PersistentTasksCustomMetadata .Builder tasksBuilder = PersistentTasksCustomMetadata .builder ();
@@ -711,9 +717,9 @@ public void testSelectMlNodeOnlyOutOfCandidates() {
711
717
Map <String , String > nodeAttr = Map .of (MachineLearning .MACHINE_MEMORY_NODE_ATTR , "1000000000" );
712
718
DiscoveryNodes nodes = DiscoveryNodes .builder ()
713
719
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
714
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
720
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
715
721
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
716
- nodeAttr , Collections . emptySet () , Version .CURRENT ))
722
+ nodeAttr , ROLES_WITH_ML , Version .CURRENT ))
717
723
.build ();
718
724
719
725
PersistentTasksCustomMetadata .Builder tasksBuilder = PersistentTasksCustomMetadata .builder ();
@@ -745,9 +751,9 @@ public void testSelectMlNodeOnlyOutOfCandidates() {
745
751
public void testConsiderLazyAssignmentWithNoLazyNodes () {
746
752
DiscoveryNodes nodes = DiscoveryNodes .builder ()
747
753
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
748
- Collections .emptyMap (), Collections . emptySet () , Version .CURRENT ))
754
+ Collections .emptyMap (), ROLES_WITHOUT_ML , Version .CURRENT ))
749
755
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
750
- Collections .emptyMap (), Collections . emptySet () , Version .CURRENT ))
756
+ Collections .emptyMap (), ROLES_WITHOUT_ML , Version .CURRENT ))
751
757
.build ();
752
758
753
759
ClusterState .Builder cs = ClusterState .builder (new ClusterName ("_name" ));
@@ -769,9 +775,9 @@ public void testConsiderLazyAssignmentWithNoLazyNodes() {
769
775
public void testConsiderLazyAssignmentWithLazyNodes () {
770
776
DiscoveryNodes nodes = DiscoveryNodes .builder ()
771
777
.add (new DiscoveryNode ("_node_name1" , "_node_id1" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
772
- Collections .emptyMap (), Collections . emptySet () , Version .CURRENT ))
778
+ Collections .emptyMap (), ROLES_WITHOUT_ML , Version .CURRENT ))
773
779
.add (new DiscoveryNode ("_node_name2" , "_node_id2" , new TransportAddress (InetAddress .getLoopbackAddress (), 9301 ),
774
- Collections .emptyMap (), Collections . emptySet () , Version .CURRENT ))
780
+ Collections .emptyMap (), ROLES_WITHOUT_ML , Version .CURRENT ))
775
781
.build ();
776
782
777
783
ClusterState .Builder cs = ClusterState .builder (new ClusterName ("_name" ));
@@ -823,7 +829,7 @@ public void testMaximumPossibleNodeMemoryTooSmall() {
823
829
public void testPerceivedCapacityAndMaxFreeMemory () {
824
830
DiscoveryNodes nodes = DiscoveryNodes .builder ()
825
831
.add (new DiscoveryNode ("not_ml_node_name" , "_node_id" , new TransportAddress (InetAddress .getLoopbackAddress (), 9300 ),
826
- Collections .emptyMap (), Collections . emptySet () , Version .CURRENT ))
832
+ Collections .emptyMap (), ROLES_WITHOUT_ML , Version .CURRENT ))
827
833
.add (new DiscoveryNode (
828
834
"filled_ml_node_name" ,
829
835
"filled_ml_node_id" ,
@@ -832,7 +838,7 @@ public void testPerceivedCapacityAndMaxFreeMemory() {
832
838
.put (MachineLearning .MAX_JVM_SIZE_NODE_ATTR , "10" )
833
839
.put (MachineLearning .MACHINE_MEMORY_NODE_ATTR , Long .toString (ByteSizeValue .ofGb (30 ).getBytes ()))
834
840
.map (),
835
- Collections . emptySet () ,
841
+ ROLES_WITH_ML ,
836
842
Version .CURRENT ))
837
843
.add (new DiscoveryNode ("not_filled_ml_node" ,
838
844
"not_filled_ml_node_id" ,
@@ -841,7 +847,7 @@ public void testPerceivedCapacityAndMaxFreeMemory() {
841
847
.put (MachineLearning .MAX_JVM_SIZE_NODE_ATTR , "10" )
842
848
.put (MachineLearning .MACHINE_MEMORY_NODE_ATTR , Long .toString (ByteSizeValue .ofGb (30 ).getBytes ()))
843
849
.map (),
844
- Collections . emptySet () ,
850
+ ROLES_WITH_ML ,
845
851
Version .CURRENT ))
846
852
.add (new DiscoveryNode ("not_filled_smaller_ml_node" ,
847
853
"not_filled_smaller_ml_node_id" ,
@@ -850,7 +856,7 @@ public void testPerceivedCapacityAndMaxFreeMemory() {
850
856
.put (MachineLearning .MAX_JVM_SIZE_NODE_ATTR , "10" )
851
857
.put (MachineLearning .MACHINE_MEMORY_NODE_ATTR , Long .toString (ByteSizeValue .ofGb (10 ).getBytes ()))
852
858
.map (),
853
- Collections . emptySet () ,
859
+ ROLES_WITH_ML ,
854
860
Version .CURRENT ))
855
861
.build ();
856
862
@@ -894,7 +900,7 @@ private ClusterState.Builder fillNodesWithRunningJobs(Map<String, String> nodeAt
894
900
for (int i = 0 ; i < numNodes ; i ++) {
895
901
String nodeId = "_node_id" + i ;
896
902
TransportAddress address = new TransportAddress (InetAddress .getLoopbackAddress (), 9300 + i );
897
- nodes .add (new DiscoveryNode ("_node_name" + i , nodeId , address , nodeAttr , Collections . emptySet () , Version .CURRENT ));
903
+ nodes .add (new DiscoveryNode ("_node_name" + i , nodeId , address , nodeAttr , ROLES_WITH_ML , Version .CURRENT ));
898
904
for (int j = 0 ; j < numRunningJobsPerNode ; j ++) {
899
905
int id = j + (numRunningJobsPerNode * i );
900
906
// Both anomaly detector jobs and data frame analytics jobs should count towards the limit
0 commit comments