Skip to content

Commit f928d61

Browse files
authored
Move IndexShard#getWritingBytes() under InternalEngine (#27209)
We do some accounting in IndexShard that is not necessarily correct since we maintain two different index readers. This change moves the accounting under the engine which knows what reader we are refreshing. Relates to #26972
1 parent 2f65f3a commit f928d61

File tree

4 files changed

+35
-71
lines changed

4 files changed

+35
-71
lines changed

core/src/main/java/org/elasticsearch/index/engine/Engine.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,9 @@ public MergeStats getMergeStats() {
187187
/** returns the history uuid for the engine */
188188
public abstract String getHistoryUUID();
189189

190+
/** Returns how many bytes we are currently moving from heap to disk */
191+
public abstract long getWritingBytes();
192+
190193
/**
191194
* A throttling class that can be activated, causing the
192195
* {@code acquireThrottle} method to block on a lock when throttling
@@ -707,7 +710,7 @@ protected void writerSegmentStats(SegmentsStats stats) {
707710
}
708711

709712
/** How much heap is used that would be freed by a refresh. Note that this may throw {@link AlreadyClosedException}. */
710-
public abstract long getIndexBufferRAMBytesUsed();
713+
public abstract long getIndexBufferRAMBytesUsed();
711714

712715
protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
713716
ensureOpen();

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+21-19
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,12 @@ public class InternalEngine extends Engine {
140140
private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
141141
private final CounterMetric numVersionLookups = new CounterMetric();
142142
private final CounterMetric numIndexVersionsLookups = new CounterMetric();
143+
/**
144+
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
145+
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
146+
* being indexed/deleted.
147+
*/
148+
private final AtomicLong writingBytes = new AtomicLong();
143149

144150
@Nullable
145151
private final String historyUUID;
@@ -409,6 +415,12 @@ public String getHistoryUUID() {
409415
return historyUUID;
410416
}
411417

418+
/** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
419+
@Override
420+
public long getWritingBytes() {
421+
return writingBytes.get();
422+
}
423+
412424
/**
413425
* Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
414426
* translog id into lucene and returns null.
@@ -1217,21 +1229,26 @@ public void refresh(String source) throws EngineException {
12171229
}
12181230

12191231
final void refresh(String source, SearcherScope scope) throws EngineException {
1232+
long bytes = 0;
12201233
// we obtain a read lock here, since we don't want a flush to happen while we are refreshing
12211234
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
12221235
try (ReleasableLock lock = readLock.acquire()) {
12231236
ensureOpen();
1237+
bytes = indexWriter.ramBytesUsed();
12241238
switch (scope) {
12251239
case EXTERNAL:
12261240
// even though we maintain 2 managers we really do the heavy-lifting only once.
12271241
// the second refresh will only do the extra work we have to do for warming caches etc.
1242+
writingBytes.addAndGet(bytes);
12281243
externalSearcherManager.maybeRefreshBlocking();
12291244
// the break here is intentional we never refresh both internal / external together
12301245
break;
12311246
case INTERNAL:
1247+
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
1248+
bytes += versionMapBytes;
1249+
writingBytes.addAndGet(bytes);
12321250
internalSearcherManager.maybeRefreshBlocking();
12331251
break;
1234-
12351252
default:
12361253
throw new IllegalArgumentException("unknown scope: " + scope);
12371254
}
@@ -1245,6 +1262,8 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
12451262
e.addSuppressed(inner);
12461263
}
12471264
throw new RefreshFailedEngineException(shardId, e);
1265+
} finally {
1266+
writingBytes.addAndGet(-bytes);
12481267
}
12491268

12501269
// TODO: maybe we should just put a scheduled job in threadPool?
@@ -1258,24 +1277,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
12581277
public void writeIndexingBuffer() throws EngineException {
12591278
// we obtain a read lock here, since we don't want a flush to happen while we are writing
12601279
// since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
1261-
try (ReleasableLock lock = readLock.acquire()) {
1262-
ensureOpen();
1263-
final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
1264-
final long indexingBufferBytes = indexWriter.ramBytesUsed();
1265-
logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
1266-
new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
1267-
refresh("write indexing buffer", SearcherScope.INTERNAL);
1268-
} catch (AlreadyClosedException e) {
1269-
failOnTragicEvent(e);
1270-
throw e;
1271-
} catch (Exception e) {
1272-
try {
1273-
failEngine("writeIndexingBuffer failed", e);
1274-
} catch (Exception inner) {
1275-
e.addSuppressed(inner);
1276-
}
1277-
throw new RefreshFailedEngineException(shardId, e);
1278-
}
1280+
refresh("write indexing buffer", SearcherScope.INTERNAL);
12791281
}
12801282

12811283
@Override

core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

+9-49
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
183183
private final QueryCachingPolicy cachingPolicy;
184184
private final Supplier<Sort> indexSortSupplier;
185185

186-
/**
187-
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
188-
* across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
189-
* being indexed/deleted.
190-
*/
191-
private final AtomicLong writingBytes = new AtomicLong();
192186
private final SearchOperationListener searchOperationListener;
193187

194188
protected volatile ShardRouting shardRouting;
@@ -324,12 +318,6 @@ public Store store() {
324318
public Sort getIndexSort() {
325319
return indexSortSupplier.get();
326320
}
327-
/**
328-
* returns true if this shard supports indexing (i.e., write) operations.
329-
*/
330-
public boolean canIndex() {
331-
return true;
332-
}
333321

334322
public ShardGetService getService() {
335323
return this.getService;
@@ -840,34 +828,21 @@ public Engine.GetResult get(Engine.Get get) {
840828
*/
841829
public void refresh(String source) {
842830
verifyNotClosed();
843-
844-
if (canIndex()) {
845-
long bytes = getEngine().getIndexBufferRAMBytesUsed();
846-
writingBytes.addAndGet(bytes);
847-
try {
848-
if (logger.isTraceEnabled()) {
849-
logger.trace("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
850-
}
851-
getEngine().refresh(source);
852-
} finally {
853-
if (logger.isTraceEnabled()) {
854-
logger.trace("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
855-
}
856-
writingBytes.addAndGet(-bytes);
857-
}
858-
} else {
859-
if (logger.isTraceEnabled()) {
860-
logger.trace("refresh with source [{}]", source);
861-
}
862-
getEngine().refresh(source);
831+
if (logger.isTraceEnabled()) {
832+
logger.trace("refresh with source [{}]", source);
863833
}
834+
getEngine().refresh(source);
864835
}
865836

866837
/**
867838
* Returns how many bytes we are currently moving from heap to disk
868839
*/
869840
public long getWritingBytes() {
870-
return writingBytes.get();
841+
Engine engine = getEngineOrNull();
842+
if (engine == null) {
843+
return 0;
844+
}
845+
return engine.getWritingBytes();
871846
}
872847

873848
public RefreshStats refreshStats() {
@@ -1672,24 +1647,9 @@ private void handleRefreshException(Exception e) {
16721647
* Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
16731648
*/
16741649
public void writeIndexingBuffer() {
1675-
if (canIndex() == false) {
1676-
throw new UnsupportedOperationException();
1677-
}
16781650
try {
16791651
Engine engine = getEngine();
1680-
long bytes = engine.getIndexBufferRAMBytesUsed();
1681-
1682-
// NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
1683-
// memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
1684-
// there's still up to the 20% being used and continue writing if necessary:
1685-
logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
1686-
writingBytes.addAndGet(bytes);
1687-
try {
1688-
engine.writeIndexingBuffer();
1689-
} finally {
1690-
writingBytes.addAndGet(-bytes);
1691-
logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
1692-
}
1652+
engine.writeIndexingBuffer();
16931653
} catch (Exception e) {
16941654
handleRefreshException(e);
16951655
}

core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -152,8 +152,7 @@ ByteSizeValue indexingBufferSize() {
152152
protected List<IndexShard> availableShards() {
153153
List<IndexShard> availableShards = new ArrayList<>();
154154
for (IndexShard shard : indexShards) {
155-
// shadow replica doesn't have an indexing buffer
156-
if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
155+
if (CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
157156
availableShards.add(shard);
158157
}
159158
}

0 commit comments

Comments
 (0)