27
27
import org .elasticsearch .cluster .ClusterInfo ;
28
28
import org .elasticsearch .cluster .DiskUsage ;
29
29
import org .elasticsearch .cluster .metadata .IndexMetaData ;
30
+ import org .elasticsearch .cluster .metadata .MetaData ;
30
31
import org .elasticsearch .cluster .routing .IndexShardRoutingTable ;
31
32
import org .elasticsearch .cluster .routing .RecoverySource ;
32
33
import org .elasticsearch .cluster .routing .RoutingNode ;
34
+ import org .elasticsearch .cluster .routing .RoutingTable ;
33
35
import org .elasticsearch .cluster .routing .ShardRouting ;
34
36
import org .elasticsearch .cluster .routing .ShardRoutingState ;
35
37
import org .elasticsearch .cluster .routing .allocation .DiskThresholdSettings ;
@@ -86,10 +88,9 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings)
86
88
*
87
89
* If subtractShardsMovingAway is true then the size of shards moving away is subtracted from the total size of all shards
88
90
*/
89
- static long sizeOfRelocatingShards (RoutingNode node , RoutingAllocation allocation ,
90
- boolean subtractShardsMovingAway , String dataPath ) {
91
- ClusterInfo clusterInfo = allocation .clusterInfo ();
92
- long totalSize = 0 ;
91
+ public static long sizeOfRelocatingShards (RoutingNode node , boolean subtractShardsMovingAway , String dataPath , ClusterInfo clusterInfo ,
92
+ MetaData metaData , RoutingTable routingTable ) {
93
+ long totalSize = 0L ;
93
94
94
95
for (ShardRouting routing : node .shardsWithState (ShardRoutingState .INITIALIZING )) {
95
96
if (routing .relocatingNodeId () == null ) {
@@ -103,7 +104,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
103
104
// if we don't yet know the actual path of the incoming shard then conservatively assume it's going to the path with the least
104
105
// free space
105
106
if (actualPath == null || actualPath .equals (dataPath )) {
106
- totalSize += getExpectedShardSize (routing , allocation , 0 );
107
+ totalSize += getExpectedShardSize (routing , 0L , clusterInfo , metaData , routingTable );
107
108
}
108
109
}
109
110
@@ -115,7 +116,7 @@ static long sizeOfRelocatingShards(RoutingNode node, RoutingAllocation allocatio
115
116
actualPath = clusterInfo .getDataPath (routing .cancelRelocation ());
116
117
}
117
118
if (dataPath .equals (actualPath )) {
118
- totalSize -= getExpectedShardSize (routing , allocation , 0 );
119
+ totalSize -= getExpectedShardSize (routing , 0L , clusterInfo , metaData , routingTable );
119
120
}
120
121
}
121
122
}
@@ -239,7 +240,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
239
240
}
240
241
241
242
// Secondly, check that allocating the shard to this node doesn't put it above the high watermark
242
- final long shardSize = getExpectedShardSize (shardRouting , allocation , 0 );
243
+ final long shardSize = getExpectedShardSize (shardRouting , 0L ,
244
+ allocation .clusterInfo (), allocation .metaData (), allocation .routingTable ());
243
245
double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned (usage , shardSize );
244
246
long freeBytesAfterShard = freeBytes - shardSize ;
245
247
if (freeBytesAfterShard < diskThresholdSettings .getFreeBytesThresholdHigh ().getBytes ()) {
@@ -339,7 +341,8 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
339
341
node .nodeId (), usage .getTotalBytes (), usage .getFreeBytes (), usage .getFreeDiskAsPercentage ());
340
342
}
341
343
342
- final long relocatingShardsSize = sizeOfRelocatingShards (node , allocation , subtractLeavingShards , usage .getPath ());
344
+ final long relocatingShardsSize = sizeOfRelocatingShards (node , subtractLeavingShards , usage .getPath (),
345
+ allocation .clusterInfo (), allocation .metaData (), allocation .routingTable ());
343
346
final DiskUsage usageIncludingRelocations = new DiskUsage (node .nodeId (), node .node ().getName (), usage .getPath (),
344
347
usage .getTotalBytes (), usage .getFreeBytes () - relocatingShardsSize );
345
348
logger .trace ("getDiskUsage: usage [{}] with [{}] bytes relocating yields [{}]" ,
@@ -418,29 +421,28 @@ private Decision earlyTerminate(RoutingAllocation allocation, ImmutableOpenMap<S
418
421
* Returns the expected shard size for the given shard or the default value provided if not enough information are available
419
422
* to estimate the shards size.
420
423
*/
421
- public static long getExpectedShardSize (ShardRouting shard , RoutingAllocation allocation , long defaultValue ) {
422
- final IndexMetaData metaData = allocation . metaData (). getIndexSafe ( shard . index ());
423
- final ClusterInfo info = allocation . clusterInfo ( );
424
- if (metaData .getResizeSourceIndex () != null && shard .active () == false &&
424
+ public static long getExpectedShardSize (ShardRouting shard , long defaultValue , ClusterInfo clusterInfo , MetaData metaData ,
425
+ RoutingTable routingTable ) {
426
+ final IndexMetaData indexMetaData = metaData . getIndexSafe ( shard . index () );
427
+ if (indexMetaData .getResizeSourceIndex () != null && shard .active () == false &&
425
428
shard .recoverySource ().getType () == RecoverySource .Type .LOCAL_SHARDS ) {
426
429
// in the shrink index case we sum up the source index shards since we basically make a copy of the shard in
427
430
// the worst case
428
431
long targetShardSize = 0 ;
429
- final Index mergeSourceIndex = metaData .getResizeSourceIndex ();
430
- final IndexMetaData sourceIndexMeta = allocation . metaData () .index (mergeSourceIndex );
432
+ final Index mergeSourceIndex = indexMetaData .getResizeSourceIndex ();
433
+ final IndexMetaData sourceIndexMeta = metaData .index (mergeSourceIndex );
431
434
if (sourceIndexMeta != null ) {
432
435
final Set <ShardId > shardIds = IndexMetaData .selectRecoverFromShards (shard .id (),
433
- sourceIndexMeta , metaData .getNumberOfShards ());
434
- for (IndexShardRoutingTable shardRoutingTable : allocation . routingTable () .index (mergeSourceIndex .getName ())) {
436
+ sourceIndexMeta , indexMetaData .getNumberOfShards ());
437
+ for (IndexShardRoutingTable shardRoutingTable : routingTable .index (mergeSourceIndex .getName ())) {
435
438
if (shardIds .contains (shardRoutingTable .shardId ())) {
436
- targetShardSize += info .getShardSize (shardRoutingTable .primaryShard (), 0 );
439
+ targetShardSize += clusterInfo .getShardSize (shardRoutingTable .primaryShard (), 0 );
437
440
}
438
441
}
439
442
}
440
443
return targetShardSize == 0 ? defaultValue : targetShardSize ;
441
444
} else {
442
- return info .getShardSize (shard , defaultValue );
445
+ return clusterInfo .getShardSize (shard , defaultValue );
443
446
}
444
-
445
447
}
446
448
}
0 commit comments