@@ -19,8 +19,6 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
-import java.util.Set;
-
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -44,6 +42,8 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 
+import java.util.Set;
+
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
 import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
 
@@ -139,12 +139,25 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
 
         // subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
         // and take the size into account
-        DiskUsage usage = getDiskUsage(node, allocation, usages, false);
+        final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
         // First, check that the node is currently over the low watermark
         double freeDiskPercentage = usage.getFreeDiskAsPercentage();
         // Cache the used disk percentage for displaying disk percentages consistent with documentation
         double usedDiskPercentage = usage.getUsedDiskAsPercentage();
         long freeBytes = usage.getFreeBytes();
+        if (freeBytes < 0L) {
+            final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, false, usage.getPath(),
+                allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
+            logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
+                "usage {} on node {} including {} bytes of relocations, preventing allocation",
+                usage, node.nodeId(), sizeOfRelocatingShards);
+
+            return allocation.decision(Decision.NO, NAME,
+                "the node has fewer free bytes remaining than the total size of all incoming shards: " +
+                "free space [%sB], relocating shards [%sB]",
+                freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
+        }
+
         ByteSizeValue freeBytesValue = new ByteSizeValue(freeBytes);
         if (logger.isTraceEnabled()) {
             logger.trace("node [{}] has {}% used disk", node.nodeId(), usedDiskPercentage);
@@ -242,6 +255,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
         // Secondly, check that allocating the shard to this node doesn't put it above the high watermark
         final long shardSize = getExpectedShardSize(shardRouting, 0L,
             allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
+        assert shardSize >= 0 : shardSize;
         double freeSpaceAfterShard = freeDiskPercentageAfterShardAssigned(usage, shardSize);
         long freeBytesAfterShard = freeBytes - shardSize;
         if (freeBytesAfterShard < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
@@ -268,6 +282,7 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
                 diskThresholdSettings.getHighWatermarkRaw(), usedDiskThresholdHigh, freeSpaceAfterShard);
         }
 
+        assert freeBytesAfterShard >= 0 : freeBytesAfterShard;
         return allocation.decision(Decision.YES, NAME,
             "enough disk for shard on node, free: [%s], shard size: [%s], free after allocating shard: [%s]",
             freeBytesValue,
@@ -289,7 +304,7 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
 
         // subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
         // since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
-        final DiskUsage usage = getDiskUsage(node, allocation, usages, true);
+        final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, true);
         final String dataPath = clusterInfo.getDataPath(shardRouting);
         // If this node is already above the high threshold, the shard cannot remain (get it off!)
         final double freeDiskPercentage = usage.getFreeDiskAsPercentage();
@@ -301,6 +316,17 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
             return allocation.decision(Decision.YES, NAME,
                 "this shard is not allocated on the most utilized disk and can remain");
         }
+        if (freeBytes < 0L) {
+            final long sizeOfRelocatingShards = sizeOfRelocatingShards(node, false, usage.getPath(),
+                allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
+            logger.debug("fewer free bytes remaining than the size of all incoming shards: " +
+                "usage {} on node {} including {} bytes of relocations, shard cannot remain",
+                usage, node.nodeId(), sizeOfRelocatingShards);
+            return allocation.decision(Decision.NO, NAME,
+                "the shard cannot remain on this node because the node has fewer free bytes remaining than the total size of all " +
+                "incoming shards: free space [%s], relocating shards [%s]",
+                freeBytes + sizeOfRelocatingShards, sizeOfRelocatingShards);
+        }
         if (freeBytes < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("less than the required {} free bytes threshold ({} bytes free) on node {}, shard cannot remain",
@@ -330,8 +356,8 @@ public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAl
             "there is enough disk on this node for the shard to remain, free: [%s]", new ByteSizeValue(freeBytes));
     }
 
-    private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
-                                   ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
+    private DiskUsageWithRelocations getDiskUsage(RoutingNode node, RoutingAllocation allocation,
+                                                  ImmutableOpenMap<String, DiskUsage> usages, boolean subtractLeavingShards) {
         DiskUsage usage = usages.get(node.nodeId());
         if (usage == null) {
             // If there is no usage, and we have other nodes in the cluster,
@@ -343,18 +369,14 @@ private DiskUsage getDiskUsage(RoutingNode node, RoutingAllocation allocation,
             }
         }
 
-        if (diskThresholdSettings.includeRelocations()) {
-            final long relocatingShardsSize = sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(),
-                allocation.clusterInfo(), allocation.metaData(), allocation.routingTable());
-            DiskUsage usageIncludingRelocations = new DiskUsage(node.nodeId(), node.node().getName(), usage.getPath(),
-                usage.getTotalBytes(), usage.getFreeBytes() - relocatingShardsSize);
-            if (logger.isTraceEnabled()) {
-                logger.trace("usage without relocations: {}", usage);
-                logger.trace("usage with relocations: [{} bytes] {}", relocatingShardsSize, usageIncludingRelocations);
-            }
-            usage = usageIncludingRelocations;
+        final DiskUsageWithRelocations diskUsageWithRelocations = new DiskUsageWithRelocations(usage,
+            diskThresholdSettings.includeRelocations() ? sizeOfRelocatingShards(node, subtractLeavingShards, usage.getPath(),
+                allocation.clusterInfo(), allocation.metaData(), allocation.routingTable()) : 0);
+        if (logger.isTraceEnabled()) {
+            logger.trace("getDiskUsage(subtractLeavingShards={}) returning {}", subtractLeavingShards, diskUsageWithRelocations);
         }
-        return usage;
+
+        return diskUsageWithRelocations;
     }
 
     /**
@@ -384,7 +406,7 @@ DiskUsage averageUsage(RoutingNode node, ImmutableOpenMap<String, DiskUsage> usa
      * @param shardSize Size in bytes of the shard
      * @return Percentage of free space after the shard is assigned to the node
      */
-    double freeDiskPercentageAfterShardAssigned(DiskUsage usage, Long shardSize) {
+    double freeDiskPercentageAfterShardAssigned(DiskUsageWithRelocations usage, Long shardSize) {
         shardSize = (shardSize == null) ? 0 : shardSize;
         DiskUsage newUsage = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(),
             usage.getTotalBytes(), usage.getFreeBytes() - shardSize);
@@ -452,4 +474,59 @@ public static long getExpectedShardSize(ShardRouting shard, long defaultValue, C
             return clusterInfo.getShardSize(shard, defaultValue);
         }
     }
+
+    static class DiskUsageWithRelocations {
+
+        private final DiskUsage diskUsage;
+        private final long relocatingShardSize;
+
+        DiskUsageWithRelocations(DiskUsage diskUsage, long relocatingShardSize) {
+            this.diskUsage = diskUsage;
+            this.relocatingShardSize = relocatingShardSize;
+        }
+
+        @Override
+        public String toString() {
+            return "DiskUsageWithRelocations{" +
+                "diskUsage=" + diskUsage +
+                ", relocatingShardSize=" + relocatingShardSize +
+                '}';
+        }
+
+        double getFreeDiskAsPercentage() {
+            if (getTotalBytes() == 0L) {
+                return 100.0;
+            }
+            return 100.0 * ((double) getFreeBytes() / getTotalBytes());
+        }
+
+        double getUsedDiskAsPercentage() {
+            return 100.0 - getFreeDiskAsPercentage();
+        }
+
+        long getFreeBytes() {
+            try {
+                return Math.subtractExact(diskUsage.getFreeBytes(), relocatingShardSize);
+            } catch (ArithmeticException e) {
+                return Long.MAX_VALUE;
+            }
+        }
+
+        String getPath() {
+            return diskUsage.getPath();
+        }
+
+        String getNodeId() {
+            return diskUsage.getNodeId();
+        }
+
+        String getNodeName() {
+            return diskUsage.getNodeName();
+        }
+
+        long getTotalBytes() {
+            return diskUsage.getTotalBytes();
+        }
+    }
+
 }
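
Reviewer note: the new guards depend on `DiskUsageWithRelocations.getFreeBytes()` going negative once the tracked incoming relocations exceed the node's reported free space, while an overflow in the subtraction saturates to `Long.MAX_VALUE` instead of wrapping. In the NO-decision messages, `freeBytes + sizeOfRelocatingShards` appears to add the relocation total back so the reported free space is the value before relocations were accounted for. The snippet below is a standalone sketch of that arithmetic only, using plain longs and hypothetical sizes; it is not part of the change, which wraps a `DiskUsage` instance as shown in the diff.

```java
// Standalone illustration of the saturating subtraction in DiskUsageWithRelocations.getFreeBytes().
// Class name and values are hypothetical; the real class delegates to a wrapped DiskUsage.
public class FreeBytesWithRelocationsSketch {

    static long freeBytesAfterRelocations(long reportedFreeBytes, long relocatingShardSize) {
        try {
            // May legitimately go negative when incoming relocations exceed the reported free space.
            return Math.subtractExact(reportedFreeBytes, relocatingShardSize);
        } catch (ArithmeticException e) {
            // Overflow (only reachable with extreme inputs) is clamped rather than wrapped.
            return Long.MAX_VALUE;
        }
    }

    public static void main(String[] args) {
        long tenGiB = 10L << 30;
        long twelveGiB = 12L << 30;
        long free = freeBytesAfterRelocations(tenGiB, twelveGiB);
        // Prints a negative value (-2 GiB): the condition the new canAllocate/canRemain guards turn into Decision.NO.
        System.out.println(free + " -> allocation blocked: " + (free < 0L));
    }
}
```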