diff --git a/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java new file mode 100644 index 0000000000000..e3eb3405038dd --- /dev/null +++ b/benchmarks/src/main/java/org/elasticsearch/benchmark/index/codec/tsdb/TSDBDocValuesMergeBenchmark.java @@ -0,0 +1,196 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.benchmark.index.codec.tsdb; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LogByteSizeMergePolicy; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; +import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.profile.AsyncProfiler; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +@BenchmarkMode(Mode.SampleTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@State(Scope.Benchmark) +@Fork(1) +@Threads(1) +@Warmup(iterations = 0) +@Measurement(iterations = 1) +public class TSDBDocValuesMergeBenchmark { + + static { + // For Elasticsearch900Lucene101Codec: + LogConfigurator.loadLog4jPlugins(); + LogConfigurator.configureESLogging(); + LogConfigurator.setNodeName("test"); + } + + @Param("20431204") + private int nDocs; + + @Param("1000") + private int deltaTime; + + @Param("42") + 
private int seed; + + private static final String TIMESTAMP_FIELD = "@timestamp"; + private static final String HOSTNAME_FIELD = "host.name"; + private static final long BASE_TIMESTAMP = 1704067200000L; + + private IndexWriter indexWriterWithoutOptimizedMerge; + private IndexWriter indexWriterWithOptimizedMerge; + private ExecutorService executorService; + + public static void main(String[] args) throws RunnerException { + final Options options = new OptionsBuilder().include(TSDBDocValuesMergeBenchmark.class.getSimpleName()) + .addProfiler(AsyncProfiler.class) + .build(); + + new Runner(options).run(); + } + + @Setup(Level.Trial) + public void setup() throws IOException { + executorService = Executors.newSingleThreadExecutor(); + + final Directory tempDirectoryWithoutDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp1-")); + final Directory tempDirectoryWithDocValuesSkipper = FSDirectory.open(Files.createTempDirectory("temp2-")); + + indexWriterWithoutOptimizedMerge = createIndex(tempDirectoryWithoutDocValuesSkipper, false); + indexWriterWithOptimizedMerge = createIndex(tempDirectoryWithDocValuesSkipper, true); + } + + private IndexWriter createIndex(final Directory directory, final boolean optimizedMergeEnabled) throws IOException { + final var iwc = createIndexWriterConfig(optimizedMergeEnabled); + long counter1 = 0; + long counter2 = 10_000_000; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + int numHosts = 1000; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + + final Random random = new Random(seed); + IndexWriter indexWriter = new IndexWriter(directory, iwc); + for (int i = 0; i < nDocs; i++) { + final Document doc = new Document(); + + final int batchIndex = i / numHosts; + final String hostName = "host-" + batchIndex; + // Slightly vary the timestamp in each document + final long timestamp = BASE_TIMESTAMP + ((i % numHosts) * deltaTime) + random.nextInt(0, deltaTime); + + doc.add(new SortedDocValuesField(HOSTNAME_FIELD, new BytesRef(hostName))); + doc.add(new SortedNumericDocValuesField(TIMESTAMP_FIELD, timestamp)); + doc.add(new SortedNumericDocValuesField("counter_1", counter1++)); + doc.add(new SortedNumericDocValuesField("counter_2", counter2++)); + doc.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length])); + doc.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge2Values.length])); + int numTags = tags.length % (i + 1); + for (int j = 0; j < numTags; j++) { + doc.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j]))); + } + + indexWriter.addDocument(doc); + } + indexWriter.commit(); + return indexWriter; + } + + @Benchmark + public void forceMergeWithoutOptimizedMerge() throws IOException { + forceMerge(indexWriterWithoutOptimizedMerge); + } + + @Benchmark + public void forceMergeWithOptimizedMerge() throws IOException { + forceMerge(indexWriterWithOptimizedMerge); + } + + private void forceMerge(final IndexWriter indexWriter) throws IOException { + indexWriter.forceMerge(1); + } + + @TearDown(Level.Trial) + public void tearDown() { + if (executorService != null) { + executorService.shutdown(); + try { + if (executorService.awaitTermination(30, TimeUnit.SECONDS) == false) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + private static 
IndexWriterConfig createIndexWriterConfig(boolean optimizedMergeEnabled) { + var config = new IndexWriterConfig(new StandardAnalyzer()); + // NOTE: index sort config matching LogsDB's sort order + config.setIndexSort( + new Sort( + new SortField(HOSTNAME_FIELD, SortField.Type.STRING, false), + new SortedNumericSortField(TIMESTAMP_FIELD, SortField.Type.LONG, true) + ) + ); + config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); + config.setMergePolicy(new LogByteSizeMergePolicy()); + var docValuesFormat = new ES819TSDBDocValuesFormat(4096, optimizedMergeEnabled); + config.setCodec(new Elasticsearch900Lucene101Codec() { + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }); + return config; + } +} diff --git a/docs/changelog/125403.yaml b/docs/changelog/125403.yaml new file mode 100644 index 0000000000000..d953dae4db4fe --- /dev/null +++ b/docs/changelog/125403.yaml @@ -0,0 +1,5 @@ +pr: 125403 +summary: First step optimizing tsdb doc values codec merging +area: Codec +type: enhancement +issues: [] diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 6a1b5bfb97685..9fa84efcd1099 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -475,4 +475,5 @@ exports org.elasticsearch.monitor.metrics; exports org.elasticsearch.plugins.internal.rewriter to org.elasticsearch.inference; exports org.elasticsearch.lucene.util.automaton; + exports org.elasticsearch.index.codec.perfield; } diff --git a/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch900Lucene101Codec.java b/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch900Lucene101Codec.java index ae7fa481a1caa..d96495fb0f615 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch900Lucene101Codec.java +++ b/server/src/main/java/org/elasticsearch/index/codec/Elasticsearch900Lucene101Codec.java @@ -17,9 +17,9 @@ import org.apache.lucene.codecs.lucene101.Lucene101PostingsFormat; import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat; -import org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat; import org.apache.lucene.codecs.perfield.PerFieldKnnVectorsFormat; import org.apache.lucene.codecs.perfield.PerFieldPostingsFormat; +import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat; /** @@ -39,7 +39,7 @@ public PostingsFormat getPostingsFormatForField(String field) { }; private final DocValuesFormat defaultDVFormat; - private final DocValuesFormat docValuesFormat = new PerFieldDocValuesFormat() { + private final DocValuesFormat docValuesFormat = new XPerFieldDocValuesFormat() { @Override public DocValuesFormat getDocValuesFormatForField(String field) { return Elasticsearch900Lucene101Codec.this.getDocValuesFormatForField(field); diff --git a/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldDocValuesFormat.java b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldDocValuesFormat.java new file mode 100644 index 0000000000000..2b5a5d9d45f10 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldDocValuesFormat.java @@ -0,0 +1,372 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.perfield; + +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.codecs.perfield.PerFieldDocValuesFormat; +import org.apache.lucene.index.BinaryDocValues; +import org.apache.lucene.index.DocValuesSkipper; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.SegmentReadState; +import org.apache.lucene.index.SegmentWriteState; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.internal.hppc.IntObjectHashMap; +import org.apache.lucene.util.IOUtils; +import org.elasticsearch.core.SuppressForbidden; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.IdentityHashMap; +import java.util.Map; + +/** + * Fork of {@link PerFieldDocValuesFormat} to allow access to FieldsReader's fields field; otherwise no changes. + */ +public abstract class XPerFieldDocValuesFormat extends DocValuesFormat { + /** Name of this {@link DocValuesFormat}. */ + public static final String PER_FIELD_NAME = "ESPerFieldDV819"; + + /** {@link FieldInfo} attribute name used to store the format name for each field. */ + // FORK note: usage of PerFieldDocValuesFormat is needed for bwc purposes. + // (Otherwise, we load no fields from indices that use PerFieldDocValuesFormat) + public static final String PER_FIELD_FORMAT_KEY = PerFieldDocValuesFormat.class.getSimpleName() + ".format"; + + /** {@link FieldInfo} attribute name used to store the segment suffix name for each field. */ + // FORK note: usage of PerFieldDocValuesFormat is needed for bwc purposes. + public static final String PER_FIELD_SUFFIX_KEY = PerFieldDocValuesFormat.class.getSimpleName() + ".suffix"; + + /** Sole constructor. 
*/ + protected XPerFieldDocValuesFormat() { + super(PER_FIELD_NAME); + } + + @Override + public final DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { + return new FieldsWriter(state); + } + + record ConsumerAndSuffix(DocValuesConsumer consumer, int suffix) implements Closeable { + @Override + public void close() throws IOException { + consumer.close(); + } + } + + @SuppressForbidden(reason = "forked from Lucene") + private class FieldsWriter extends DocValuesConsumer { + + private final Map<DocValuesFormat, ConsumerAndSuffix> formats = new HashMap<>(); + private final Map<String, Integer> suffixes = new HashMap<>(); + + private final SegmentWriteState segmentWriteState; + + FieldsWriter(SegmentWriteState state) { + segmentWriteState = state; + } + + @Override + public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addNumericField(field, valuesProducer); + } + + @Override + public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addBinaryField(field, valuesProducer); + } + + @Override + public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addSortedField(field, valuesProducer); + } + + @Override + public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addSortedNumericField(field, valuesProducer); + } + + @Override + public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { + getInstance(field).addSortedSetField(field, valuesProducer); + } + + @Override + public void merge(MergeState mergeState) throws IOException { + Map<DocValuesConsumer, Collection<String>> consumersToField = new IdentityHashMap<>(); + + // Group each consumer by the fields it handles + for (FieldInfo fi : mergeState.mergeFieldInfos) { + if (fi.getDocValuesType() == DocValuesType.NONE) { + continue; + } + // merge should ignore current format for the fields being merged + DocValuesConsumer consumer = getInstance(fi, true); + Collection<String> fieldsForConsumer = consumersToField.get(consumer); + if (fieldsForConsumer == null) { + fieldsForConsumer = new ArrayList<>(); + consumersToField.put(consumer, fieldsForConsumer); + } + fieldsForConsumer.add(fi.name); + } + + // Delegate the merge to the appropriate consumer + for (Map.Entry<DocValuesConsumer, Collection<String>> e : consumersToField.entrySet()) { + e.getKey().merge(XPerFieldMergeState.restrictFields(mergeState, e.getValue())); + } + } + + private DocValuesConsumer getInstance(FieldInfo field) throws IOException { + return getInstance(field, false); + } + + /** + * DocValuesConsumer for the given field. + * + * @param field - FieldInfo object. + * @param ignoreCurrentFormat - ignore the existing format attributes. + * @return DocValuesConsumer for the field. 
+ * @throws IOException if there is a low-level IO error + */ + private DocValuesConsumer getInstance(FieldInfo field, boolean ignoreCurrentFormat) throws IOException { + DocValuesFormat format = null; + if (field.getDocValuesGen() != -1) { + String formatName = null; + if (ignoreCurrentFormat == false) { + formatName = field.getAttribute(PER_FIELD_FORMAT_KEY); + } + // this means the field never existed in that segment, yet is applied updates + if (formatName != null) { + format = DocValuesFormat.forName(formatName); + } + } + if (format == null) { + format = getDocValuesFormatForField(field.name); + } + if (format == null) { + throw new IllegalStateException("invalid null DocValuesFormat for field=\"" + field.name + "\""); + } + final String formatName = format.getName(); + + field.putAttribute(PER_FIELD_FORMAT_KEY, formatName); + Integer suffix = null; + + ConsumerAndSuffix consumer = formats.get(format); + if (consumer == null) { + // First time we are seeing this format; create a new instance + + if (field.getDocValuesGen() != -1) { + String suffixAtt = null; + if (ignoreCurrentFormat == false) { + suffixAtt = field.getAttribute(PER_FIELD_SUFFIX_KEY); + } + // even when dvGen is != -1, it can still be a new field, that never + // existed in the segment, and therefore doesn't have the recorded + // attributes yet. + if (suffixAtt != null) { + suffix = Integer.valueOf(suffixAtt); + } + } + + if (suffix == null) { + // bump the suffix + suffix = suffixes.get(formatName); + if (suffix == null) { + suffix = 0; + } else { + suffix = suffix + 1; + } + } + suffixes.put(formatName, suffix); + + final String segmentSuffix = getFullSegmentSuffix( + segmentWriteState.segmentSuffix, + getSuffix(formatName, Integer.toString(suffix)) + ); + consumer = new ConsumerAndSuffix(format.fieldsConsumer(new SegmentWriteState(segmentWriteState, segmentSuffix)), suffix); + formats.put(format, consumer); + } else { + // we've already seen this format, so just grab its suffix + assert suffixes.containsKey(formatName); + suffix = consumer.suffix; + } + + field.putAttribute(PER_FIELD_SUFFIX_KEY, Integer.toString(suffix)); + // TODO: we should only provide the "slice" of FIS + // that this DVF actually sees ... 
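+ // Illustrative note (not part of the Lucene fork): for a format named, say, "ES819TSDB" with + // suffix 0 and an empty outer suffix, getSuffix and getFullSegmentSuffix below produce the + // segment suffix "ES819TSDB_0", which is embedded in the doc values file names; this + // per-format suffix is what lets several doc values formats coexist within a single segment.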
+ return consumer.consumer; + } + + @Override + public void close() throws IOException { + // Close all subs + IOUtils.close(formats.values()); + } + } + + static String getSuffix(String formatName, String suffix) { + return formatName + "_" + suffix; + } + + static String getFullSegmentSuffix(String outerSegmentSuffix, String segmentSuffix) { + if (outerSegmentSuffix.length() == 0) { + return segmentSuffix; + } else { + return outerSegmentSuffix + "_" + segmentSuffix; + } + } + + @SuppressForbidden(reason = "forked from Lucene") + public static class FieldsReader extends DocValuesProducer { + + private final IntObjectHashMap<DocValuesProducer> fields = new IntObjectHashMap<>(); + private final Map<String, DocValuesProducer> formats = new HashMap<>(); + + // clone for merge + FieldsReader(FieldsReader other) { + Map<DocValuesProducer, DocValuesProducer> oldToNew = new IdentityHashMap<>(); + // First clone all formats + for (Map.Entry<String, DocValuesProducer> ent : other.formats.entrySet()) { + DocValuesProducer values = ent.getValue().getMergeInstance(); + formats.put(ent.getKey(), values); + oldToNew.put(ent.getValue(), values); + } + + // Then rebuild fields: + for (IntObjectHashMap.IntObjectCursor<DocValuesProducer> ent : other.fields) { + DocValuesProducer producer = oldToNew.get(ent.value); + assert producer != null; + fields.put(ent.key, producer); + } + } + + FieldsReader(final SegmentReadState readState) throws IOException { + + // Init each unique format: + boolean success = false; + try { + // Read field name -> format name + for (FieldInfo fi : readState.fieldInfos) { + if (fi.getDocValuesType() != DocValuesType.NONE) { + final String fieldName = fi.name; + final String formatName = fi.getAttribute(PER_FIELD_FORMAT_KEY); + if (formatName != null) { + // null formatName means the field is in fieldInfos, but has no docvalues! + final String suffix = fi.getAttribute(PER_FIELD_SUFFIX_KEY); + if (suffix == null) { + throw new IllegalStateException("missing attribute: " + PER_FIELD_SUFFIX_KEY + " for field: " + fieldName); + } + DocValuesFormat format = DocValuesFormat.forName(formatName); + String segmentSuffix = getFullSegmentSuffix(readState.segmentSuffix, getSuffix(formatName, suffix)); + if (formats.containsKey(segmentSuffix) == false) { + formats.put(segmentSuffix, format.fieldsProducer(new SegmentReadState(readState, segmentSuffix))); + } + fields.put(fi.number, formats.get(segmentSuffix)); + } + } + } + success = true; + } finally { + if (success == false) { + IOUtils.closeWhileHandlingException(formats.values()); + } + } + } + + // FORK note: the reason why PerFieldDocValuesFormat is forked: + public DocValuesProducer getDocValuesProducer(FieldInfo field) { + return fields.get(field.number); + } + + public Map<String, DocValuesProducer> getFormats() { + return formats; + } + + @Override + public NumericDocValues getNumeric(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getNumeric(field); + } + + @Override + public BinaryDocValues getBinary(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getBinary(field); + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getSorted(field); + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? 
null : producer.getSortedNumeric(field); + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getSortedSet(field); + } + + @Override + public DocValuesSkipper getSkipper(FieldInfo field) throws IOException { + DocValuesProducer producer = fields.get(field.number); + return producer == null ? null : producer.getSkipper(field); + } + + @Override + public void close() throws IOException { + IOUtils.close(formats.values()); + } + + @Override + public void checkIntegrity() throws IOException { + for (DocValuesProducer format : formats.values()) { + format.checkIntegrity(); + } + } + + @Override + public DocValuesProducer getMergeInstance() { + return new FieldsReader(this); + } + + @Override + public String toString() { + return "PerFieldDocValues(formats=" + formats.size() + ")"; + } + } + + @Override + public final DocValuesProducer fieldsProducer(SegmentReadState state) throws IOException { + return new FieldsReader(state); + } + + /** + * Returns the doc values format that should be used for writing new segments of <code>field</code>. + * + * <p>The field to format mapping is written to the index, so this method is only invoked when + * writing, not when reading. + */ + public abstract DocValuesFormat getDocValuesFormatForField(String field); +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldMergeState.java b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldMergeState.java new file mode 100644 index 0000000000000..72a8c4bc1492b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/perfield/XPerFieldMergeState.java @@ -0,0 +1,261 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.perfield; + +import org.apache.lucene.codecs.FieldsProducer; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FieldInfos; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.Terms; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +/** Fork of org.apache.lucene.codecs.perfield.PerFieldMergeState, because of {@link XPerFieldDocValuesFormat} */ +final class XPerFieldMergeState { + + /** + * Create a new MergeState from the given {@link MergeState} instance with restricted fields. + * + * @param fields The fields to keep in the new instance. + * @return The new MergeState with restricted fields + */ + static MergeState restrictFields(MergeState in, Collection<String> fields) { + var fieldInfos = new FieldInfos[in.fieldInfos.length]; + for (int i = 0; i < in.fieldInfos.length; i++) { + fieldInfos[i] = new FilterFieldInfos(in.fieldInfos[i], fields); + } + var fieldsProducers = new FieldsProducer[in.fieldsProducers.length]; + for (int i = 0; i < in.fieldsProducers.length; i++) { + fieldsProducers[i] = in.fieldsProducers[i] == null ? 
null : new FilterFieldsProducer(in.fieldsProducers[i], fields); + } + var mergeFieldInfos = new FilterFieldInfos(in.mergeFieldInfos, fields); + return new MergeState( + in.docMaps, + in.segmentInfo, + mergeFieldInfos, + in.storedFieldsReaders, + in.termVectorsReaders, + in.normsProducers, + in.docValuesProducers, + fieldInfos, + in.liveDocs, + fieldsProducers, + in.pointsReaders, + in.knnVectorsReaders, + in.maxDocs, + in.infoStream, + in.intraMergeTaskExecutor, + in.needsIndexSort + ); + } + + private static class FilterFieldInfos extends FieldInfos { + private final Set<String> filteredNames; + private final List<FieldInfo> filtered; + + // Copy of the private fields from FieldInfos + // Renamed so as to be less confusing about which fields we're referring to + private final boolean filteredHasVectors; + private final boolean filteredHasPostings; + private final boolean filteredHasProx; + private final boolean filteredHasPayloads; + private final boolean filteredHasOffsets; + private final boolean filteredHasFreq; + private final boolean filteredHasNorms; + private final boolean filteredHasDocValues; + private final boolean filteredHasPointValues; + + FilterFieldInfos(FieldInfos src, Collection<String> filterFields) { + // Copy all the input FieldInfo objects since the field numbering must be kept consistent + super(toArray(src)); + + boolean hasVectors = false; + boolean hasPostings = false; + boolean hasProx = false; + boolean hasPayloads = false; + boolean hasOffsets = false; + boolean hasFreq = false; + boolean hasNorms = false; + boolean hasDocValues = false; + boolean hasPointValues = false; + + this.filteredNames = new HashSet<>(filterFields); + this.filtered = new ArrayList<>(filterFields.size()); + for (FieldInfo fi : src) { + if (this.filteredNames.contains(fi.name)) { + this.filtered.add(fi); + hasVectors |= fi.hasTermVectors(); + hasPostings |= fi.getIndexOptions() != IndexOptions.NONE; + hasProx |= fi.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS) >= 0; + hasFreq |= fi.getIndexOptions() != IndexOptions.DOCS; + hasOffsets |= fi.getIndexOptions().compareTo(IndexOptions.DOCS_AND_FREQS_AND_POSITIONS_AND_OFFSETS) >= 0; + hasNorms |= fi.hasNorms(); + hasDocValues |= fi.getDocValuesType() != DocValuesType.NONE; + hasPayloads |= fi.hasPayloads(); + hasPointValues |= (fi.getPointDimensionCount() != 0); + } + } + + this.filteredHasVectors = hasVectors; + this.filteredHasPostings = hasPostings; + this.filteredHasProx = hasProx; + this.filteredHasPayloads = hasPayloads; + this.filteredHasOffsets = hasOffsets; + this.filteredHasFreq = hasFreq; + this.filteredHasNorms = hasNorms; + this.filteredHasDocValues = hasDocValues; + this.filteredHasPointValues = hasPointValues; + } + + private static FieldInfo[] toArray(FieldInfos src) { + FieldInfo[] res = new FieldInfo[src.size()]; + int i = 0; + for (FieldInfo fi : src) { + res[i++] = fi; + } + return res; + } + + @Override + public Iterator<FieldInfo> iterator() { + return filtered.iterator(); + } + + @Override + public boolean hasFreq() { + return filteredHasFreq; + } + + @Override + public boolean hasPostings() { + return filteredHasPostings; + } + + @Override + public boolean hasProx() { + return filteredHasProx; + } + + @Override + public boolean hasPayloads() { + return filteredHasPayloads; + } + + @Override + public boolean hasOffsets() { + return filteredHasOffsets; + } + + @Override + public boolean hasTermVectors() { + return filteredHasVectors; + } + + @Override + public boolean hasNorms() { + return filteredHasNorms; + } + + @Override + public 
boolean hasDocValues() { + return filteredHasDocValues; + } + + @Override + public boolean hasPointValues() { + return filteredHasPointValues; + } + + @Override + public int size() { + return filtered.size(); + } + + @Override + public FieldInfo fieldInfo(String fieldName) { + if (filteredNames.contains(fieldName) == false) { + // Throw IAE to be consistent with fieldInfo(int) which throws it as well on invalid numbers + throw new IllegalArgumentException( + "The field named '" + + fieldName + + "' is not accessible in the current " + + "merge context, available ones are: " + + filteredNames + ); + } + return super.fieldInfo(fieldName); + } + + @Override + public FieldInfo fieldInfo(int fieldNumber) { + FieldInfo res = super.fieldInfo(fieldNumber); + if (filteredNames.contains(res.name) == false) { + throw new IllegalArgumentException( + "The field named '" + + res.name + + "' numbered '" + + fieldNumber + + "' is not " + + "accessible in the current merge context, available ones are: " + + filteredNames + ); + } + return res; + } + } + + private static class FilterFieldsProducer extends FieldsProducer { + private final FieldsProducer in; + private final List<String> filtered; + + FilterFieldsProducer(FieldsProducer in, Collection<String> filterFields) { + this.in = in; + this.filtered = new ArrayList<>(filterFields); + } + + @Override + public Iterator<String> iterator() { + return filtered.iterator(); + } + + @Override + public Terms terms(String field) throws IOException { + if (filtered.contains(field) == false) { + throw new IllegalArgumentException( + "The field named '" + field + "' is not accessible in the current " + "merge context, available ones are: " + filtered + ); + } + return in.terms(field); + } + + @Override + public int size() { + return filtered.size(); + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public void checkIntegrity() throws IOException { + in.checkIntegrity(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java new file mode 100644 index 0000000000000..d6dae9ea882f9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/DocValuesConsumerUtil.java @@ -0,0 +1,93 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.MergeState; +import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; + +/** + * Contains logic to determine whether optimized merge can occur. 
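+ * <p>A minimal sketch of the intended call pattern (hypothetical caller, for illustration only): + * <pre>{@code + * MergeStats stats = DocValuesConsumerUtil.compatibleWithOptimizedMerge(true, mergeState, fieldInfo); + * if (stats.supported()) { + *     // all segments use ES819 doc values and carry no deletes, so the merge can reuse + *     // the precomputed value counts instead of iterating every value to recount them + * } + * }</pre>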
+ */ +class DocValuesConsumerUtil { + + static final MergeStats UNSUPPORTED = new MergeStats(false, -1, -1); + + record MergeStats(boolean supported, long sumNumValues, int sumNumDocsWithField) {} + + static MergeStats compatibleWithOptimizedMerge(boolean optimizedMergeEnabled, MergeState mergeState, FieldInfo fieldInfo) { + if (optimizedMergeEnabled == false || mergeState.needsIndexSort == false) { + return UNSUPPORTED; + } + + // Documents marked as deleted should be rare. Maybe in the case of noop operation? + for (int i = 0; i < mergeState.liveDocs.length; i++) { + if (mergeState.liveDocs[i] != null) { + return UNSUPPORTED; + } + } + + long sumNumValues = 0; + int sumNumDocsWithField = 0; + + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer instanceof XPerFieldDocValuesFormat.FieldsReader perFieldReader) { + var wrapped = perFieldReader.getDocValuesProducer(fieldInfo); + if (wrapped instanceof ES819TSDBDocValuesProducer tsdbDocValuesProducer) { + switch (fieldInfo.getDocValuesType()) { + case NUMERIC -> { + var entry = tsdbDocValuesProducer.numerics.get(fieldInfo.number); + if (entry != null) { + sumNumValues += entry.numValues; + sumNumDocsWithField += entry.numDocsWithField; + } + } + case SORTED_NUMERIC -> { + var entry = tsdbDocValuesProducer.sortedNumerics.get(fieldInfo.number); + if (entry != null) { + sumNumValues += entry.numValues; + sumNumDocsWithField += entry.numDocsWithField; + } + } + case SORTED -> { + var entry = tsdbDocValuesProducer.sorted.get(fieldInfo.number); + if (entry != null) { + sumNumValues += entry.ordsEntry.numValues; + sumNumDocsWithField += entry.ordsEntry.numDocsWithField; + } + } + case SORTED_SET -> { + var entry = tsdbDocValuesProducer.sortedSets.get(fieldInfo.number); + if (entry != null) { + if (entry.singleValueEntry != null) { + sumNumValues += entry.singleValueEntry.ordsEntry.numValues; + sumNumDocsWithField += entry.singleValueEntry.ordsEntry.numDocsWithField; + } else { + sumNumValues += entry.ordsEntry.numValues; + sumNumDocsWithField += entry.ordsEntry.numDocsWithField; + } + } + } + default -> throw new IllegalStateException("unexpected doc values producer type: " + fieldInfo.getDocValuesType()); + } + } else { + return UNSUPPORTED; + } + } else { + return UNSUPPORTED; + } + } + + return new MergeStats(true, sumNumValues, sumNumDocsWithField); + } + +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java index 779ef68072568..b860c0f5983c7 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesConsumer.java @@ -10,15 +10,14 @@ package org.elasticsearch.index.codec.tsdb.es819; import org.apache.lucene.codecs.CodecUtil; -import org.apache.lucene.codecs.DocValuesConsumer; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.codecs.lucene90.IndexedDISI; import org.apache.lucene.index.BinaryDocValues; import org.apache.lucene.index.DocValues; import org.apache.lucene.index.DocValuesSkipIndexType; -import org.apache.lucene.index.EmptyDocValuesProducer; import org.apache.lucene.index.FieldInfo; import org.apache.lucene.index.IndexFileNames; +import org.apache.lucene.index.MergeState; import org.apache.lucene.index.NumericDocValues; 
import org.apache.lucene.index.SegmentWriteState; import org.apache.lucene.index.SortedDocValues; @@ -47,21 +46,24 @@ import java.util.Arrays; import java.util.List; +import static org.elasticsearch.index.codec.tsdb.es819.DocValuesConsumerUtil.compatibleWithOptimizedMerge; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.DIRECT_MONOTONIC_BLOCK_SHIFT; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_LEVEL_SHIFT; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SKIP_INDEX_MAX_LEVEL; import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.SORTED_SET; -final class ES819TSDBDocValuesConsumer extends DocValuesConsumer { +final class ES819TSDBDocValuesConsumer extends XDocValuesConsumer { IndexOutput data, meta; final int maxDoc; private byte[] termsDictBuffer; private final int skipIndexIntervalSize; + final boolean enableOptimizedMerge; ES819TSDBDocValuesConsumer( SegmentWriteState state, int skipIndexIntervalSize, + boolean enableOptimizedMerge, String dataCodec, String dataExtension, String metaCodec, @@ -90,6 +92,7 @@ final class ES819TSDBDocValuesConsumer extends DocValuesConsumer { ); maxDoc = state.segmentInfo.maxDoc(); this.skipIndexIntervalSize = skipIndexIntervalSize; + this.enableOptimizedMerge = enableOptimizedMerge; success = true; } finally { if (success == false) { @@ -102,7 +105,7 @@ final class ES819TSDBDocValuesConsumer extends DocValuesConsumer { public void addNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(ES819TSDBDocValuesFormat.NUMERIC); - DocValuesProducer producer = new EmptyDocValuesProducer() { + var producer = new TsdbDocValuesProducer(valuesProducer) { @Override public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { return DocValues.singleton(valuesProducer.getNumeric(field)); @@ -115,15 +118,21 @@ public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOExcepti writeField(field, producer, -1); } - private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, long maxOrd) throws IOException { + private long[] writeField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd) throws IOException { int numDocsWithValue = 0; long numValues = 0; - SortedNumericDocValues values = valuesProducer.getSortedNumeric(field); - for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { - numDocsWithValue++; - final int count = values.docValueCount(); - numValues += count; + SortedNumericDocValues values; + if (valuesProducer.mergeStats.supported()) { + numDocsWithValue = valuesProducer.mergeStats.sumNumDocsWithField(); + numValues = valuesProducer.mergeStats.sumNumValues(); + } else { + values = valuesProducer.getSortedNumeric(field); + for (int doc = values.nextDoc(); doc != DocIdSetIterator.NO_MORE_DOCS; doc = values.nextDoc()) { + numDocsWithValue++; + final int count = values.docValueCount(); + numValues += count; + } } meta.writeLong(numValues); @@ -212,6 +221,16 @@ private long[] writeField(FieldInfo field, DocValuesProducer valuesProducer, lon return new long[] { numDocsWithValue, numValues }; } + @Override + public void mergeNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeNumericField(result, 
mergeFieldInfo, mergeState); + } else { + super.mergeNumericField(mergeFieldInfo, mergeState); + } + } + @Override public void addBinaryField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); @@ -287,8 +306,18 @@ public void addSortedField(FieldInfo field, DocValuesProducer valuesProducer) th doAddSortedField(field, valuesProducer, false); } + @Override + public void mergeSortedField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeSortedField(result, mergeFieldInfo, mergeState); + } else { + super.mergeSortedField(mergeFieldInfo, mergeState); + } + } + private void doAddSortedField(FieldInfo field, DocValuesProducer valuesProducer, boolean addTypeByte) throws IOException { - DocValuesProducer producer = new EmptyDocValuesProducer() { + var producer = new TsdbDocValuesProducer(valuesProducer) { @Override public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { SortedDocValues sorted = valuesProducer.getSorted(field); @@ -483,10 +512,10 @@ private void writeTermsIndex(SortedSetDocValues values) throws IOException { public void addSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(ES819TSDBDocValuesFormat.SORTED_NUMERIC); - writeSortedNumericField(field, valuesProducer, -1); + writeSortedNumericField(field, new TsdbDocValuesProducer(valuesProducer), -1); } - private void writeSortedNumericField(FieldInfo field, DocValuesProducer valuesProducer, long maxOrd) throws IOException { + private void writeSortedNumericField(FieldInfo field, TsdbDocValuesProducer valuesProducer, long maxOrd) throws IOException { if (field.docValuesSkipIndexType() != DocValuesSkipIndexType.NONE) { writeSkipIndex(field, valuesProducer); } @@ -521,7 +550,22 @@ private void writeSortedNumericField(FieldInfo field, DocValuesProducer valuesPr } } - private static boolean isSingleValued(SortedSetDocValues values) throws IOException { + @Override + public void mergeSortedNumericField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeSortedNumericField(result, mergeFieldInfo, mergeState); + } else { + super.mergeSortedNumericField(mergeFieldInfo, mergeState); + } + } + + private static boolean isSingleValued(FieldInfo field, TsdbDocValuesProducer producer) throws IOException { + if (producer.mergeStats.supported()) { + return producer.mergeStats.sumNumValues() == producer.mergeStats.sumNumDocsWithField(); + } + + var values = producer.getSortedSet(field); if (DocValues.unwrapSingleton(values) != null) { return true; } @@ -537,13 +581,23 @@ private static boolean isSingleValued(SortedSetDocValues values) throws IOExcept return true; } + @Override + public void mergeSortedSetField(FieldInfo mergeFieldInfo, MergeState mergeState) throws IOException { + var result = compatibleWithOptimizedMerge(enableOptimizedMerge, mergeState, mergeFieldInfo); + if (result.supported()) { + mergeSortedSetField(result, mergeFieldInfo, mergeState); + } else { + super.mergeSortedSetField(mergeFieldInfo, mergeState); + } + } + @Override public void addSortedSetField(FieldInfo field, DocValuesProducer valuesProducer) throws IOException { meta.writeInt(field.number); meta.writeByte(SORTED_SET); - 
if (isSingleValued(valuesProducer.getSortedSet(field))) { - doAddSortedField(field, new EmptyDocValuesProducer() { + if (isSingleValued(field, new TsdbDocValuesProducer(valuesProducer))) { + doAddSortedField(field, new TsdbDocValuesProducer(valuesProducer) { @Override public SortedDocValues getSorted(FieldInfo field) throws IOException { return SortedSetSelector.wrap(valuesProducer.getSortedSet(field), SortedSetSelector.Type.MIN); @@ -554,7 +608,7 @@ public SortedDocValues getSorted(FieldInfo field) throws IOException { SortedSetDocValues values = valuesProducer.getSortedSet(field); long maxOrd = values.getValueCount(); - writeSortedNumericField(field, new EmptyDocValuesProducer() { + writeSortedNumericField(field, new TsdbDocValuesProducer(valuesProducer) { @Override public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { SortedSetDocValues values = valuesProducer.getSortedSet(field); diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java index 706726cb90028..fd35f8a96dc20 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormat.java @@ -13,6 +13,7 @@ import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.index.SegmentReadState; import org.apache.lucene.index.SegmentWriteState; +import org.elasticsearch.common.util.FeatureFlag; import java.io.IOException; @@ -88,25 +89,47 @@ public class ES819TSDBDocValuesFormat extends org.apache.lucene.codecs.DocValues } } + // Default for escape hatch: + static final boolean OPTIMIZED_MERGE_ENABLE_DEFAULT; + static final FeatureFlag TSDB_DOC_VALUES_OPTIMIZED_MERGE = new FeatureFlag("tsdb_doc_values_optimized_merge"); + static final String OPTIMIZED_MERGE_ENABLED_NAME = ES819TSDBDocValuesConsumer.class.getName() + ".enableOptimizedMerge"; + + static { + boolean optimizedMergeDefault = TSDB_DOC_VALUES_OPTIMIZED_MERGE.isEnabled(); + OPTIMIZED_MERGE_ENABLE_DEFAULT = Boolean.parseBoolean( + System.getProperty(OPTIMIZED_MERGE_ENABLED_NAME, Boolean.toString(optimizedMergeDefault)) + ); + } + final int skipIndexIntervalSize; + private final boolean enableOptimizedMerge; /** Default constructor. */ public ES819TSDBDocValuesFormat() { - this(DEFAULT_SKIP_INDEX_INTERVAL_SIZE); + this(DEFAULT_SKIP_INDEX_INTERVAL_SIZE, OPTIMIZED_MERGE_ENABLE_DEFAULT); } /** Doc values fields format with specified skipIndexIntervalSize. 
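+ * The {@code enableOptimizedMerge} flag added in this diff controls whether merges may take the + * optimized path; per the static initializer above, its default comes from the + * {@code tsdb_doc_values_optimized_merge} feature flag and can be overridden via the system + * property named by {@code OPTIMIZED_MERGE_ENABLED_NAME}.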
*/ - public ES819TSDBDocValuesFormat(int skipIndexIntervalSize) { + public ES819TSDBDocValuesFormat(int skipIndexIntervalSize, boolean enableOptimizedMerge) { + super(CODEC_NAME); + if (skipIndexIntervalSize < 2) { + throw new IllegalArgumentException("skipIndexIntervalSize must be > 1, got [" + skipIndexIntervalSize + "]"); + } + this.skipIndexIntervalSize = skipIndexIntervalSize; + this.enableOptimizedMerge = enableOptimizedMerge; + } + + @Override + public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOException { - return new ES819TSDBDocValuesConsumer(state, skipIndexIntervalSize, DATA_CODEC, DATA_EXTENSION, META_CODEC, META_EXTENSION); + return new ES819TSDBDocValuesConsumer( + state, + skipIndexIntervalSize, + enableOptimizedMerge, + DATA_CODEC, + DATA_EXTENSION, + META_CODEC, + META_EXTENSION + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java index 0e051aa5fdd25..22172268add5f 100644 --- a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesProducer.java @@ -51,11 +51,11 @@ import static org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat.TERMS_DICT_BLOCK_LZ4_SHIFT; final class ES819TSDBDocValuesProducer extends DocValuesProducer { - private final IntObjectHashMap<NumericEntry> numerics; + final IntObjectHashMap<NumericEntry> numerics; private final IntObjectHashMap<BinaryEntry> binaries; - private final IntObjectHashMap<SortedEntry> sorted; - private final IntObjectHashMap<SortedSetEntry> sortedSets; - private final IntObjectHashMap<SortedNumericEntry> sortedNumerics; + final IntObjectHashMap<SortedEntry> sorted; + final IntObjectHashMap<SortedSetEntry> sortedSets; + final IntObjectHashMap<SortedNumericEntry> sortedNumerics; private final IntObjectHashMap<DocValuesSkipperEntry> skippers; private final IndexInput data; private final int maxDoc; @@ -1430,7 +1430,7 @@ private void set() { private record DocValuesSkipperEntry(long offset, long length, long minValue, long maxValue, int docCount, int maxDocId) {} - private static class NumericEntry { + static class NumericEntry { long docsWithFieldOffset; long docsWithFieldLength; short jumpTableEntryCount; @@ -1460,18 +1460,18 @@ private static class BinaryEntry { DirectMonotonicReader.Meta addressesMeta; } - private static class SortedNumericEntry extends NumericEntry { + static class SortedNumericEntry extends NumericEntry { DirectMonotonicReader.Meta addressesMeta; long addressesOffset; long addressesLength; } - private static class SortedEntry { + static class SortedEntry { NumericEntry ordsEntry; TermsDictEntry termsDictEntry; } - private static class SortedSetEntry { + static class SortedSetEntry { SortedEntry singleValueEntry; SortedNumericEntry ordsEntry; TermsDictEntry termsDictEntry; diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/TsdbDocValuesProducer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/TsdbDocValuesProducer.java new file mode 100644 index 0000000000000..b8100f2e635f8 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/TsdbDocValuesProducer.java @@ -0,0 +1,66 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.EmptyDocValuesProducer; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; + +import java.io.IOException; + +class TsdbDocValuesProducer extends EmptyDocValuesProducer { + + final DocValuesConsumerUtil.MergeStats mergeStats; + final DocValuesProducer actual; + + TsdbDocValuesProducer(DocValuesConsumerUtil.MergeStats mergeStats) { + this.mergeStats = mergeStats; + this.actual = null; + } + + TsdbDocValuesProducer(DocValuesProducer valuesProducer) { + if (valuesProducer instanceof TsdbDocValuesProducer tsdb) { + mergeStats = tsdb.mergeStats; + } else { + mergeStats = DocValuesConsumerUtil.UNSUPPORTED; + } + this.actual = valuesProducer; + } + + @Override + public SortedDocValues getSorted(FieldInfo field) throws IOException { + if (actual != null) { + return actual.getSorted(field); + } else { + return super.getSorted(field); + } + } + + @Override + public SortedSetDocValues getSortedSet(FieldInfo field) throws IOException { + if (actual != null) { + return actual.getSortedSet(field); + } else { + return super.getSortedSet(field); + } + } + + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo field) throws IOException { + if (actual != null) { + return actual.getSortedNumeric(field); + } else { + return super.getSortedNumeric(field); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java new file mode 100644 index 0000000000000..af6fc2587a49a --- /dev/null +++ b/server/src/main/java/org/elasticsearch/index/codec/tsdb/es819/XDocValuesConsumer.java @@ -0,0 +1,759 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ +package org.elasticsearch.index.codec.tsdb.es819; + +import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesProducer; +import org.apache.lucene.index.BaseTermsEnum; +import org.apache.lucene.index.DocIDMerger; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.FieldInfo; +import org.apache.lucene.index.FilteredTermsEnum; +import org.apache.lucene.index.ImpactsEnum; +import org.apache.lucene.index.MergeState; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.OrdinalMap; +import org.apache.lucene.index.PostingsEnum; +import org.apache.lucene.index.SortedDocValues; +import org.apache.lucene.index.SortedNumericDocValues; +import org.apache.lucene.index.SortedSetDocValues; +import org.apache.lucene.index.TermState; +import org.apache.lucene.index.TermsEnum; +import org.apache.lucene.util.AttributeSource; +import org.apache.lucene.util.Bits; +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.LongBitSet; +import org.apache.lucene.util.LongValues; +import org.apache.lucene.util.packed.PackedInts; +import org.elasticsearch.index.codec.tsdb.es819.DocValuesConsumerUtil.MergeStats; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS; + +/** + * Forks the merging logic from {@link DocValuesConsumer} that {@link ES819TSDBDocValuesConsumer} needs. + * This class should be removed when merging logic in {@link DocValuesConsumer} becomes accessible / overwritable in Lucene. + */ +public abstract class XDocValuesConsumer extends DocValuesConsumer { + + /** Sole constructor. (For invocation by subclass constructors, typically implicit.) */ + protected XDocValuesConsumer() {} + + /** Tracks state of one numeric sub-reader that we are merging */ + private static class NumericDocValuesSub extends DocIDMerger.Sub { + + final NumericDocValues values; + + NumericDocValuesSub(MergeState.DocMap docMap, NumericDocValues values) { + super(docMap); + this.values = values; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the numeric docvalues from MergeState. + * + *

<p>The default implementation calls {@link #addNumericField}, passing a DocValuesProducer that + * merges and filters deleted documents on the fly. + */ + public void mergeNumericField(MergeStats mergeStats, final FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + addNumericField(mergeFieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public NumericDocValues getNumeric(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong fieldInfo"); + } + + List<NumericDocValuesSub> subs = new ArrayList<>(); + assert mergeState.docMaps.length == mergeState.docValuesProducers.length; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + NumericDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.NUMERIC) { + values = docValuesProducer.getNumeric(readerFieldInfo); + } + } + if (values != null) { + subs.add(new NumericDocValuesSub(mergeState.docMaps[i], values)); + } + } + + return mergeNumericValues(subs, mergeState.needsIndexSort); + } + }); + } + + private static NumericDocValues mergeNumericValues(List<NumericDocValuesSub> subs, boolean indexIsSorted) throws IOException { + long cost = 0; + for (NumericDocValuesSub sub : subs) { + cost += sub.values.cost(); + } + final long finalCost = cost; + + final DocIDMerger<NumericDocValuesSub> docIDMerger = DocIDMerger.of(subs, indexIsSorted); + + return new NumericDocValues() { + private int docID = -1; + private NumericDocValuesSub current; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + current = docIDMerger.next(); + if (current == null) { + docID = NO_MORE_DOCS; + } else { + docID = current.mappedDocID; + } + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public long longValue() throws IOException { + return current.values.longValue(); + } + }; + } + + /** Tracks state of one sorted numeric sub-reader that we are merging */ + private static class SortedNumericDocValuesSub extends DocIDMerger.Sub { + + final SortedNumericDocValues values; + + SortedNumericDocValuesSub(MergeState.DocMap docMap, SortedNumericDocValues values) { + super(docMap); + this.values = values; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the sorted docvalues from <code>toMerge</code>. + * + *

<p>The default implementation calls {@link #addSortedNumericField}, passing iterables that + * filter deleted documents. + */ + public void mergeSortedNumericField(MergeStats mergeStats, FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + + addSortedNumericField(mergeFieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public SortedNumericDocValues getSortedNumeric(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + List<SortedNumericDocValuesSub> subs = new ArrayList<>(); + long cost = 0; + boolean allSingletons = true; + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + SortedNumericDocValues values = null; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_NUMERIC) { + values = docValuesProducer.getSortedNumeric(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedNumeric(); + } + cost += values.cost(); + if (allSingletons && DocValues.unwrapSingleton(values) == null) { + allSingletons = false; + } + subs.add(new SortedNumericDocValuesSub(mergeState.docMaps[i], values)); + } + + if (allSingletons) { + // All subs are single-valued. + // We specialize for that case since it makes it easier for codecs to optimize + // for single-valued fields. + List<NumericDocValuesSub> singleValuedSubs = new ArrayList<>(); + for (SortedNumericDocValuesSub sub : subs) { + final NumericDocValues singleValuedValues = DocValues.unwrapSingleton(sub.values); + assert singleValuedValues != null; + singleValuedSubs.add(new NumericDocValuesSub(sub.docMap, singleValuedValues)); + } + return DocValues.singleton(mergeNumericValues(singleValuedSubs, mergeState.needsIndexSort)); + } + + final long finalCost = cost; + + final DocIDMerger<SortedNumericDocValuesSub> docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort); + + return new SortedNumericDocValues() { + + private int docID = -1; + private SortedNumericDocValuesSub currentSub; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + currentSub = docIDMerger.next(); + if (currentSub == null) { + docID = NO_MORE_DOCS; + } else { + docID = currentSub.mappedDocID; + } + + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int docValueCount() { + return currentSub.values.docValueCount(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public long nextValue() throws IOException { + return currentSub.values.nextValue(); + } + }; + } + }); + } + + /** + * A merged {@link TermsEnum}. This helps avoid relying on the default terms enum, which calls + * {@link SortedDocValues#lookupOrd(int)} or {@link SortedSetDocValues#lookupOrd(long)} on every + * call to {@link TermsEnum#next()}. 
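+ * <p>Each {@link #next()} call instead maps the merged ordinal to the first segment containing + * the term (via {@link OrdinalMap#getFirstSegmentNumber(long)} and + * {@link OrdinalMap#getFirstSegmentOrd(long)}) and advances that segment's enum, so terms stream + * out in sorted order without per-term ordinal lookups.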
+ */ + private static class MergedTermsEnum extends BaseTermsEnum { + + private final TermsEnum[] subs; + private final OrdinalMap ordinalMap; + private final long valueCount; + private long ord = -1; + private BytesRef term; + + MergedTermsEnum(OrdinalMap ordinalMap, TermsEnum[] subs) { + this.ordinalMap = ordinalMap; + this.subs = subs; + this.valueCount = ordinalMap.getValueCount(); + } + + @Override + public BytesRef term() throws IOException { + return term; + } + + @Override + public long ord() throws IOException { + return ord; + } + + @Override + public BytesRef next() throws IOException { + if (++ord >= valueCount) { + return null; + } + final int subNum = ordinalMap.getFirstSegmentNumber(ord); + final TermsEnum sub = subs[subNum]; + final long subOrd = ordinalMap.getFirstSegmentOrd(ord); + do { + term = sub.next(); + } while (sub.ord() < subOrd); + assert sub.ord() == subOrd; + return term; + } + + @Override + public AttributeSource attributes() { + throw new UnsupportedOperationException(); + } + + @Override + public SeekStatus seekCeil(BytesRef text) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void seekExact(long ord) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public int docFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long totalTermFreq() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public PostingsEnum postings(PostingsEnum reuse, int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public ImpactsEnum impacts(int flags) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public TermState termState() throws IOException { + throw new UnsupportedOperationException(); + } + } + + /** Tracks state of one sorted sub-reader that we are merging */ + private static class SortedDocValuesSub extends DocIDMerger.Sub { + + final SortedDocValues values; + final LongValues map; + + SortedDocValuesSub(MergeState.DocMap docMap, SortedDocValues values, LongValues map) { + super(docMap); + this.values = values; + this.map = map; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + } + + /** + * Merges the sorted docvalues from toMerge. + * + *

The default implementation calls {@link #addSortedField}, passing an Iterable that merges + * ordinals and values and filters deleted documents. + */ + public void mergeSortedField(MergeStats mergeStats, FieldInfo fieldInfo, final MergeState mergeState) throws IOException { + List<SortedDocValues> toMerge = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(fieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED) { + values = docValuesProducer.getSorted(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySorted(); + } + toMerge.add(values); + } + + final int numReaders = toMerge.size(); + final SortedDocValues[] dvs = toMerge.toArray(new SortedDocValues[numReaders]); + + // step 1: iterate thru each sub and mark terms still in use + TermsEnum[] liveTerms = new TermsEnum[dvs.length]; + long[] weights = new long[liveTerms.length]; + for (int sub = 0; sub < numReaders; sub++) { + SortedDocValues dv = dvs[sub]; + Bits liveDocs = mergeState.liveDocs[sub]; + if (liveDocs == null) { + liveTerms[sub] = dv.termsEnum(); + weights[sub] = dv.getValueCount(); + } else { + LongBitSet bitset = new LongBitSet(dv.getValueCount()); + int docID; + while ((docID = dv.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs.get(docID)) { + int ord = dv.ordValue(); + if (ord >= 0) { + bitset.set(ord); + } + } + } + liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset); + weights[sub] = bitset.cardinality(); + } + } + + // step 2: create ordinal map (this conceptually does the "merging") + final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT); + + // step 3: add field + addSortedField(fieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public SortedDocValues getSorted(FieldInfo fieldInfoIn) throws IOException { + if (fieldInfoIn != fieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + + List<SortedDocValuesSub> subs = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(fieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED) { + values = docValuesProducer.getSorted(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySorted(); + } + + subs.add(new SortedDocValuesSub(mergeState.docMaps[i], values, map.getGlobalOrds(i))); + } + + return mergeSortedValues(subs, mergeState.needsIndexSort, map); + } + }); + } + + private static SortedDocValues mergeSortedValues(List<SortedDocValuesSub> subs, boolean indexIsSorted, OrdinalMap map) + throws IOException { + long cost = 0; + for (SortedDocValuesSub sub : subs) { + cost += sub.values.cost(); + } + final long finalCost = cost; + + final DocIDMerger<SortedDocValuesSub> docIDMerger = DocIDMerger.of(subs, indexIsSorted); + + return new SortedDocValues() { + private int docID = -1; + private SortedDocValuesSub current; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + current = docIDMerger.next(); + if (current == null) { + docID
= NO_MORE_DOCS; + } else { + docID = current.mappedDocID; + } + return docID; + } + + @Override + public int ordValue() throws IOException { + int subOrd = current.values.ordValue(); + assert subOrd != -1; + return (int) current.map.get(subOrd); + } + + @Override + public int advance(int target) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public int getValueCount() { + return (int) map.getValueCount(); + } + + @Override + public BytesRef lookupOrd(int ord) throws IOException { + int segmentNumber = map.getFirstSegmentNumber(ord); + int segmentOrd = (int) map.getFirstSegmentOrd(ord); + return subs.get(segmentNumber).values.lookupOrd(segmentOrd); + } + + @Override + public TermsEnum termsEnum() throws IOException { + TermsEnum[] termsEnumSubs = new TermsEnum[subs.size()]; + for (int sub = 0; sub < termsEnumSubs.length; ++sub) { + termsEnumSubs[sub] = subs.get(sub).values.termsEnum(); + } + return new MergedTermsEnum(map, termsEnumSubs); + } + }; + } + + /** Tracks state of one sorted set sub-reader that we are merging */ + private static class SortedSetDocValuesSub extends DocIDMerger.Sub { + + final SortedSetDocValues values; + final LongValues map; + + SortedSetDocValuesSub(MergeState.DocMap docMap, SortedSetDocValues values, LongValues map) { + super(docMap); + this.values = values; + this.map = map; + assert values.docID() == -1; + } + + @Override + public int nextDoc() throws IOException { + return values.nextDoc(); + } + + @Override + public String toString() { + return "SortedSetDocValuesSub(mappedDocID=" + mappedDocID + " values=" + values + ")"; + } + } + + /** + * Merges the sortedset docvalues from toMerge. + * + *
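Terms referenced only by deleted documents are filtered out up front, so the ordinal map + * below is built over live terms only. + * + *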

The default implementation calls {@link #addSortedSetField}, passing an Iterable that merges + * ordinals and values and filters deleted documents. + */ + public void mergeSortedSetField(MergeStats mergeStats, FieldInfo mergeFieldInfo, final MergeState mergeState) throws IOException { + + List<SortedSetDocValues> toMerge = new ArrayList<>(); + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedSetDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo fieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (fieldInfo != null && fieldInfo.getDocValuesType() == DocValuesType.SORTED_SET) { + values = docValuesProducer.getSortedSet(fieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedSet(); + } + toMerge.add(values); + } + + // step 1: iterate thru each sub and mark terms still in use + TermsEnum[] liveTerms = new TermsEnum[toMerge.size()]; + long[] weights = new long[liveTerms.length]; + for (int sub = 0; sub < liveTerms.length; sub++) { + SortedSetDocValues dv = toMerge.get(sub); + Bits liveDocs = mergeState.liveDocs[sub]; + if (liveDocs == null) { + liveTerms[sub] = dv.termsEnum(); + weights[sub] = dv.getValueCount(); + } else { + LongBitSet bitset = new LongBitSet(dv.getValueCount()); + int docID; + while ((docID = dv.nextDoc()) != NO_MORE_DOCS) { + if (liveDocs.get(docID)) { + for (int i = 0; i < dv.docValueCount(); i++) { + bitset.set(dv.nextOrd()); + } + } + } + liveTerms[sub] = new BitsFilteredTermsEnum(dv.termsEnum(), bitset); + weights[sub] = bitset.cardinality(); + } + } + + // step 2: create ordinal map (this conceptually does the "merging") + final OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT); + + // step 3: add field + addSortedSetField(mergeFieldInfo, new TsdbDocValuesProducer(mergeStats) { + @Override + public SortedSetDocValues getSortedSet(FieldInfo fieldInfo) throws IOException { + if (fieldInfo != mergeFieldInfo) { + throw new IllegalArgumentException("wrong FieldInfo"); + } + + // We must make new iterators + DocIDMerger for each iterator: + List<SortedSetDocValuesSub> subs = new ArrayList<>(); + + long cost = 0; + boolean allSingletons = true; + + for (int i = 0; i < mergeState.docValuesProducers.length; i++) { + SortedSetDocValues values = null; + DocValuesProducer docValuesProducer = mergeState.docValuesProducers[i]; + if (docValuesProducer != null) { + FieldInfo readerFieldInfo = mergeState.fieldInfos[i].fieldInfo(mergeFieldInfo.name); + if (readerFieldInfo != null && readerFieldInfo.getDocValuesType() == DocValuesType.SORTED_SET) { + values = docValuesProducer.getSortedSet(readerFieldInfo); + } + } + if (values == null) { + values = DocValues.emptySortedSet(); + } + cost += values.cost(); + if (allSingletons && DocValues.unwrapSingleton(values) == null) { + allSingletons = false; + } + subs.add(new SortedSetDocValuesSub(mergeState.docMaps[i], values, map.getGlobalOrds(i))); + } + + if (allSingletons) { + // All subs are single-valued. + // We specialize for that case since it makes it easier for codecs to optimize + // for single-valued fields.
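+ // Each singleton SortedSetDocValues wraps a SortedDocValues, so the unwrapped singletons can + // be merged via mergeSortedValues(...) and the merged result re-wrapped with DocValues.singleton(...).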
+ List<SortedDocValuesSub> singleValuedSubs = new ArrayList<>(); + for (SortedSetDocValuesSub sub : subs) { + final SortedDocValues singleValuedValues = DocValues.unwrapSingleton(sub.values); + assert singleValuedValues != null; + singleValuedSubs.add(new SortedDocValuesSub(sub.docMap, singleValuedValues, sub.map)); + } + return DocValues.singleton(mergeSortedValues(singleValuedSubs, mergeState.needsIndexSort, map)); + } + + final DocIDMerger<SortedSetDocValuesSub> docIDMerger = DocIDMerger.of(subs, mergeState.needsIndexSort); + + final long finalCost = cost; + + return new SortedSetDocValues() { + private int docID = -1; + private SortedSetDocValuesSub currentSub; + + @Override + public int docID() { + return docID; + } + + @Override + public int nextDoc() throws IOException { + currentSub = docIDMerger.next(); + if (currentSub == null) { + docID = NO_MORE_DOCS; + } else { + docID = currentSub.mappedDocID; + } + + return docID; + } + + @Override + public int advance(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean advanceExact(int target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public long nextOrd() throws IOException { + long subOrd = currentSub.values.nextOrd(); + return currentSub.map.get(subOrd); + } + + @Override + public int docValueCount() { + return currentSub.values.docValueCount(); + } + + @Override + public long cost() { + return finalCost; + } + + @Override + public BytesRef lookupOrd(long ord) throws IOException { + int segmentNumber = map.getFirstSegmentNumber(ord); + long segmentOrd = map.getFirstSegmentOrd(ord); + return toMerge.get(segmentNumber).lookupOrd(segmentOrd); + } + + @Override + public long getValueCount() { + return map.getValueCount(); + } + + @Override + public TermsEnum termsEnum() throws IOException { + TermsEnum[] subs = new TermsEnum[toMerge.size()]; + for (int sub = 0; sub < subs.length; ++sub) { + subs[sub] = toMerge.get(sub).termsEnum(); + } + return new MergedTermsEnum(map, subs); + } + }; + } + }); + } + + // TODO: seek-by-ord to nextSetBit + static class BitsFilteredTermsEnum extends FilteredTermsEnum { + final LongBitSet liveTerms; + + BitsFilteredTermsEnum(TermsEnum in, LongBitSet liveTerms) { + super(in, false); // <-- must pass false here (no initial seek); getting this wrong once cost about three hours of debugging
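+ // liveTerms marks, by per-segment ordinal, which terms are still referenced by live documents; + // accept(...) below consults it via ord() so terms used only by deleted docs are skipped.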
+ assert liveTerms != null; + this.liveTerms = liveTerms; + } + + @Override + protected AcceptStatus accept(BytesRef term) throws IOException { + if (liveTerms.get(ord())) { + return AcceptStatus.YES; + } else { + return AcceptStatus.NO; + } + } + } +} diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java index ea6d944a1271c..f0ce28f11a51a 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/DocValuesCodecDuelTests.java @@ -9,6 +9,8 @@ package org.elasticsearch.index.codec.tsdb; +import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.lucene90.Lucene90DocValuesFormat; import org.apache.lucene.document.BinaryDocValuesField; import org.apache.lucene.document.Document; @@ -24,6 +26,7 @@ import org.apache.lucene.tests.index.RandomIndexWriter; import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat; import org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; import org.elasticsearch.test.ESTestCase; @@ -51,9 +54,19 @@ public void testDuel() throws IOException { baselineConfig.setMergePolicy(mergePolicy); baselineConfig.setCodec(TestUtil.alwaysDocValuesFormat(new Lucene90DocValuesFormat())); var contenderConf = newIndexWriterConfig(); - contenderConf.setCodec( - TestUtil.alwaysDocValuesFormat(rarely() ? new TestES87TSDBDocValuesFormat() : new ES819TSDBDocValuesFormat()) - ); + contenderConf.setMergePolicy(mergePolicy); + Codec codec = new Elasticsearch900Lucene101Codec() { + + final DocValuesFormat docValuesFormat = randomBoolean() + ? 
new ES819TSDBDocValuesFormat() + : new TestES87TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; + contenderConf.setCodec(codec); contenderConf.setMergePolicy(mergePolicy); try ( diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java index c6c721336dba0..a219ebb3740cc 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/ES87TSDBDocValuesFormatTests.java @@ -12,6 +12,7 @@ import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.codecs.Codec; import org.apache.lucene.codecs.DocValuesConsumer; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; import org.apache.lucene.document.SortedDocValuesField; @@ -33,8 +34,9 @@ import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.tests.index.BaseDocValuesFormatTestCase; import org.apache.lucene.tests.index.RandomIndexWriter; -import org.apache.lucene.tests.util.TestUtil; import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.logging.LogConfigurator; +import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; import java.io.IOException; import java.util.ArrayList; @@ -50,6 +52,12 @@ public class ES87TSDBDocValuesFormatTests extends BaseDocValuesFormatTestCase { private static final int NUM_DOCS = 10; + static { + // For Elasticsearch900Lucene101Codec: + LogConfigurator.loadLog4jPlugins(); + LogConfigurator.configureESLogging(); + } + static class TestES87TSDBDocValuesFormat extends ES87TSDBDocValuesFormat { TestES87TSDBDocValuesFormat() { @@ -66,7 +74,13 @@ public DocValuesConsumer fieldsConsumer(SegmentWriteState state) throws IOExcept } } - private final Codec codec = TestUtil.alwaysDocValuesFormat(new TestES87TSDBDocValuesFormat()); + private final Codec codec = new Elasticsearch900Lucene101Codec() { + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return new TestES87TSDBDocValuesFormat(); + } + }; @Override protected Codec getCodec() { diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java index 32b2a90322911..9c41e7a80ed66 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/TsdbDocValueBwcTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.index.codec.tsdb; import org.apache.lucene.codecs.Codec; +import org.apache.lucene.codecs.DocValuesFormat; import org.apache.lucene.codecs.DocValuesProducer; import org.apache.lucene.document.Document; import org.apache.lucene.document.NumericDocValuesField; @@ -31,6 +32,9 @@ import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.core.SuppressForbidden; +import org.elasticsearch.index.codec.Elasticsearch816Codec; +import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; +import org.elasticsearch.index.codec.perfield.XPerFieldDocValuesFormat; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests.TestES87TSDBDocValuesFormat; import 
org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesFormat; import org.elasticsearch.test.ESTestCase; @@ -45,8 +49,30 @@ public class TsdbDocValueBwcTests extends ESTestCase { public void testMixedIndex() throws Exception { - Codec oldCodec = TestUtil.alwaysDocValuesFormat(new TestES87TSDBDocValuesFormat()); - Codec newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); + var oldCodec = TestUtil.alwaysDocValuesFormat(new TestES87TSDBDocValuesFormat()); + var newCodec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); + testMixedIndex(oldCodec, newCodec); + } + + public void testMixedIndex816To900Lucene101() throws Exception { + var oldCodec = new Elasticsearch816Codec() { + + final DocValuesFormat docValuesFormat = new TestES87TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; + var newCodec = new Elasticsearch900Lucene101Codec() { + + final DocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; testMixedIndex(oldCodec, newCodec); } @@ -101,55 +127,63 @@ void testMixedIndex(Codec oldCodec, Codec newCodec) throws IOException, NoSuchFi } } // Check documents before force merge: - try (var iw = new IndexWriter(dir, getTimeSeriesIndexWriterConfig(hostnameField, timestampField, newCodec))) { - try (var reader = DirectoryReader.open(iw)) { - assertOldDocValuesFormatVersion(reader); - - var hostNameDV = MultiDocValues.getSortedValues(reader, hostnameField); - assertNotNull(hostNameDV); - var timestampDV = MultiDocValues.getSortedNumericValues(reader, timestampField); - assertNotNull(timestampDV); - var counterOneDV = MultiDocValues.getNumericValues(reader, "counter_1"); - if (counterOneDV == null) { - counterOneDV = DocValues.emptyNumeric(); - } - var gaugeOneDV = MultiDocValues.getSortedNumericValues(reader, "gauge_1"); - if (gaugeOneDV == null) { - gaugeOneDV = DocValues.emptySortedNumeric(); + try (var reader = DirectoryReader.open(dir)) { + assertOldDocValuesFormatVersion(reader); + // Assert per field format field info attributes: + // (XPerFieldDocValuesFormat must produce the same attributes as PerFieldDocValuesFormat for BWC. 
+ // Otherwise, doc values fields may disappear) + for (var leaf : reader.leaves()) { + for (var fieldInfo : leaf.reader().getFieldInfos()) { + assertThat(fieldInfo.attributes(), Matchers.aMapWithSize(2)); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.suffix", "0")); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.format", "ES87TSDB")); } - var tagsDV = MultiDocValues.getSortedSetValues(reader, "tags"); - if (tagsDV == null) { - tagsDV = DocValues.emptySortedSet(); - } - for (int i = 0; i < numDocs; i++) { - assertEquals(i, hostNameDV.nextDoc()); - String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); - assertTrue("unexpected host name:" + actualHostName, actualHostName.startsWith("host-")); + } - assertEquals(i, timestampDV.nextDoc()); - long timestamp = timestampDV.nextValue(); - long lowerBound = baseTimestamp; - long upperBound = baseTimestamp + numDocs; - assertTrue( - "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", - timestamp >= lowerBound && timestamp < upperBound - ); - if (counterOneDV.advanceExact(i)) { - long counterOneValue = counterOneDV.longValue(); - assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); - } - if (gaugeOneDV.advanceExact(i)) { - for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { - long value = gaugeOneDV.nextValue(); - assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); - } + var hostNameDV = MultiDocValues.getSortedValues(reader, hostnameField); + assertNotNull(hostNameDV); + var timestampDV = MultiDocValues.getSortedNumericValues(reader, timestampField); + assertNotNull(timestampDV); + var counterOneDV = MultiDocValues.getNumericValues(reader, "counter_1"); + if (counterOneDV == null) { + counterOneDV = DocValues.emptyNumeric(); + } + var gaugeOneDV = MultiDocValues.getSortedNumericValues(reader, "gauge_1"); + if (gaugeOneDV == null) { + gaugeOneDV = DocValues.emptySortedNumeric(); + } + var tagsDV = MultiDocValues.getSortedSetValues(reader, "tags"); + if (tagsDV == null) { + tagsDV = DocValues.emptySortedSet(); + } + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); + assertTrue("unexpected host name:" + actualHostName, actualHostName.startsWith("host-")); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.nextValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + numDocs; + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + if (counterOneDV.advanceExact(i)) { + long counterOneValue = counterOneDV.longValue(); + assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); + } + if (gaugeOneDV.advanceExact(i)) { + for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { + long value = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); } - if (tagsDV.advanceExact(i)) { - for (int j = 0; j < tagsDV.docValueCount(); j++) { - long ordinal = tagsDV.nextOrd(); - String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); - assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, 
actualTag) >= 0); - } + } + if (tagsDV.advanceExact(i)) { + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); } } } @@ -165,6 +199,15 @@ void testMixedIndex(Codec oldCodec, Codec newCodec) throws IOException, NoSuchFi assertEquals(numDocs, reader.maxDoc()); assertNewDocValuesFormatVersion(reader); var leaf = reader.leaves().get(0).reader(); + // Assert per field format field info attributes: + // (XPerFieldDocValuesFormat must produce the same attributes as PerFieldDocValuesFormat for BWC. + // Otherwise, doc values fields may disappear) + for (var fieldInfo : leaf.getFieldInfos()) { + assertThat(fieldInfo.attributes(), Matchers.aMapWithSize(2)); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.suffix", "0")); + assertThat(fieldInfo.attributes(), Matchers.hasEntry("PerFieldDocValuesFormat.format", "ES819TSDB")); + } + var hostNameDV = leaf.getSortedDocValues(hostnameField); assertNotNull(hostNameDV); var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); @@ -249,6 +292,7 @@ private void assertOldDocValuesFormatVersion(DirectoryReader reader) throws NoSu var dvReader = leaf.getDocValuesReader(); var field = getFormatsFieldFromPerFieldFieldsReader(dvReader.getClass()); Map<?, ?> formats = (Map<?, ?>) field.get(dvReader); + assertThat(formats, Matchers.aMapWithSize(1)); var tsdbDvReader = (DocValuesProducer) formats.get("ES87TSDB_0"); tsdbDvReader.checkIntegrity(); assertThat(tsdbDvReader, Matchers.instanceOf(ES87TSDBDocValuesProducer.class)); @@ -257,25 +301,39 @@ private void assertNewDocValuesFormatVersion(DirectoryReader reader) throws NoSu IllegalAccessException, IOException, ClassNotFoundException { - if (System.getSecurityManager() != null) { - // With jvm version 24 entitlements are used and security manager is nog longer used. - // Making this assertion work with security manager requires granting the entire test codebase privileges to use - // suppressAccessChecks and suppressAccessChecks. This is undesired from a security manager perspective. - logger.info("not asserting doc values format version, because security manager is used"); - return; - } for (var leafReaderContext : reader.leaves()) { var leaf = (SegmentReader) leafReaderContext.reader(); var dvReader = leaf.getDocValuesReader(); - var field = getFormatsFieldFromPerFieldFieldsReader(dvReader.getClass()); - Map<?, ?> formats = (Map<?, ?>) field.get(dvReader); - var tsdbDvReader = (DocValuesProducer) formats.get("ES819TSDB_0"); - tsdbDvReader.checkIntegrity(); - assertThat( - tsdbDvReader, - Matchers.instanceOf(Class.forName("org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer")) - ); + dvReader.checkIntegrity(); + + if (dvReader instanceof XPerFieldDocValuesFormat.FieldsReader perFieldDvReader) { + var formats = perFieldDvReader.getFormats(); + assertThat(formats, Matchers.aMapWithSize(1)); + var tsdbDvReader = formats.get("ES819TSDB_0"); + tsdbDvReader.checkIntegrity(); + assertThat( + tsdbDvReader, + Matchers.instanceOf(Class.forName("org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer")) + ); + } else { + if (System.getSecurityManager() != null) { + // With jvm version 24 entitlements are used and the security manager is no longer used.
+ // Making this assertion work with security manager requires granting the entire test codebase privileges to use + // suppressAccessChecks. This is undesired from a security manager perspective. + logger.info("not asserting doc values format version, because security manager is used"); + continue; + } + var field = getFormatsFieldFromPerFieldFieldsReader(dvReader.getClass()); + Map<?, ?> formats = (Map<?, ?>) field.get(dvReader); + assertThat(formats, Matchers.aMapWithSize(1)); + var tsdbDvReader = (DocValuesProducer) formats.get("ES819TSDB_0"); + tsdbDvReader.checkIntegrity(); + assertThat( + tsdbDvReader, + Matchers.instanceOf(Class.forName("org.elasticsearch.index.codec.tsdb.es819.ES819TSDBDocValuesProducer")) + ); + } } } diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java index 3eb187bed9fb8..2e787c9e56d6d 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatTests.java @@ -10,16 +10,393 @@ package org.elasticsearch.index.codec.tsdb.es819; import org.apache.lucene.codecs.Codec; -import org.apache.lucene.tests.util.TestUtil; +import org.apache.lucene.codecs.DocValuesFormat; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.document.SortedDocValuesField; +import org.apache.lucene.document.SortedNumericDocValuesField; +import org.apache.lucene.document.SortedSetDocValuesField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.DocValues; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LogByteSizeMergePolicy; +import org.apache.lucene.search.Sort; +import org.apache.lucene.search.SortField; +import org.apache.lucene.search.SortedNumericSortField; +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.index.codec.Elasticsearch900Lucene101Codec; import org.elasticsearch.index.codec.tsdb.ES87TSDBDocValuesFormatTests; +import java.util.Arrays; +import java.util.Locale; + public class ES819TSDBDocValuesFormatTests extends ES87TSDBDocValuesFormatTests { - private final Codec codec = TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat()); + private final Codec codec = new Elasticsearch900Lucene101Codec() { + + final ES819TSDBDocValuesFormat docValuesFormat = new ES819TSDBDocValuesFormat(); + + @Override + public DocValuesFormat getDocValuesFormatForField(String field) { + return docValuesFormat; + } + }; @Override protected Codec getCodec() { return codec; } + public void testForceMergeDenseCase() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long counter1 = 0; + long counter2 = 10_000_000; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + + int numDocs = 256 + random().nextInt(1024);
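+ // Committing every 100 docs below produces multiple segments, so forceMerge(1) exercises the + // doc values merge path instead of being a single-segment no-op.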
+ int numHosts = numDocs / 20; + for (int i = 0; i < numDocs; i++) { + var d = new Document(); + + int batchIndex = i / numHosts; + String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + long timestamp = baseTimestamp + (1000L * i); + + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp)); + d.add(new NumericDocValuesField("counter_1", counter1++)); + d.add(new SortedNumericDocValuesField("counter_2", counter2++)); + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length])); + d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge2Values.length])); + int numTags = 1 + random().nextInt(8); + for (int j = 0; j < numTags; j++) { + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j]))); + } + + iw.addDocument(d); + if (i % 100 == 0) { + iw.commit(); + } + } + iw.commit(); + + iw.forceMerge(1); + + // For asserting using binary search later on: + Arrays.sort(gauge2Values); + + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var counterOneDV = leaf.getNumericDocValues("counter_1"); + assertNotNull(counterOneDV); + var counterTwoDV = leaf.getSortedNumericDocValues("counter_2"); + assertNotNull(counterTwoDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var gaugeTwoDV = leaf.getSortedNumericDocValues("gauge_2"); + assertNotNull(gaugeTwoDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + int batchIndex = i / numHosts; + assertEquals(batchIndex, hostNameDV.ordValue()); + String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + assertEquals(expectedHostName, hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString()); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + (1000L * numDocs); + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + + assertEquals(i, counterOneDV.nextDoc()); + long counterOneValue = counterOneDV.longValue(); + assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); + + assertEquals(i, counterTwoDV.nextDoc()); + assertEquals(1, counterTwoDV.docValueCount()); + long counterTwoValue = counterTwoDV.nextValue(); + assertTrue("unexpected counter [" + counterTwoValue + "]", counterTwoValue > 0 && counterTwoValue <= counter2); + + assertEquals(i, gaugeOneDV.nextDoc()); + assertEquals(1, gaugeOneDV.docValueCount()); + long gaugeOneValue = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeOneValue + "]", Arrays.binarySearch(gauge1Values, gaugeOneValue) >= 0); + + assertEquals(i, gaugeTwoDV.nextDoc()); + assertEquals(1, gaugeTwoDV.docValueCount()); + long gaugeTwoValue = gaugeTwoDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeTwoValue + "]",
Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0); + + assertEquals(i, tagsDV.nextDoc()); + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + + public void testForceMergeSparseCase() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long counter1 = 0; + long counter2 = 10_000_000; + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + long[] gauge2Values = new long[] { -2, -4, -6, -8, -10, -12, -14, -16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + + int numDocs = 256 + random().nextInt(1024); + int numHosts = numDocs / 20; + for (int i = 0; i < numDocs; i++) { + var d = new Document(); + + int batchIndex = i / numHosts; + String hostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + long timestamp = baseTimestamp + (1000L * i); + + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp)); + + if (random().nextBoolean()) { + d.add(new NumericDocValuesField("counter_1", counter1++)); + } + if (random().nextBoolean()) { + d.add(new SortedNumericDocValuesField("counter_2", counter2++)); + } + if (random().nextBoolean()) { + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[i % gauge1Values.length])); + } + if (random().nextBoolean()) { + d.add(new SortedNumericDocValuesField("gauge_2", gauge2Values[i % gauge2Values.length])); + } + if (random().nextBoolean()) { + int numTags = 1 + random().nextInt(8); + for (int j = 0; j < numTags; j++) { + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j]))); + } + } + if (random().nextBoolean()) { + int randomIndex = random().nextInt(tags.length); + d.add(new SortedDocValuesField("other_tag", new BytesRef(tags[randomIndex]))); + } + + iw.addDocument(d); + if (i % 100 == 0) { + iw.commit(); + } + } + iw.commit(); + + iw.forceMerge(1); + + // For asserting using binary search later on: + Arrays.sort(gauge2Values); + + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var counterOneDV = leaf.getNumericDocValues("counter_1"); + assertNotNull(counterOneDV); + var counterTwoDV = leaf.getSortedNumericDocValues("counter_2"); + assertNotNull(counterTwoDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var gaugeTwoDV = leaf.getSortedNumericDocValues("gauge_2"); + assertNotNull(gaugeTwoDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + var otherTagDV = leaf.getSortedDocValues("other_tag"); + assertNotNull(otherTagDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + int batchIndex = i / numHosts; +
assertEquals(batchIndex, hostNameDV.ordValue()); + String expectedHostName = String.format(Locale.ROOT, "host-%03d", batchIndex); + assertEquals(expectedHostName, hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString()); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + (1000L * numDocs); + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + + if (counterOneDV.advanceExact(i)) { + long counterOneValue = counterOneDV.longValue(); + assertTrue("unexpected counter [" + counterOneValue + "]", counterOneValue >= 0 && counterOneValue < counter1); + } + + if (counterTwoDV.advanceExact(i)) { + assertEquals(1, counterTwoDV.docValueCount()); + long counterTwoValue = counterTwoDV.nextValue(); + assertTrue("unexpected counter [" + counterTwoValue + "]", counterTwoValue > 0 && counterTwoValue <= counter2); + } + + if (gaugeOneDV.advanceExact(i)) { + assertEquals(1, gaugeOneDV.docValueCount()); + long gaugeOneValue = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeOneValue + "]", Arrays.binarySearch(gauge1Values, gaugeOneValue) >= 0); + } + + if (gaugeTwoDV.advanceExact(i)) { + assertEquals(1, gaugeTwoDV.docValueCount()); + long gaugeTwoValue = gaugeTwoDV.nextValue(); + assertTrue("unexpected gauge [" + gaugeTwoValue + "]", Arrays.binarySearch(gauge2Values, gaugeTwoValue) >= 0); + } + + if (tagsDV.advanceExact(i)) { + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + if (otherTagDV.advanceExact(i)) { + int ordinal = otherTagDV.ordValue(); + String actualTag = otherTagDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + + public void testWithNoValueMultiValue() throws Exception { + String timestampField = "@timestamp"; + String hostnameField = "host.name"; + long baseTimestamp = 1704067200000L; + int numRounds = 32 + random().nextInt(32); + int numDocsPerRound = 64 + random().nextInt(64); + + var config = getTimeSeriesIndexWriterConfig(hostnameField, timestampField); + try (var dir = newDirectory(); var iw = new IndexWriter(dir, config)) { + long[] gauge1Values = new long[] { 2, 4, 6, 8, 10, 12, 14, 16 }; + String[] tags = new String[] { "tag_1", "tag_2", "tag_3", "tag_4", "tag_5", "tag_6", "tag_7", "tag_8" }; + { + long timestamp = baseTimestamp; + for (int i = 0; i < numRounds; i++) { + int r = random().nextInt(10); + for (int j = 0; j < numDocsPerRound; j++) { + var d = new Document(); + // host in reverse, otherwise merging will detect that segments are already ordered and will use sequential docid + // merger: + String hostName = String.format(Locale.ROOT, "host-%03d", numRounds - i); + d.add(new SortedDocValuesField(hostnameField, new BytesRef(hostName))); + // Index sorting doesn't work with NumericDocValuesField: + d.add(new SortedNumericDocValuesField(timestampField, timestamp++)); + + if (r % 10 == 5) { + // sometimes no values + } else if (r % 10 > 5) { + // often single value: + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[j % gauge1Values.length])); + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[j % 
tags.length]))); + } else { + // otherwise multiple values: + int numValues = 2 + random().nextInt(4); + for (int k = 0; k < numValues; k++) { + d.add(new SortedNumericDocValuesField("gauge_1", gauge1Values[(j + k) % gauge1Values.length])); + d.add(new SortedSetDocValuesField("tags", new BytesRef(tags[(j + k) % tags.length]))); + } + } + iw.addDocument(d); + } + iw.commit(); + } + iw.forceMerge(1); + } + + int numDocs = numRounds * numDocsPerRound; + try (var reader = DirectoryReader.open(iw)) { + assertEquals(1, reader.leaves().size()); + assertEquals(numDocs, reader.maxDoc()); + var leaf = reader.leaves().get(0).reader(); + var hostNameDV = leaf.getSortedDocValues(hostnameField); + assertNotNull(hostNameDV); + var timestampDV = DocValues.unwrapSingleton(leaf.getSortedNumericDocValues(timestampField)); + assertNotNull(timestampDV); + var gaugeOneDV = leaf.getSortedNumericDocValues("gauge_1"); + assertNotNull(gaugeOneDV); + var tagsDV = leaf.getSortedSetDocValues("tags"); + assertNotNull(tagsDV); + for (int i = 0; i < numDocs; i++) { + assertEquals(i, hostNameDV.nextDoc()); + String actualHostName = hostNameDV.lookupOrd(hostNameDV.ordValue()).utf8ToString(); + assertTrue("unexpected host name:" + actualHostName, actualHostName.startsWith("host-")); + + assertEquals(i, timestampDV.nextDoc()); + long timestamp = timestampDV.longValue(); + long lowerBound = baseTimestamp; + long upperBound = baseTimestamp + numDocs; + assertTrue( + "unexpected timestamp [" + timestamp + "], expected between [" + lowerBound + "] and [" + upperBound + "]", + timestamp >= lowerBound && timestamp < upperBound + ); + if (gaugeOneDV.advanceExact(i)) { + for (int j = 0; j < gaugeOneDV.docValueCount(); j++) { + long value = gaugeOneDV.nextValue(); + assertTrue("unexpected gauge [" + value + "]", Arrays.binarySearch(gauge1Values, value) >= 0); + } + } + if (tagsDV.advanceExact(i)) { + for (int j = 0; j < tagsDV.docValueCount(); j++) { + long ordinal = tagsDV.nextOrd(); + String actualTag = tagsDV.lookupOrd(ordinal).utf8ToString(); + assertTrue("unexpected tag [" + actualTag + "]", Arrays.binarySearch(tags, actualTag) >= 0); + } + } + } + } + } + } + + private IndexWriterConfig getTimeSeriesIndexWriterConfig(String hostnameField, String timestampField) { + var config = new IndexWriterConfig(); + config.setIndexSort( + new Sort( + new SortField(hostnameField, SortField.Type.STRING, false), + new SortedNumericSortField(timestampField, SortField.Type.LONG, true) + ) + ); + config.setLeafSorter(DataStream.TIMESERIES_LEAF_READERS_SORTER); + config.setMergePolicy(new LogByteSizeMergePolicy()); + config.setCodec(getCodec()); + return config; + } + } diff --git a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java index e3d4b33de6203..d158236ecc7ac 100644 --- a/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java +++ b/server/src/test/java/org/elasticsearch/index/codec/tsdb/es819/ES819TSDBDocValuesFormatVariableSkipIntervalTests.java @@ -18,13 +18,13 @@ public class ES819TSDBDocValuesFormatVariableSkipIntervalTests extends ES87TSDBD @Override protected Codec getCodec() { // small interval size to test with many intervals - return TestUtil.alwaysDocValuesFormat(new ES819TSDBDocValuesFormat(random().nextInt(4, 16))); + return TestUtil.alwaysDocValuesFormat(new 
ES819TSDBDocValuesFormat(random().nextInt(4, 16), random().nextBoolean())); } public void testSkipIndexIntervalSize() { IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, - () -> new ES819TSDBDocValuesFormat(random().nextInt(Integer.MIN_VALUE, 2)) + () -> new ES819TSDBDocValuesFormat(random().nextInt(Integer.MIN_VALUE, 2), random().nextBoolean()) ); assertTrue(ex.getMessage().contains("skipIndexIntervalSize must be > 1")); }
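For readers following the merge logic above, here is a rough, illustrative sketch (not part of this change) of the OrdinalMap mechanics that mergeSortedField and mergeSortedSetField rely on; the liveTerms, weights, toMerge, segmentIndex, and segmentOrd inputs are hypothetical stand-ins for the values built inside those methods:

import org.apache.lucene.index.OrdinalMap;
import org.apache.lucene.index.SortedSetDocValues;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.LongValues;
import org.apache.lucene.util.packed.PackedInts;

import java.io.IOException;
import java.util.List;

class OrdinalMapSketch {
    // Resolve one per-segment ordinal to its global ordinal and back to a term.
    static BytesRef globalLookup(TermsEnum[] liveTerms, long[] weights, List<SortedSetDocValues> toMerge, int segmentIndex, long segmentOrd)
        throws IOException {
        // One global ordinal space across all per-segment term dictionaries:
        OrdinalMap map = OrdinalMap.build(null, liveTerms, weights, PackedInts.COMPACT);
        // Per-segment ord -> global ord: what ordValue()/nextOrd() return in the merged view.
        LongValues toGlobal = map.getGlobalOrds(segmentIndex);
        long globalOrd = toGlobal.get(segmentOrd);
        // Global ord -> the first segment defining that term: how lookupOrd() and MergedTermsEnum resolve terms.
        int firstSegment = map.getFirstSegmentNumber(globalOrd);
        long ordInSegment = map.getFirstSegmentOrd(globalOrd);
        return toMerge.get(firstSegment).lookupOrd(ordInSegment);
    }
}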