diff --git a/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/RareTermsIT.java b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/RareTermsIT.java
new file mode 100644
index 0000000000000..58089154e58de
--- /dev/null
+++ b/qa/mixed-cluster/src/test/java/org/elasticsearch/backwards/RareTermsIT.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.backwards;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.hamcrest.Matchers;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Test that indexes enough data to trigger the creation of Cuckoo filters.
+ */
+public class RareTermsIT extends ESRestTestCase {
+
+    private static final String index = "idx";
+
+    private void setupMaxBuckets() throws Exception {
+        // increases the max bucket limit for this test
+        final Request request = new Request("PUT", "_cluster/settings");
+        request.setJsonEntity("{ \"transient\" : { \"search.max_buckets\" : 65356 } }");
+        assertOK(client().performRequest(request));
+    }
+
+    private int indexDocs(int numDocs, int id) throws Exception {
+        final Request request = new Request("POST", "/_bulk");
+        final StringBuilder builder = new StringBuilder();
+        for (int i = 0; i < numDocs; ++i) {
+            builder.append("{ \"index\" : { \"_index\" : \"" + index + "\", \"_id\": \"" + id++ + "\" } }\n");
+            builder.append("{\"str_value\" : \"s" + i + "\"}\n");
+        }
+        request.setJsonEntity(builder.toString());
+        assertOK(client().performRequest(request));
+        return id;
+    }
+
+    public void testSingleValuedString() throws Exception {
+        IndexingIT.Nodes nodes = IndexingIT.buildNodeAndVersions(client());
+        Version version = nodes.getBWCVersion();
+        // rare_terms was introduced in version 7.3.0
+        assumeTrue("Version too old", version.onOrAfter(Version.V_7_3_0));
+        // increase max buckets
+        setupMaxBuckets();
+        final Settings.Builder settings = Settings.builder()
+            .put(IndexMetadata.INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 2)
+            .put(IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0);
+        createIndex(index, settings.build());
+        // We want to trigger the use of cuckoo filters, which happens only when there are
+        // more than 10k distinct values in one shard.
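+        // Assuming roughly uniform _id routing across the 2 shards, each of a term's
+        // 5 copies lands on a given shard with probability 1/2, so a shard misses a
+        // term entirely with probability (1/2)^5 = 1/32 and is expected to see about
+        // numDocs * 31/32 ~= 97% of the distinct values. The 12000 lower bound below
+        // therefore keeps each shard above that 10k threshold.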
+        final int numDocs = randomIntBetween(12000, 17000);
+        int id = 1;
+        // Index every value 5 times
+        for (int i = 0; i < 5; i++) {
+            id = indexDocs(numDocs, id);
+            refreshAllIndices();
+        }
+        // There are no rare terms that appear in only one document
+        assertNumRareTerms(1, 0);
+        // All terms have a doc count lower than 10
+        assertNumRareTerms(10, numDocs);
+    }
+
+    private void assertNumRareTerms(int maxDocs, int rareTerms) throws IOException {
+        final Request request = new Request("POST", index + "/_search");
+        request.setJsonEntity(
+            "{\"aggs\" : {\"rareTerms\" : {\"rare_terms\" : {\"field\" : \"str_value.keyword\", \"max_doc_count\" : " + maxDocs + "}}}}"
+        );
+        final Response response = client().performRequest(request);
+        assertOK(response);
+        final Object o = XContentMapValues.extractValue("aggregations.rareTerms.buckets", responseAsMap(response));
+        assertThat(o, Matchers.instanceOf(List.class));
+        assertThat(((List) o).size(), Matchers.equalTo(rareTerms));
+    }
+}
diff --git a/server/src/main/java/org/elasticsearch/common/util/CuckooFilter.java b/server/src/main/java/org/elasticsearch/common/util/CuckooFilter.java
index e4a63d5d94fe6..3bb032bf2ab05 100644
--- a/server/src/main/java/org/elasticsearch/common/util/CuckooFilter.java
+++ b/server/src/main/java/org/elasticsearch/common/util/CuckooFilter.java
@@ -9,7 +9,9 @@
 
 import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.DataOutput;
+import org.apache.lucene.util.RamUsageEstimator;
 import org.apache.lucene.util.packed.PackedInts;
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Writeable;
@@ -56,7 +58,7 @@ public class CuckooFilter implements Writeable {
     private static final int MAX_EVICTIONS = 500;
     static final int EMPTY = 0;
 
-    private final PackedInts.Mutable data;
+    private final PackedArray data;
     private final int numBuckets;
     private final int bitsPerEntry;
     private final int fingerprintMask;
@@ -82,7 +84,7 @@ public class CuckooFilter implements Writeable {
             throw new IllegalArgumentException("Attempted to create [" + numBuckets * entriesPerBucket
                 + "] entries which is > Integer.MAX_VALUE");
         }
-        this.data = PackedInts.getMutable(numBuckets * entriesPerBucket, bitsPerEntry, PackedInts.COMPACT);
+        this.data = new PackedArray(numBuckets * entriesPerBucket, bitsPerEntry);
 
         // puts the bits at the right side of the mask, e.g. `0000000000001111` for bitsPerEntry = 4
         this.fingerprintMask = (0x80000000 >> (bitsPerEntry - 1)) >>> (Integer.SIZE - bitsPerEntry);
@@ -106,7 +108,7 @@ public class CuckooFilter implements Writeable {
                 + "] entries which is > Integer.MAX_VALUE");
         }
         // TODO this is probably super slow, but just used for testing atm
-        this.data = PackedInts.getMutable(numBuckets * entriesPerBucket, bitsPerEntry, PackedInts.COMPACT);
+        this.data = new PackedArray(numBuckets * entriesPerBucket, bitsPerEntry);
         for (int i = 0; i < other.data.size(); i++) {
             data.set(i, other.data.get(i));
         }
@@ -122,17 +124,26 @@ public class CuckooFilter implements Writeable {
         this.fingerprintMask = (0x80000000 >> (bitsPerEntry - 1)) >>> (Integer.SIZE - bitsPerEntry);
 
-        data = (PackedInts.Mutable) PackedInts.getReader(new DataInput() {
-            @Override
-            public byte readByte() throws IOException {
-                return in.readByte();
-            }
-
-            @Override
-            public void readBytes(byte[] b, int offset, int len) throws IOException {
-                in.readBytes(b, offset, len);
+        if (in.getVersion().before(Version.V_7_15_0)) {
+            final PackedInts.Reader reader = PackedInts.getReader(new DataInput() {
+                @Override
+                public byte readByte() throws IOException {
+                    return in.readByte();
+                }
+
+                @Override
+                public void readBytes(byte[] b, int offset, int len) throws IOException {
+                    in.readBytes(b, offset, len);
+                }
+            });
+            // This is probably slow, but it should only happen if we have a mixed cluster (e.g. during an upgrade).
+            data = new PackedArray(numBuckets * entriesPerBucket, bitsPerEntry);
+            for (int i = 0; i < reader.size(); i++) {
+                data.set(i, reader.get(i));
             }
-        });
+        } else {
+            data = new PackedArray(in);
+        }
     }
 
     @Override
@@ -142,18 +153,26 @@ public void writeTo(StreamOutput out) throws IOException {
         out.writeVInt(entriesPerBucket);
         out.writeVInt(count);
         out.writeVInt(evictedFingerprint);
-
-        data.save(new DataOutput() {
-            @Override
-            public void writeByte(byte b) throws IOException {
-                out.writeByte(b);
-            }
-
-            @Override
-            public void writeBytes(byte[] b, int offset, int length) throws IOException {
-                out.writeBytes(b, offset, length);
+        if (out.getVersion().before(Version.V_7_15_0)) {
+            // This is probably slow, but it should only happen if we have a mixed cluster (e.g. during an upgrade).
+            PackedInts.Mutable mutable = PackedInts.getMutable(numBuckets * entriesPerBucket, bitsPerEntry, PackedInts.COMPACT);
+            for (int i = 0; i < data.size(); i++) {
+                mutable.set(i, data.get(i));
             }
-        });
+            mutable.save(new DataOutput() {
+                @Override
+                public void writeByte(byte b) throws IOException {
+                    out.writeByte(b);
+                }
+
+                @Override
+                public void writeBytes(byte[] b, int offset, int length) throws IOException {
+                    out.writeBytes(b, offset, length);
+                }
+            });
+        } else {
+            data.save(out);
+        }
     }
 
     /**
@@ -507,4 +526,201 @@ public boolean equals(Object other) {
             && Objects.equals(this.count, that.count)
             && Objects.equals(this.evictedFingerprint, that.evictedFingerprint);
     }
+
+    /**
+     * Forked from Lucene's Packed64 class. The main difference is that this version
+     * can be read from / written to Elasticsearch streams.
+     */
+    private static class PackedArray {
+        private static final int BLOCK_SIZE = 64; // 32 = int, 64 = long
+        private static final int BLOCK_BITS = 6; // The #bits representing BLOCK_SIZE
+        private static final int MOD_MASK = BLOCK_SIZE - 1; // x % BLOCK_SIZE
+
+        /**
+         * Values are stored contiguously in the blocks array.
+         */
+        private final long[] blocks;
+        /**
+         * A right-aligned mask of width bitsPerValue used by {@link #get(int)}.
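+         * For example, with bitsPerValue = 4 this is 0b1111, i.e. only the four lowest bits are set.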
+         */
+        private final long maskRight;
+        /**
+         * Optimization: Saves one lookup in {@link #get(int)}.
+         */
+        private final int bpvMinusBlockSize;
+
+        private final int bitsPerValue;
+        private final int valueCount;
+
+        PackedArray(int valueCount, int bitsPerValue) {
+            this.bitsPerValue = bitsPerValue;
+            this.valueCount = valueCount;
+            final int longCount = PackedInts.Format.PACKED.longCount(PackedInts.VERSION_CURRENT, valueCount, bitsPerValue);
+            this.blocks = new long[longCount];
+            maskRight = ~0L << (BLOCK_SIZE - bitsPerValue) >>> (BLOCK_SIZE - bitsPerValue);
+            bpvMinusBlockSize = bitsPerValue - BLOCK_SIZE;
+        }
+
+        PackedArray(StreamInput in) throws IOException {
+            this.bitsPerValue = in.readVInt();
+            this.valueCount = in.readVInt();
+            this.blocks = in.readLongArray();
+            maskRight = ~0L << (BLOCK_SIZE - bitsPerValue) >>> (BLOCK_SIZE - bitsPerValue);
+            bpvMinusBlockSize = bitsPerValue - BLOCK_SIZE;
+        }
+
+        public void save(StreamOutput out) throws IOException {
+            out.writeVInt(bitsPerValue);
+            out.writeVInt(valueCount);
+            out.writeLongArray(blocks);
+        }
+
+        public int size() {
+            return valueCount;
+        }
+
+        public long get(final int index) {
+            // The abstract index in a bit stream
+            final long majorBitPos = (long) index * bitsPerValue;
+            // The index in the backing long-array
+            final int elementPos = (int) (majorBitPos >>> BLOCK_BITS);
+            // The number of value-bits in the second long
+            final long endBits = (majorBitPos & MOD_MASK) + bpvMinusBlockSize;
+
+            if (endBits <= 0) { // Single block
+                return (blocks[elementPos] >>> -endBits) & maskRight;
+            }
+            // Two blocks
+            return ((blocks[elementPos] << endBits)
+                | (blocks[elementPos + 1] >>> (BLOCK_SIZE - endBits)))
+                & maskRight;
+        }
+
+        public int get(int index, long[] arr, int off, int len) {
+            assert len > 0 : "len must be > 0 (got " + len + ")";
+            assert index >= 0 && index < valueCount;
+            len = Math.min(len, valueCount - index);
+            assert off + len <= arr.length;
+
+            final int originalIndex = index;
+            final PackedInts.Decoder decoder = PackedInts.getDecoder(PackedInts.Format.PACKED, PackedInts.VERSION_CURRENT, bitsPerValue);
+            // go to the next block where the value does not span across two blocks
+            final int offsetInBlocks = index % decoder.longValueCount();
+            if (offsetInBlocks != 0) {
+                for (int i = offsetInBlocks; i < decoder.longValueCount() && len > 0; ++i) {
+                    arr[off++] = get(index++);
+                    --len;
+                }
+                if (len == 0) {
+                    return index - originalIndex;
+                }
+            }
+
+            // bulk get
+            assert index % decoder.longValueCount() == 0;
+            int blockIndex = (int) (((long) index * bitsPerValue) >>> BLOCK_BITS);
+            assert (((long) index * bitsPerValue) & MOD_MASK) == 0;
+            final int iterations = len / decoder.longValueCount();
+            decoder.decode(blocks, blockIndex, arr, off, iterations);
+            final int gotValues = iterations * decoder.longValueCount();
+            index += gotValues;
+            len -= gotValues;
+            assert len >= 0;
+
+            if (index > originalIndex) {
+                // stay at the block boundary
+                return index - originalIndex;
+            } else {
+                // no progress so far => already at a block boundary but no full block to get
+                assert index == originalIndex;
+                assert len > 0 : "len must be > 0 (got " + len + ")";
+                assert index >= 0 && index < size();
+                assert off + len <= arr.length;
+
+                final int gets = Math.min(size() - index, len);
+                for (int i = index, o = off, end = index + gets; i < end; ++i, ++o) {
+                    arr[o] = get(i);
+                }
+                return gets;
+            }
+        }
+
+        public void set(final int index, final long value) {
+            // The abstract index in a contiguous bit stream
+            final long majorBitPos = (long) index * bitsPerValue;
+            // The index in the backing long-array
+            final int elementPos = (int) (majorBitPos >>> BLOCK_BITS); // / BLOCK_SIZE
+            // The number of value-bits in the second long
+            final long endBits = (majorBitPos & MOD_MASK) + bpvMinusBlockSize;
+
+            if (endBits <= 0) { // Single block
+                blocks[elementPos] = blocks[elementPos] & ~(maskRight << -endBits)
+                    | (value << -endBits);
+                return;
+            }
+            // Two blocks
+            blocks[elementPos] = blocks[elementPos] & ~(maskRight >>> endBits)
+                | (value >>> endBits);
+            blocks[elementPos + 1] = blocks[elementPos + 1] & (~0L >>> endBits)
+                | (value << (BLOCK_SIZE - endBits));
+        }
+
+        public int set(int index, long[] arr, int off, int len) {
+            assert len > 0 : "len must be > 0 (got " + len + ")";
+            assert index >= 0 && index < valueCount;
+            len = Math.min(len, valueCount - index);
+            assert off + len <= arr.length;
+
+            final int originalIndex = index;
+            final PackedInts.Encoder encoder = PackedInts.getEncoder(PackedInts.Format.PACKED, PackedInts.VERSION_CURRENT, bitsPerValue);
+
+            // go to the next block where the value does not span across two blocks
+            final int offsetInBlocks = index % encoder.longValueCount();
+            if (offsetInBlocks != 0) {
+                for (int i = offsetInBlocks; i < encoder.longValueCount() && len > 0; ++i) {
+                    set(index++, arr[off++]);
+                    --len;
+                }
+                if (len == 0) {
+                    return index - originalIndex;
+                }
+            }
+
+            // bulk set
+            assert index % encoder.longValueCount() == 0;
+            int blockIndex = (int) (((long) index * bitsPerValue) >>> BLOCK_BITS);
+            assert (((long) index * bitsPerValue) & MOD_MASK) == 0;
+            final int iterations = len / encoder.longValueCount();
+            encoder.encode(arr, off, blocks, blockIndex, iterations);
+            final int setValues = iterations * encoder.longValueCount();
+            index += setValues;
+            len -= setValues;
+            assert len >= 0;
+
+            if (index > originalIndex) {
+                // stay at the block boundary
+                return index - originalIndex;
+            } else {
+                // no progress so far => already at a block boundary but no full block to set
+                assert index == originalIndex;
+                len = Math.min(len, size() - index);
+                assert off + len <= arr.length;
+
+                for (int i = index, o = off, end = index + len; i < end; ++i, ++o) {
+                    set(i, arr[o]);
+                }
+                return len;
+            }
+        }
+
+        public long ramBytesUsed() {
+            return RamUsageEstimator.alignObjectSize(
+                RamUsageEstimator.NUM_BYTES_OBJECT_HEADER
+                    + 3 * Integer.BYTES // bpvMinusBlockSize, valueCount, bitsPerValue
+                    + Long.BYTES // maskRight
+                    + RamUsageEstimator.NUM_BYTES_OBJECT_REF) // blocks ref
+                + RamUsageEstimator.sizeOf(blocks);
+        }
+    }
+}
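
Note on the bit arithmetic above: the following is a minimal, self-contained sketch of the Packed64-style layout that PackedArray.get and PackedArray.set implement, worked through for one concrete case. The example values (bitsPerValue = 5, index = 12, value = 12) are chosen here so the packed value straddles two backing longs; the constants and expressions mirror the fields in the patch, but the class itself is illustrative only and not part of the change.

public class PackedArraySketch {
    private static final int BLOCK_SIZE = 64;           // one backing block = one long
    private static final int BLOCK_BITS = 6;            // log2(BLOCK_SIZE)
    private static final int MOD_MASK = BLOCK_SIZE - 1; // x & MOD_MASK == x % BLOCK_SIZE

    public static void main(String[] args) {
        final int bitsPerValue = 5;
        // right-aligned mask with the low 5 bits set: 0b11111
        final long maskRight = ~0L << (BLOCK_SIZE - bitsPerValue) >>> (BLOCK_SIZE - bitsPerValue);
        final int bpvMinusBlockSize = bitsPerValue - BLOCK_SIZE; // -59

        final long[] blocks = new long[2];
        final int index = 12;
        final long value = 0b01100; // 12

        // Abstract bit position of the value: 12 * 5 = 60, i.e. bits 60..64 of the
        // stream. Bits 60..63 are the low bits of blocks[0]; bit 64 is the MSB of blocks[1].
        final long majorBitPos = (long) index * bitsPerValue;              // 60
        final int elementPos = (int) (majorBitPos >>> BLOCK_BITS);         // 60 / 64 = 0
        final long endBits = (majorBitPos & MOD_MASK) + bpvMinusBlockSize; // 60 - 59 = 1

        // endBits > 0 means the value spans two blocks, and endBits is the number of
        // value bits living in the second block (here 1). This mirrors the two-block
        // branch of PackedArray.set: the high bits go to the end of blocks[0], the
        // low bits to the start of blocks[1].
        blocks[elementPos] = blocks[elementPos] & ~(maskRight >>> endBits) | (value >>> endBits);
        blocks[elementPos + 1] = blocks[elementPos + 1] & (~0L >>> endBits) | (value << (BLOCK_SIZE - endBits));

        // ...and the two-block branch of PackedArray.get reassembles them:
        final long read = ((blocks[elementPos] << endBits)
            | (blocks[elementPos + 1] >>> (BLOCK_SIZE - endBits))) & maskRight;
        System.out.println(read); // prints 12
    }
}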