@@ -54,25 +54,37 @@ public class IndexingMemoryController extends AbstractComponent implements Index
54
54
public static final Setting <ByteSizeValue > INDEX_BUFFER_SIZE_SETTING =
55
55
Setting .memorySizeSetting ("indices.memory.index_buffer_size" , "10%" , Property .NodeScope );
56
56
57
- /** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a floor on the actual size in bytes (default: 48 MB). */
58
- public static final Setting <ByteSizeValue > MIN_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting ("indices.memory.min_index_buffer_size" ,
59
- new ByteSizeValue (48 , ByteSizeUnit .MB ),
60
- new ByteSizeValue (0 , ByteSizeUnit .BYTES ),
61
- new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
62
- Property .NodeScope );
63
-
64
- /** Only applies when <code>indices.memory.index_buffer_size</code> is a %, to set a ceiling on the actual size in bytes (default: not set). */
65
- public static final Setting <ByteSizeValue > MAX_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting ("indices.memory.max_index_buffer_size" ,
66
- new ByteSizeValue (-1 ),
67
- new ByteSizeValue (-1 ),
68
- new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
69
- Property .NodeScope );
70
-
71
- /** If we see no indexing operations after this much time for a given shard, we consider that shard inactive (default: 5 minutes). */
72
- public static final Setting <TimeValue > SHARD_INACTIVE_TIME_SETTING = Setting .positiveTimeSetting ("indices.memory.shard_inactive_time" , TimeValue .timeValueMinutes (5 ), Property .NodeScope );
57
+ /** Only applies when <code>indices.memory.index_buffer_size</code> is a %,
58
+ * to set a floor on the actual size in bytes (default: 48 MB). */
59
+ public static final Setting <ByteSizeValue > MIN_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting (
60
+ "indices.memory.min_index_buffer_size" ,
61
+ new ByteSizeValue (48 , ByteSizeUnit .MB ),
62
+ new ByteSizeValue (0 , ByteSizeUnit .BYTES ),
63
+ new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
64
+ Property .NodeScope );
65
+
66
+ /** Only applies when <code>indices.memory.index_buffer_size</code> is a %,
67
+ * to set a ceiling on the actual size in bytes (default: not set). */
68
+ public static final Setting <ByteSizeValue > MAX_INDEX_BUFFER_SIZE_SETTING = Setting .byteSizeSetting (
69
+ "indices.memory.max_index_buffer_size" ,
70
+ new ByteSizeValue (-1 ),
71
+ new ByteSizeValue (-1 ),
72
+ new ByteSizeValue (Long .MAX_VALUE , ByteSizeUnit .BYTES ),
73
+ Property .NodeScope );
74
+
75
+ /** If we see no indexing operations after this much time for a given shard,
76
+ * we consider that shard inactive (default: 5 minutes). */
77
+ public static final Setting <TimeValue > SHARD_INACTIVE_TIME_SETTING = Setting .positiveTimeSetting (
78
+ "indices.memory.shard_inactive_time" ,
79
+ TimeValue .timeValueMinutes (5 ),
80
+ Property .NodeScope
81
+ );
73
82
74
83
/** How frequently we check indexing memory usage (default: 5 seconds). */
75
- public static final Setting <TimeValue > SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting .positiveTimeSetting ("indices.memory.interval" , TimeValue .timeValueSeconds (5 ), Property .NodeScope );
84
+ public static final Setting <TimeValue > SHARD_MEMORY_INTERVAL_TIME_SETTING = Setting .positiveTimeSetting (
85
+ "indices.memory.interval" ,
86
+ TimeValue .timeValueSeconds (5 ),
87
+ Property .NodeScope );
76
88
77
89
private final ThreadPool threadPool ;
78
90
@@ -248,10 +260,11 @@ public void bytesWritten(int bytes) {
248
260
totalBytes = bytesWrittenSinceCheck .get ();
249
261
if (totalBytes > indexingBuffer .getBytes ()/30 ) {
250
262
bytesWrittenSinceCheck .addAndGet (-totalBytes );
251
- // NOTE: this is only an approximate check, because bytes written is to the translog, vs indexing memory buffer which is
252
- // typically smaller but can be larger in extreme cases (many unique terms). This logic is here only as a safety against
253
- // thread starvation or too infrequent checking, to ensure we are still checking periodically, in proportion to bytes
254
- // processed by indexing:
263
+ // NOTE: this is only an approximate check, because bytes written is to the translog,
264
+ // vs indexing memory buffer which is typically smaller but can be larger in extreme
265
+ // cases (many unique terms). This logic is here only as a safety against thread
266
+ // starvation or too infrequent checking, to ensure we are still checking periodically,
267
+ // in proportion to bytes processed by indexing:
255
268
runUnlocked ();
256
269
}
257
270
} finally {
@@ -310,7 +323,8 @@ private void runUnlocked() {
310
323
311
324
if (logger .isTraceEnabled ()) {
312
325
logger .trace ("total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]" ,
313
- new ByteSizeValue (totalBytesUsed ), INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer , new ByteSizeValue (totalBytesWriting ));
326
+ new ByteSizeValue (totalBytesUsed ), INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer ,
327
+ new ByteSizeValue (totalBytesWriting ));
314
328
}
315
329
316
330
// If we are using more than 50% of our budget across both indexing buffer and bytes we are still moving to disk, then we now
@@ -340,7 +354,8 @@ private void runUnlocked() {
340
354
if (shardBytesUsed > 0 ) {
341
355
if (logger .isTraceEnabled ()) {
342
356
if (shardWritingBytes != 0 ) {
343
- logger .trace ("shard [{}] is using [{}] heap, writing [{}] heap" , shard .shardId (), shardBytesUsed , shardWritingBytes );
357
+ logger .trace ("shard [{}] is using [{}] heap, writing [{}] heap" , shard .shardId (), shardBytesUsed ,
358
+ shardWritingBytes );
344
359
} else {
345
360
logger .trace ("shard [{}] is using [{}] heap, not writing any bytes" , shard .shardId (), shardBytesUsed );
346
361
}
@@ -349,12 +364,14 @@ private void runUnlocked() {
349
364
}
350
365
}
351
366
352
- logger .debug ("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}], [{}] shards with non-zero indexing buffer" ,
353
- new ByteSizeValue (totalBytesUsed ), INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer , new ByteSizeValue (totalBytesWriting ), queue .size ());
367
+ logger .debug ("now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], " +
368
+ "currently writing bytes [{}], [{}] shards with non-zero indexing buffer" , new ByteSizeValue (totalBytesUsed ),
369
+ INDEX_BUFFER_SIZE_SETTING .getKey (), indexingBuffer , new ByteSizeValue (totalBytesWriting ), queue .size ());
354
370
355
371
while (totalBytesUsed > indexingBuffer .getBytes () && queue .isEmpty () == false ) {
356
372
ShardAndBytesUsed largest = queue .poll ();
357
- logger .debug ("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer" , largest .shard .shardId (), new ByteSizeValue (largest .bytesUsed ));
373
+ logger .debug ("write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer" ,
374
+ largest .shard .shardId (), new ByteSizeValue (largest .bytesUsed ));
358
375
writeIndexingBufferAsync (largest .shard );
359
376
totalBytesUsed -= largest .bytesUsed ;
360
377
if (doThrottle && throttled .contains (largest .shard ) == false ) {
0 commit comments