@@ -140,6 +140,12 @@ public class InternalEngine extends Engine {
140
140
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong (-1 );
141
141
private final CounterMetric numVersionLookups = new CounterMetric ();
142
142
private final CounterMetric numIndexVersionsLookups = new CounterMetric ();
143
+ /**
144
+ * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
145
+ * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
146
+ * being indexed/deleted.
147
+ */
148
+ private final AtomicLong writingBytes = new AtomicLong ();
143
149
144
150
@ Nullable
145
151
private final String historyUUID ;
@@ -422,6 +428,12 @@ public String getHistoryUUID() {
422
428
return historyUUID ;
423
429
}
424
430
431
+ /** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
432
+ @ Override
433
+ public long getWritingBytes () {
434
+ return writingBytes .get ();
435
+ }
436
+
425
437
/**
426
438
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
427
439
* translog id into lucene and returns null.
@@ -1230,21 +1242,26 @@ public void refresh(String source) throws EngineException {
1230
1242
}
1231
1243
1232
1244
final void refresh (String source , SearcherScope scope ) throws EngineException {
1245
+ long bytes = 0 ;
1233
1246
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
1234
1247
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
1235
1248
try (ReleasableLock lock = readLock .acquire ()) {
1236
1249
ensureOpen ();
1250
+ bytes = indexWriter .ramBytesUsed ();
1237
1251
switch (scope ) {
1238
1252
case EXTERNAL :
1239
1253
// even though we maintain 2 managers we really do the heavy-lifting only once.
1240
1254
// the second refresh will only do the extra work we have to do for warming caches etc.
1255
+ writingBytes .addAndGet (bytes );
1241
1256
externalSearcherManager .maybeRefreshBlocking ();
1242
1257
// the break here is intentional we never refresh both internal / external together
1243
1258
break ;
1244
1259
case INTERNAL :
1260
+ final long versionMapBytes = versionMap .ramBytesUsedForRefresh ();
1261
+ bytes += versionMapBytes ;
1262
+ writingBytes .addAndGet (bytes );
1245
1263
internalSearcherManager .maybeRefreshBlocking ();
1246
1264
break ;
1247
-
1248
1265
default :
1249
1266
throw new IllegalArgumentException ("unknown scope: " + scope );
1250
1267
}
@@ -1258,6 +1275,8 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
1258
1275
e .addSuppressed (inner );
1259
1276
}
1260
1277
throw new RefreshFailedEngineException (shardId , e );
1278
+ } finally {
1279
+ writingBytes .addAndGet (-bytes );
1261
1280
}
1262
1281
1263
1282
// TODO: maybe we should just put a scheduled job in threadPool?
@@ -1271,24 +1290,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
1271
1290
public void writeIndexingBuffer () throws EngineException {
1272
1291
// we obtain a read lock here, since we don't want a flush to happen while we are writing
1273
1292
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
1274
- try (ReleasableLock lock = readLock .acquire ()) {
1275
- ensureOpen ();
1276
- final long versionMapBytes = versionMap .ramBytesUsedForRefresh ();
1277
- final long indexingBufferBytes = indexWriter .ramBytesUsed ();
1278
- logger .debug ("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])" ,
1279
- new ByteSizeValue (indexingBufferBytes ), new ByteSizeValue (versionMapBytes ));
1280
- refresh ("write indexing buffer" , SearcherScope .INTERNAL );
1281
- } catch (AlreadyClosedException e ) {
1282
- failOnTragicEvent (e );
1283
- throw e ;
1284
- } catch (Exception e ) {
1285
- try {
1286
- failEngine ("writeIndexingBuffer failed" , e );
1287
- } catch (Exception inner ) {
1288
- e .addSuppressed (inner );
1289
- }
1290
- throw new RefreshFailedEngineException (shardId , e );
1291
- }
1293
+ refresh ("write indexing buffer" , SearcherScope .INTERNAL );
1292
1294
}
1293
1295
1294
1296
@ Override
0 commit comments