@@ -57,25 +57,37 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos
57
57
public static final Setting <ByteSizeValue > INDEX_BUFFER_SIZE_SETTING =
58
58
Setting .memorySizeSetting ("indices.memory.index_buffer_size" , "10%" , Property .NodeScope );
59
59
60
- /** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
61
- public static final Setting <ByteSizeValue > MIN_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting ("indices.memory.min_index_buffer_size" ,
62
- new ByteSizeValue (48 , ByteSizeUnit .MB ),
63
- new ByteSizeValue (0 , ByteSizeUnit .BYTES ),
64
- new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
65
- Property .NodeScope );
66
-
67
- /** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
68
- public static final Setting <ByteSizeValue > MAX_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting ("indices.memory.max_index_buffer_size" ,
69
- new ByteSizeValue (-1 ),
70
- new ByteSizeValue (-1 ),
71
- new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
72
- Property .NodeScope );
73
-
74
- /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
75
- public static final Setting <TimeValue > SHARD_INACTIVE_TIME_SETTING = Setting .positiveTimeSetting ("indices.memory.shard_inactive_time" , TimeValue .timeValueMinutes (5 ), Property .NodeScope );
60
+ /** Only applies when <code>indices.memory.index_buffer_size</code> is a %,
61
+ * to set a floor on the actual size in bytes (default: 48 MB). */
62
+ public static final Setting <ByteSizeValue > MIN_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting (
63
+ "indices.memory.min_index_buffer_size" ,
64
+ new ByteSizeValue (48 , ByteSizeUnit .MB ),
65
+ new ByteSizeValue (0 , ByteSizeUnit .BYTES ),
66
+ new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
67
+ Property .NodeScope );
68
+
69
+ /** Only applies when <code>indices.memory.index_buffer_size</code> is a %,
70
+ * to set a ceiling on the actual size in bytes (default: not set). */
71
+ public static final Setting <ByteSizeValue > MAX_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting (
72
+ "indices.memory.max_index_buffer_size" ,
73
+ new ByteSizeValue (-1 ),
74
+ new ByteSizeValue (-1 ),
75
+ new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
76
+ Property .NodeScope );
77
+
78
+ /** If we see no indexing operations after this much time for a given shard,
79
+ * we consider that shard inactive (default: 5 minutes). */
80
+ public static final Setting <TimeValue > SHARD_INACTIVE_TIME_SETTING = Setting .positiveTimeSetting (
81
+ "indices.memory.shard_inactive_time" ,
82
+ TimeValue .timeValueMinutes (5 ),
83
+ Property .NodeScope
84
+ );
76
85
77
86
/** How frequently we check indexing memory usage (default: 5 seconds). */
78
- public static final Setting <TimeValue > SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting .positiveTimeSetting ("indices.memory.interval" , TimeValue .timeValueSeconds (5 ), Property .NodeScope );
87
+ public static final Setting <TimeValue > SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting .positiveTimeSetting (
88
+ "indices.memory.interval" ,
89
+ TimeValue .timeValueSeconds (5 ),
90
+ Property .NodeScope );
79
91
80
92
private final ThreadPool threadPool ;
81
93
@@ -251,10 +263,11 @@ public void bytesWritten(int bytes) {
251
263
totalBytes = bytesWrittenSinceCheck .get ();
252
264
if (totalBytes > indexingBuffer .getBytes ()/30 ) {
253
265
bytesWrittenSinceCheck .addAndGet (-totalBytes );
254
- // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
255
- // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against
256
- // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
257
- // processed by indexing:
266
+ // NOTE: this is only an approximate check, because bytes written is to the translog,
267
+ // vs indexing memory buffer which is typically smaller but can be larger in extreme
268
+ // cases (many unique terms). This logic is here only as a safety against thread
269
+ // starvation or too infrequent checking, to ensure we are still checking periodically,
270
+ // in proportion to bytes processed by indexing:
258
271
runUnlocked ();
259
272
}
260
273
} finally {
@@ -313,7 +326,8 @@ private void runUnlocked() {
313
326
314
327
if (logger .isTraceEnabled ()) {
315
328
logger .trace ("total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]" ,
316
- new ByteSizeValue (totalBytesUsed ), INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer , new ByteSizeValue (totalBytesWriting ));
329
+ new ByteSizeValue (totalBytesUsed ), INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer ,
330
+ new ByteSizeValue (totalBytesWriting ));
317
331
}
318
332
319
333
// If we are using more than 50% of our budget across both indexing buffer and bytes we are still moving to disk, then we now
@@ -343,7 +357,8 @@ private void runUnlocked() {
343
357
if (shardBytesUsed > 0 ) {
344
358
if (logger .isTraceEnabled ()) {
345
359
if (shardWritingBytes != 0 ) {
346
- logger .trace ("shard [{}] is using [{}] heap, writing [{}] heap" , shard .shardId (), shardBytesUsed , shardWritingBytes );
360
+ logger .trace ("shard [{}] is using [{}] heap, writing [{}] heap" , shard .shardId (), shardBytesUsed ,
361
+ shardWritingBytes );
347
362
} else {
348
363
logger .trace ("shard [{}] is using [{}] heap, not writing any bytes" , shard .shardId (), shardBytesUsed );
349
364
}
@@ -352,12 +367,14 @@ private void runUnlocked() {
352
367
}
353
368
}
354
369
355
- logger .debug ("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}], [{}] shards with non-zero indexing buffer" ,
356
- new ByteSizeValue (totalBytesUsed ), INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer , new ByteSizeValue (totalBytesWriting ), queue .size ());
370
+ logger .debug ("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], " +
371
+ "currently writing bytes [{}], [{}] shards with non-zero indexing buffer" , new ByteSizeValue (totalBytesUsed ),
372
+ INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer , new ByteSizeValue (totalBytesWriting ), queue .size ());
357
373
358
374
while (totalBytesUsed > indexingBuffer .getBytes () && queue .isEmpty () == false ) {
359
375
ShardAndBytesUsed largest = queue .poll ();
360
- logger .debug ("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer" , largest .shard .shardId (), new ByteSizeValue (largest .bytesUsed ));
376
+ logger .debug ("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer" ,
377
+ largest .shard .shardId (), new ByteSizeValue (largest .bytesUsed ));
361
378
writeIndexingBufferAsync (largest .shard );
362
379
totalBytesUsed -= largest .bytesUsed ;
363
380
if (doThrottle && throttled .contains (largest .shard ) == false ) {
0 commit comments