Skip to content

Commit 33408ac

Browse files
committed
Move IndexShard#getWritingBytes() under InternalEngine
We do some accouting in IndexShard that is not necessarily correct since we maintain two different index readers. This change moves the accounting under the engine which knows what reader we are refreshing. Relates to elastic#26972
1 parent 99aca9c commit 33408ac

File tree

3 files changed

+33
-53
lines changed

3 files changed

+33
-53
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ public MergeStats getMergeStats() {
187187
/** returns the history uuid for the engine */
188188
public abstract String getHistoryUUID();
189189

190+
public abstract long getWritingBytes();
191+
190192
/**
191193
* A throttling class that can be activated, causing the
192194
* {@code acquireThrottle} method to block on a lock when throttling
@@ -707,7 +709,7 @@ protected void writerSegmentStats(SegmentsStats stats) {
707709
}
708710

709711
/** How much heap is used that would be freed by a refresh. Note that this may throw {@link AlreadyClosedException}. */
710-
public abstract long getIndexBufferRAMBytesUsed();
712+
public abstract long getIndexBufferRAMBytesUsed();
711713

712714
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
713715
ensureOpen();

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+23-19
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,12 @@ public class InternalEngine extends Engine {
140140
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
141141
private final CounterMetric numVersionLookups = new CounterMetric();
142142
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
143+
/**
144+
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
145+
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
146+
* being indexed/deleted.
147+
*/
148+
private final AtomicLong writingBytes = new AtomicLong();
143149

144150
@Nullable
145151
private final String historyUUID;
@@ -409,6 +415,14 @@ public String getHistoryUUID() {
409415
return historyUUID;
410416
}
411417

418+
/**
419+
* Returns how many bytes we are currently moving from heap to disk
420+
*/
421+
@Override
422+
public long getWritingBytes() {
423+
return writingBytes.get();
424+
}
425+
412426
/**
413427
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
414428
* translog id into lucene and returns null.
@@ -1217,21 +1231,26 @@ public void refresh(String source) throws EngineException {
12171231
}
12181232

12191233
final void refresh(String source, SearcherScope scope) throws EngineException {
1234+
long bytes = 0;
12201235
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
12211236
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
12221237
try (ReleasableLock lock = readLock.acquire()) {
12231238
ensureOpen();
1239+
bytes = indexWriter.ramBytesUsed();
12241240
switch (scope) {
12251241
case EXTERNAL:
12261242
// even though we maintain 2 managers we really do the heavy-lifting only once.
12271243
// the second refresh will only do the extra work we have to do for warming caches etc.
1244+
writingBytes.addAndGet(bytes);
12281245
externalSearcherManager.maybeRefreshBlocking();
12291246
// the break here is intentional we never refresh both internal / external together
12301247
break;
12311248
case INTERNAL:
1249+
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
1250+
bytes += versionMapBytes;
1251+
writingBytes.addAndGet(bytes);
12321252
internalSearcherManager.maybeRefreshBlocking();
12331253
break;
1234-
12351254
default:
12361255
throw new IllegalArgumentException("unknown scope: " + scope);
12371256
}
@@ -1245,6 +1264,8 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
12451264
e.addSuppressed(inner);
12461265
}
12471266
throw new RefreshFailedEngineException(shardId, e);
1267+
} finally {
1268+
writingBytes.addAndGet(-bytes);
12481269
}
12491270

12501271
// TODO: maybe we should just put a scheduled job in threadPool?
@@ -1258,24 +1279,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
12581279
public void writeIndexingBuffer() throws EngineException {
12591280
// we obtain a read lock here, since we don't want a flush to happen while we are writing
12601281
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
1261-
try (ReleasableLock lock = readLock.acquire()) {
1262-
ensureOpen();
1263-
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
1264-
final long indexingBufferBytes = indexWriter.ramBytesUsed();
1265-
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
1266-
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
1267-
refresh("write indexing buffer", SearcherScope.INTERNAL);
1268-
} catch (AlreadyClosedException e) {
1269-
failOnTragicEvent(e);
1270-
throw e;
1271-
} catch (Exception e) {
1272-
try {
1273-
failEngine("writeIndexingBuffer failed", e);
1274-
} catch (Exception inner) {
1275-
e.addSuppressed(inner);
1276-
}
1277-
throw new RefreshFailedEngineException(shardId, e);
1278-
}
1282+
refresh("write indexing buffer", SearcherScope.INTERNAL);
12791283
}
12801284

12811285
@Override

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+7-33
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
183183
private final QueryCachingPolicy cachingPolicy;
184184
private final Supplier<Sort> indexSortSupplier;
185185

186-
/**
187-
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
188-
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
189-
* being indexed/deleted.
190-
*/
191-
private final AtomicLong writingBytes = new AtomicLong();
192186
private final SearchOperationListener searchOperationListener;
193187

194188
protected volatile ShardRouting shardRouting;
@@ -842,19 +836,7 @@ public void refresh(String source) {
842836
verifyNotClosed();
843837

844838
if (canIndex()) {
845-
long bytes = getEngine().getIndexBufferRAMBytesUsed();
846-
writingBytes.addAndGet(bytes);
847-
try {
848-
if (logger.isTraceEnabled()) {
849-
logger.trace("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
850-
}
851-
getEngine().refresh(source);
852-
} finally {
853-
if (logger.isTraceEnabled()) {
854-
logger.trace("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
855-
}
856-
writingBytes.addAndGet(-bytes);
857-
}
839+
858840
} else {
859841
if (logger.isTraceEnabled()) {
860842
logger.trace("refresh with source [{}]", source);
@@ -867,7 +849,11 @@ public void refresh(String source) {
867849
* Returns how many bytes we are currently moving from heap to disk
868850
*/
869851
public long getWritingBytes() {
870-
return writingBytes.get();
852+
Engine engine = getEngineOrNull();
853+
if (engine == null) {
854+
return 0;
855+
}
856+
return engine.getWritingBytes();
871857
}
872858

873859
public RefreshStats refreshStats() {
@@ -1677,19 +1663,7 @@ public void writeIndexingBuffer() {
16771663
}
16781664
try {
16791665
Engine engine = getEngine();
1680-
long bytes = engine.getIndexBufferRAMBytesUsed();
1681-
1682-
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
1683-
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
1684-
// there's still up to the 20% being used and continue writing if necessary:
1685-
logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
1686-
writingBytes.addAndGet(bytes);
1687-
try {
1688-
engine.writeIndexingBuffer();
1689-
} finally {
1690-
writingBytes.addAndGet(-bytes);
1691-
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
1692-
}
1666+
engine.writeIndexingBuffer();
16931667
} catch (Exception e) {
16941668
handleRefreshException(e);
16951669
}

0 commit comments

Comments
 (0)