@@ -44,6 +44,7 @@
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
 import org.elasticsearch.test.ESTestCase;
@@ -72,6 +73,8 @@
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.hasEntry;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -578,7 +581,7 @@ public void testDesiredBalanceShouldConvergeInABigCluster() {
         var nodes = randomIntBetween(3, 7);
         var nodeIds = new ArrayList<String>(nodes);
         var discoveryNodesBuilder = DiscoveryNodes.builder();
-        var usedDiskSpace = new HashMap<String, Long>();
+        var usedDiskSpace = Maps.<String, Long>newMapWithExpectedSize(nodes);
         for (int node = 0; node < nodes; node++) {
             var nodeId = "node-" + node;
             nodeIds.add(nodeId);
@@ -588,6 +591,7 @@ public void testDesiredBalanceShouldConvergeInABigCluster() {
 
         var indices = scaledRandomIntBetween(1, 500);
         var totalShards = 0;
+        var totalShardsSize = 0L;
 
         var shardSizes = new HashMap<String, Long>();
         var dataPath = new HashMap<NodeAndShard, String>();
@@ -626,6 +630,7 @@ public void testDesiredBalanceShouldConvergeInABigCluster() {
 
             var primaryNodeId = pickAndRemoveRandomValueFrom(remainingNodeIds);
             shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shardId, true), thisShardSize);
+            totalShardsSize += thisShardSize;
             if (primaryNodeId != null) {
                 dataPath.put(new NodeAndShard(primaryNodeId, shardId), "/data");
                 usedDiskSpace.compute(primaryNodeId, (k, v) -> v + thisShardSize);
@@ -644,9 +649,10 @@ public void testDesiredBalanceShouldConvergeInABigCluster() {
             for (int replica = 0; replica < replicas; replica++) {
                 var replicaNodeId = primaryNodeId == null ? null : pickAndRemoveRandomValueFrom(remainingNodeIds);
                 shardSizes.put(ClusterInfo.shardIdentifierFromRouting(shardId, false), thisShardSize);
+                totalShardsSize += thisShardSize;
                 if (replicaNodeId != null) {
                     dataPath.put(new NodeAndShard(replicaNodeId, shardId), "/data");
-                    usedDiskSpace.compute(primaryNodeId, (k, v) -> v + thisShardSize);
+                    usedDiskSpace.compute(replicaNodeId, (k, v) -> v + thisShardSize);
                 }
 
                 indexRoutingTableBuilder.addShard(
@@ -675,7 +681,9 @@ public void testDesiredBalanceShouldConvergeInABigCluster() {
 
         var iteration = new AtomicInteger(0);
 
-        long diskSize = usedDiskSpace.values().stream().max(Long::compare).get() * 125 / 100;
+        long diskSize = Math.max(totalShardsSize / nodes, usedDiskSpace.values().stream().max(Long::compare).get()) * 120 / 100;
+        assertTrue("Should have enough space for all shards", diskSize * nodes > totalShardsSize);
+
         var diskUsage = usedDiskSpace.entrySet()
             .stream()
             .collect(toMap(Map.Entry::getKey, it -> new DiskUsage(it.getKey(), it.getKey(), "/data", diskSize, diskSize - it.getValue())));
@@ -691,32 +699,34 @@ public void testDesiredBalanceShouldConvergeInABigCluster() {
             new BalancedShardsAllocator(settings)
         ).compute(DesiredBalance.INITIAL, input, queue(), ignored -> iteration.incrementAndGet() < 1000);
 
-        try {
-            assertThat(
-                "Balance should converge, but exited by the iteration limit",
-                desiredBalance.lastConvergedIndex(),
-                equalTo(input.index())
-            );
-            logger.info(
-                "Balance converged after [{}] iterations for [{}] nodes and [{}] total shards",
-                iteration.get(),
-                nodes,
-                totalShards
-            );
-        } catch (AssertionError e) {
-            logger.error(
-                "Failed to converge desired balance for [{}] nodes and [{}] total shards:\n{}",
-                nodes,
-                totalShards,
-                clusterState.getRoutingNodes().toString()
+        var desiredDiskUsage = Maps.<String, Long>newMapWithExpectedSize(nodes);
+        for (var assignment : desiredBalance.assignments().entrySet()) {
+            var shardSize = Math.min(
+                clusterInfo.getShardSize(assignment.getKey(), true),
+                clusterInfo.getShardSize(assignment.getKey(), false)
             );
-            throw e;
+            for (String nodeId : assignment.getValue().nodeIds()) {
+                desiredDiskUsage.compute(nodeId, (key, value) -> (value != null ? value : 0) + shardSize);
+            }
         }
+
+        assertThat(
+            "Balance should converge, but exited by the iteration limit",
+            desiredBalance.lastConvergedIndex(),
+            equalTo(input.index())
+        );
+        logger.info("Balance converged after [{}] iterations", iteration.get());
+
+        assertThat(
+            "All desired disk usages " + desiredDiskUsage + " should be smaller than actual disk sizes: " + diskSize,
+            desiredDiskUsage.values(),
+            everyItem(lessThanOrEqualTo(diskSize))
+        );
     }
 
     private static long smallShardSizeDeviation(long originalSize) {
-        var deviation = randomIntBetween(0, 50) - 100L;
-        return originalSize * (1000 + deviation) / 1000;
+        var deviation = randomIntBetween(-5, 5);
+        return originalSize * (100 + deviation) / 100;
     }
 
     private String pickAndRemoveRandomValueFrom(List<String> values) {