diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
index 4299fa0cb6ea3..42805a19b340f 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java
@@ -187,6 +187,9 @@ public MergeStats getMergeStats() {
     /** returns the history uuid for the engine */
     public abstract String getHistoryUUID();
 
+    /** Returns how many bytes we are currently moving from heap to disk */
+    public abstract long getWritingBytes();
+
     /**
      * A throttling class that can be activated, causing the
      * {@code acquireThrottle} method to block on a lock when throttling
@@ -707,7 +710,7 @@ protected void writerSegmentStats(SegmentsStats stats) {
     }
 
     /** How much heap is used that would be freed by a refresh. Note that this may throw {@link AlreadyClosedException}. */
-    public abstract long getIndexBufferRAMBytesUsed(); 
+    public abstract long getIndexBufferRAMBytesUsed();
 
     protected Segment[] getSegmentInfo(SegmentInfos lastCommittedSegmentInfos, boolean verbose) {
         ensureOpen();
diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
index 76b367dd418d2..ac02099987373 100644
--- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
+++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -140,6 +140,12 @@ public class InternalEngine extends Engine {
     private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
     private final CounterMetric numVersionLookups = new CounterMetric();
     private final CounterMetric numIndexVersionsLookups = new CounterMetric();
+    /**
+     * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
+     * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
+     * being indexed/deleted.
+     */
+    private final AtomicLong writingBytes = new AtomicLong();
 
     @Nullable
     private final String historyUUID;
@@ -409,6 +415,12 @@ public String getHistoryUUID() {
         return historyUUID;
     }
 
+    /** Returns how many bytes we are currently moving from indexing buffer to segments on disk */
+    @Override
+    public long getWritingBytes() {
+        return writingBytes.get();
+    }
+
     /**
      * Reads the current stored translog ID from the IW commit data. If the id is not found, recommits the current
      * translog id into lucene and returns null.
@@ -1217,21 +1229,26 @@ public void refresh(String source) throws EngineException {
     }
 
     final void refresh(String source, SearcherScope scope) throws EngineException {
+        long bytes = 0;
        // we obtain a read lock here, since we don't want a flush to happen while we are refreshing
        // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
+            bytes = indexWriter.ramBytesUsed();
             switch (scope) {
                 case EXTERNAL:
                     // even though we maintain 2 managers we really do the heavy-lifting only once.
                     // the second refresh will only do the extra work we have to do for warming caches etc.
+                    writingBytes.addAndGet(bytes);
                     externalSearcherManager.maybeRefreshBlocking();
                     // the break here is intentional we never refresh both internal / external together
                     break;
                 case INTERNAL:
+                    final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
+                    bytes += versionMapBytes;
+                    writingBytes.addAndGet(bytes);
                     internalSearcherManager.maybeRefreshBlocking();
                     break;
-
                 default:
                     throw new IllegalArgumentException("unknown scope: " + scope);
             }
@@ -1245,6 +1262,8 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
                 e.addSuppressed(inner);
             }
             throw new RefreshFailedEngineException(shardId, e);
+        } finally {
+            writingBytes.addAndGet(-bytes);
         }
 
         // TODO: maybe we should just put a scheduled job in threadPool?
@@ -1258,24 +1277,7 @@ final void refresh(String source, SearcherScope scope) throws EngineException {
     public void writeIndexingBuffer() throws EngineException {
         // we obtain a read lock here, since we don't want a flush to happen while we are writing
         // since it flushes the index as well (though, in terms of concurrency, we are allowed to do it)
-        try (ReleasableLock lock = readLock.acquire()) {
-            ensureOpen();
-            final long versionMapBytes = versionMap.ramBytesUsedForRefresh();
-            final long indexingBufferBytes = indexWriter.ramBytesUsed();
-            logger.debug("use refresh to write indexing buffer (heap size=[{}]), to also clear version map (heap size=[{}])",
-                new ByteSizeValue(indexingBufferBytes), new ByteSizeValue(versionMapBytes));
-            refresh("write indexing buffer", SearcherScope.INTERNAL);
-        } catch (AlreadyClosedException e) {
-            failOnTragicEvent(e);
-            throw e;
-        } catch (Exception e) {
-            try {
-                failEngine("writeIndexingBuffer failed", e);
-            } catch (Exception inner) {
-                e.addSuppressed(inner);
-            }
-            throw new RefreshFailedEngineException(shardId, e);
-        }
+        refresh("write indexing buffer", SearcherScope.INTERNAL);
     }
 
     @Override
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index f85026aac8a48..d27a8a0ada446 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -183,12 +183,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
     private final QueryCachingPolicy cachingPolicy;
     private final Supplier<Sort> indexSortSupplier;
 
-    /**
-     * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
-     * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
-     * being indexed/deleted.
-     */
-    private final AtomicLong writingBytes = new AtomicLong();
     private final SearchOperationListener searchOperationListener;
 
     protected volatile ShardRouting shardRouting;
@@ -324,12 +318,6 @@ public Store store() {
     public Sort getIndexSort() {
         return indexSortSupplier.get();
     }
-    /**
-     * returns true if this shard supports indexing (i.e., write) operations.
-     */
-    public boolean canIndex() {
-        return true;
-    }
     public ShardGetService getService() {
         return this.getService;
     }
@@ -840,34 +828,21 @@ public Engine.GetResult get(Engine.Get get) {
      */
     public void refresh(String source) {
         verifyNotClosed();
-
-        if (canIndex()) {
-            long bytes = getEngine().getIndexBufferRAMBytesUsed();
-            writingBytes.addAndGet(bytes);
-            try {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("refresh with source [{}] indexBufferRAMBytesUsed [{}]", source, new ByteSizeValue(bytes));
-                }
-                getEngine().refresh(source);
-            } finally {
-                if (logger.isTraceEnabled()) {
-                    logger.trace("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
-                }
-                writingBytes.addAndGet(-bytes);
-            }
-        } else {
-            if (logger.isTraceEnabled()) {
-                logger.trace("refresh with source [{}]", source);
-            }
-            getEngine().refresh(source);
+        if (logger.isTraceEnabled()) {
+            logger.trace("refresh with source [{}]", source);
         }
+        getEngine().refresh(source);
     }
 
     /**
      * Returns how many bytes we are currently moving from heap to disk
      */
     public long getWritingBytes() {
-        return writingBytes.get();
+        Engine engine = getEngineOrNull();
+        if (engine == null) {
+            return 0;
+        }
+        return engine.getWritingBytes();
     }
 
     public RefreshStats refreshStats() {
@@ -1672,24 +1647,9 @@ private void handleRefreshException(Exception e) {
      * Called when our shard is using too much heap and should move buffered indexed/deleted documents to disk.
      */
     public void writeIndexingBuffer() {
-        if (canIndex() == false) {
-            throw new UnsupportedOperationException();
-        }
         try {
             Engine engine = getEngine();
-            long bytes = engine.getIndexBufferRAMBytesUsed();
-
-            // NOTE: this can be an overestimate by up to 20%, if engine uses IW.flush not refresh, because version map
-            // memory is low enough, but this is fine because after the writes finish, IMC will poll again and see that
-            // there's still up to the 20% being used and continue writing if necessary:
-            logger.debug("add [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
-            writingBytes.addAndGet(bytes);
-            try {
-                engine.writeIndexingBuffer();
-            } finally {
-                writingBytes.addAndGet(-bytes);
-                logger.debug("remove [{}] writing bytes for shard [{}]", new ByteSizeValue(bytes), shardId());
-            }
+            engine.writeIndexingBuffer();
         } catch (Exception e) {
             handleRefreshException(e);
         }
diff --git a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java
index e1c88fff3bee3..73ba9342175d4 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndexingMemoryController.java
@@ -152,8 +152,7 @@ ByteSizeValue indexingBufferSize() {
     protected List<IndexShard> availableShards() {
         List<IndexShard> availableShards = new ArrayList<>();
         for (IndexShard shard : indexShards) {
-            // shadow replica doesn't have an indexing buffer
-            if (shard.canIndex() && CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
+            if (CAN_WRITE_INDEX_BUFFER_STATES.contains(shard.state())) {
                 availableShards.add(shard);
             }
         }
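
Side note for reviewers: the accounting pattern this patch consolidates into `InternalEngine` is easier to see in isolation. Below is a minimal, self-contained sketch of what the new `refresh` does — publish the in-flight byte count via an `AtomicLong` before the blocking refresh, and always retract it in `finally`. `SketchEngine`, `WritingBytesSketch`, the `Runnable` hook, and the byte figures are all invented for illustration; only the add/negate-in-`finally` shape mirrors the actual diff.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical stand-in for InternalEngine, illustrating the writingBytes
 * accounting introduced by this patch. Not Elasticsearch code.
 */
class SketchEngine {
    private final AtomicLong writingBytes = new AtomicLong();

    /** Counterpart of Engine#getWritingBytes(): bytes currently moving from heap to disk. */
    long getWritingBytes() {
        return writingBytes.get();
    }

    /** Counterpart of the instrumented InternalEngine#refresh(String, SearcherScope). */
    void refresh(long indexBufferBytes, long versionMapBytes, Runnable doRefresh) {
        long bytes = 0;
        try {
            // Snapshot what we are about to move and publish it before the blocking
            // refresh, so a concurrent poller sees these bytes as "in flight".
            bytes = indexBufferBytes + versionMapBytes;
            writingBytes.addAndGet(bytes);
            doRefresh.run(); // maybeRefreshBlocking() in the real engine
        } finally {
            // Always retract, even if the refresh threw, so bytes cannot leak.
            writingBytes.addAndGet(-bytes);
        }
    }
}

public class WritingBytesSketch {
    public static void main(String[] args) {
        List<SketchEngine> shards = List.of(new SketchEngine(), new SketchEngine());
        shards.get(0).refresh(1 << 20, 4096, () -> { /* segments get written here */ });

        // A poller (IndexingMemoryController in the real code) sums this across all
        // shards to decide whether writing is falling behind incoming indexing.
        // Here it prints 0 because the refresh above has already completed.
        long totalWriting = shards.stream().mapToLong(SketchEngine::getWritingBytes).sum();
        System.out.println("bytes currently moving to disk: " + totalWriting);
    }
}
```

Because the decrement sits in `finally`, a refresh that fails still clears its contribution; that is what lets the patch delete the duplicated try/finally bookkeeping from `IndexShard#refresh` and `IndexShard#writeIndexingBuffer` without risking a stuck non-zero reading in `IndexingMemoryController`.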