Skip to content

Commit 41a9f3a

Browse files
committed
Use reader attributes to control term dict memory useage (#42838)
This change makes use of the reader attributes added in LUCENE-8671 to ensure that `_id` fields are always on-heap for best update performance and term dicts are generally off-heap on Read-Only engines. Closes #38390
1 parent 955aee8 commit 41a9f3a

File tree

13 files changed

+127
-210
lines changed

13 files changed

+127
-210
lines changed

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
package org.elasticsearch.index.engine;
2121

2222
import org.apache.logging.log4j.Logger;
23+
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
24+
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader.FSTLoadMode;
2325
import org.apache.lucene.document.Field;
2426
import org.apache.lucene.document.NumericDocValuesField;
2527
import org.apache.lucene.index.DirectoryReader;
@@ -42,7 +44,9 @@
4244
import org.apache.lucene.search.TermQuery;
4345
import org.apache.lucene.store.AlreadyClosedException;
4446
import org.apache.lucene.store.Directory;
47+
import org.apache.lucene.store.FilterDirectory;
4548
import org.apache.lucene.store.LockObtainFailedException;
49+
import org.apache.lucene.store.MMapDirectory;
4650
import org.apache.lucene.util.BytesRef;
4751
import org.apache.lucene.util.InfoStream;
4852
import org.elasticsearch.Assertions;
@@ -77,6 +81,7 @@
7781
import org.elasticsearch.index.seqno.SequenceNumbers;
7882
import org.elasticsearch.index.shard.ElasticsearchMergePolicy;
7983
import org.elasticsearch.index.shard.ShardId;
84+
import org.elasticsearch.index.store.FsDirectoryFactory;
8085
import org.elasticsearch.index.store.Store;
8186
import org.elasticsearch.index.translog.Translog;
8287
import org.elasticsearch.index.translog.TranslogConfig;
@@ -2143,10 +2148,21 @@ IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOEx
21432148
}
21442149
}
21452150

2151+
static Map<String, String> getReaderAttributes(Directory directory) {
2152+
Directory unwrap = FilterDirectory.unwrap(directory);
2153+
boolean defaultOffHeap = FsDirectoryFactory.isHybridFs(unwrap) || unwrap instanceof MMapDirectory;
2154+
return Map.of(
2155+
BlockTreeTermsReader.FST_MODE_KEY, // if we are using MMAP for term dics we force all off heap unless it's the ID field
2156+
defaultOffHeap ? FSTLoadMode.OFF_HEAP.name() : FSTLoadMode.ON_HEAP.name()
2157+
, BlockTreeTermsReader.FST_MODE_KEY + "." + IdFieldMapper.NAME, // always force ID field on-heap for fast updates
2158+
FSTLoadMode.ON_HEAP.name());
2159+
}
2160+
21462161
private IndexWriterConfig getIndexWriterConfig() {
21472162
final IndexWriterConfig iwc = new IndexWriterConfig(engineConfig.getAnalyzer());
21482163
iwc.setCommitOnClose(false); // we by default don't commit on close
21492164
iwc.setOpenMode(IndexWriterConfig.OpenMode.APPEND);
2165+
iwc.setReaderAttributes(getReaderAttributes(store.directory()));
21502166
iwc.setIndexDeletionPolicy(combinedDeletionPolicy);
21512167
// with tests.verbose, lucene sets this up: plumb to align with filesystem stream
21522168
boolean verbose = false;

server/src/main/java/org/elasticsearch/index/engine/NoOpEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public NoOpEngine(EngineConfig config) {
4646
super(config, null, null, true, Function.identity());
4747
this.stats = new SegmentsStats();
4848
Directory directory = store.directory();
49-
try (DirectoryReader reader = DirectoryReader.open(directory)) {
49+
try (DirectoryReader reader = DirectoryReader.open(directory, OFF_HEAP_READER_ATTRIBUTES)) {
5050
for (LeafReaderContext ctx : reader.getContext().leaves()) {
5151
SegmentReader segmentReader = Lucene.segmentReader(ctx.reader());
5252
fillSegmentStats(segmentReader, true, stats);

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.elasticsearch.index.engine;
2020

21+
import org.apache.lucene.codecs.blocktree.BlockTreeTermsReader;
2122
import org.apache.lucene.index.DirectoryReader;
2223
import org.apache.lucene.index.IndexCommit;
2324
import org.apache.lucene.index.IndexReader;
@@ -47,7 +48,9 @@
4748
import java.io.IOException;
4849
import java.io.UncheckedIOException;
4950
import java.util.Arrays;
51+
import java.util.Collections;
5052
import java.util.List;
53+
import java.util.Map;
5154
import java.util.concurrent.CountDownLatch;
5255
import java.util.function.BiFunction;
5356
import java.util.function.Function;
@@ -62,6 +65,12 @@
6265
*/
6366
public class ReadOnlyEngine extends Engine {
6467

68+
/**
69+
* Reader attributes used for read only engines. These attributes prevent loading term dictionaries on-heap even if the field is an
70+
* ID field.
71+
*/
72+
public static final Map<String, String> OFF_HEAP_READER_ATTRIBUTES = Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY,
73+
BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name());
6574
private final SegmentInfos lastCommittedSegmentInfos;
6675
private final SeqNoStats seqNoStats;
6776
private final TranslogStats translogStats;
@@ -165,7 +174,7 @@ protected final DirectoryReader wrapReader(DirectoryReader reader,
165174
}
166175

167176
protected DirectoryReader open(IndexCommit commit) throws IOException {
168-
return DirectoryReader.open(commit);
177+
return DirectoryReader.open(commit, OFF_HEAP_READER_ATTRIBUTES);
169178
}
170179

171180
private DocsStats docsStats(final SegmentInfos lastCommittedSegmentInfos) {

server/src/main/java/org/elasticsearch/index/store/FsDirectoryFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.lucene.store.Directory;
2323
import org.apache.lucene.store.FSDirectory;
2424
import org.apache.lucene.store.FileSwitchDirectory;
25+
import org.apache.lucene.store.FilterDirectory;
2526
import org.apache.lucene.store.IOContext;
2627
import org.apache.lucene.store.IndexInput;
2728
import org.apache.lucene.store.LockFactory;
@@ -121,6 +122,14 @@ public String[] listAll() throws IOException {
121122
return directory;
122123
}
123124

125+
/**
126+
* Returns true iff the directory is a hybrid fs directory
127+
*/
128+
public static boolean isHybridFs(Directory directory) {
129+
Directory unwrap = FilterDirectory.unwrap(directory);
130+
return unwrap instanceof HybridDirectory;
131+
}
132+
124133
static final class HybridDirectory extends NIOFSDirectory {
125134
private final FSDirectory randomAccessDirectory;
126135

server/src/main/java/org/elasticsearch/index/store/Store.java

Lines changed: 3 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,13 @@
3838
import org.apache.lucene.store.AlreadyClosedException;
3939
import org.apache.lucene.store.BufferedChecksum;
4040
import org.apache.lucene.store.ByteArrayDataInput;
41-
import org.apache.lucene.store.ByteBufferIndexInput;
4241
import org.apache.lucene.store.ChecksumIndexInput;
4342
import org.apache.lucene.store.Directory;
4443
import org.apache.lucene.store.FilterDirectory;
4544
import org.apache.lucene.store.IOContext;
4645
import org.apache.lucene.store.IndexInput;
4746
import org.apache.lucene.store.IndexOutput;
4847
import org.apache.lucene.store.Lock;
49-
import org.apache.lucene.store.RandomAccessInput;
5048
import org.apache.lucene.store.SimpleFSDirectory;
5149
import org.apache.lucene.util.ArrayUtil;
5250
import org.apache.lucene.util.BytesRef;
@@ -98,7 +96,6 @@
9896
import java.util.Iterator;
9997
import java.util.List;
10098
import java.util.Map;
101-
import java.util.Set;
10299
import java.util.concurrent.TimeUnit;
103100
import java.util.concurrent.atomic.AtomicBoolean;
104101
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -137,7 +134,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
137134
* this by exploiting lucene internals and wrapping the IndexInput in a simple delegate.
138135
*/
139136
public static final Setting<Boolean> FORCE_RAM_TERM_DICT = Setting.boolSetting("index.force_memory_term_dictionary", false,
140-
Property.IndexScope);
137+
Property.IndexScope, Property.Deprecated);
141138
static final String CODEC = "store";
142139
static final int VERSION_WRITE_THROWABLE= 2; // we write throwable since 2.0
143140
static final int VERSION_STACK_TRACE = 1; // we write the stack trace too since 1.4.0
@@ -172,8 +169,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
172169
final TimeValue refreshInterval = indexSettings.getValue(INDEX_STORE_STATS_REFRESH_INTERVAL_SETTING);
173170
logger.debug("store stats are refreshed with refresh_interval [{}]", refreshInterval);
174171
ByteSizeCachingDirectory sizeCachingDir = new ByteSizeCachingDirectory(directory, refreshInterval);
175-
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId),
176-
indexSettings.getValue(FORCE_RAM_TERM_DICT));
172+
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
177173
this.shardLock = shardLock;
178174
this.onClose = onClose;
179175

@@ -712,12 +708,10 @@ public int refCount() {
712708
static final class StoreDirectory extends FilterDirectory {
713709

714710
private final Logger deletesLogger;
715-
private final boolean forceRamTermDict;
716711

717-
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger, boolean forceRamTermDict) {
712+
StoreDirectory(ByteSizeCachingDirectory delegateDirectory, Logger deletesLogger) {
718713
super(delegateDirectory);
719714
this.deletesLogger = deletesLogger;
720-
this.forceRamTermDict = forceRamTermDict;
721715
}
722716

723717
/** Estimate the cumulative size of all files in this directory in bytes. */
@@ -744,18 +738,6 @@ private void innerClose() throws IOException {
744738
super.close();
745739
}
746740

747-
@Override
748-
public IndexInput openInput(String name, IOContext context) throws IOException {
749-
IndexInput input = super.openInput(name, context);
750-
if (name.endsWith(".tip") || name.endsWith(".cfs")) {
751-
// only do this if we are reading cfs or tip file - all other files don't need this.
752-
if (forceRamTermDict && input instanceof ByteBufferIndexInput) {
753-
return new DeoptimizingIndexInput(input.toString(), input);
754-
}
755-
}
756-
return input;
757-
}
758-
759741
@Override
760742
public String toString() {
761743
return "store(" + in.toString() + ")";
@@ -1636,127 +1618,4 @@ private static IndexWriterConfig newIndexWriterConfig() {
16361618
// we also don't specify a codec here and merges should use the engines for this index
16371619
.setMergePolicy(NoMergePolicy.INSTANCE);
16381620
}
1639-
1640-
/**
1641-
* see {@link #FORCE_RAM_TERM_DICT} for details
1642-
*/
1643-
private static final class DeoptimizingIndexInput extends IndexInput {
1644-
1645-
private final IndexInput in;
1646-
1647-
private DeoptimizingIndexInput(String resourceDescription, IndexInput in) {
1648-
super(resourceDescription);
1649-
this.in = in;
1650-
}
1651-
1652-
@Override
1653-
public IndexInput clone() {
1654-
return new DeoptimizingIndexInput(toString(), in.clone());
1655-
}
1656-
1657-
@Override
1658-
public void close() throws IOException {
1659-
in.close();
1660-
}
1661-
1662-
@Override
1663-
public long getFilePointer() {
1664-
return in.getFilePointer();
1665-
}
1666-
1667-
@Override
1668-
public void seek(long pos) throws IOException {
1669-
in.seek(pos);
1670-
}
1671-
1672-
@Override
1673-
public long length() {
1674-
return in.length();
1675-
}
1676-
1677-
@Override
1678-
public String toString() {
1679-
return in.toString();
1680-
}
1681-
1682-
@Override
1683-
public IndexInput slice(String sliceDescription, long offset, long length) throws IOException {
1684-
return new DeoptimizingIndexInput(sliceDescription, in.slice(sliceDescription, offset, length));
1685-
}
1686-
1687-
@Override
1688-
public RandomAccessInput randomAccessSlice(long offset, long length) throws IOException {
1689-
return in.randomAccessSlice(offset, length);
1690-
}
1691-
1692-
@Override
1693-
public byte readByte() throws IOException {
1694-
return in.readByte();
1695-
}
1696-
1697-
@Override
1698-
public void readBytes(byte[] b, int offset, int len) throws IOException {
1699-
in.readBytes(b, offset, len);
1700-
}
1701-
1702-
@Override
1703-
public void readBytes(byte[] b, int offset, int len, boolean useBuffer) throws IOException {
1704-
in.readBytes(b, offset, len, useBuffer);
1705-
}
1706-
1707-
@Override
1708-
public short readShort() throws IOException {
1709-
return in.readShort();
1710-
}
1711-
1712-
@Override
1713-
public int readInt() throws IOException {
1714-
return in.readInt();
1715-
}
1716-
1717-
@Override
1718-
public int readVInt() throws IOException {
1719-
return in.readVInt();
1720-
}
1721-
1722-
@Override
1723-
public int readZInt() throws IOException {
1724-
return in.readZInt();
1725-
}
1726-
1727-
@Override
1728-
public long readLong() throws IOException {
1729-
return in.readLong();
1730-
}
1731-
1732-
@Override
1733-
public long readVLong() throws IOException {
1734-
return in.readVLong();
1735-
}
1736-
1737-
@Override
1738-
public long readZLong() throws IOException {
1739-
return in.readZLong();
1740-
}
1741-
1742-
@Override
1743-
public String readString() throws IOException {
1744-
return in.readString();
1745-
}
1746-
1747-
@Override
1748-
public Map<String, String> readMapOfStrings() throws IOException {
1749-
return in.readMapOfStrings();
1750-
}
1751-
1752-
@Override
1753-
public Set<String> readSetOfStrings() throws IOException {
1754-
return in.readSetOfStrings();
1755-
}
1756-
1757-
@Override
1758-
public void skipBytes(long numBytes) throws IOException {
1759-
in.skipBytes(numBytes);
1760-
}
1761-
}
17621621
}

0 commit comments

Comments
 (0)