Alternative schemes for Elasticsearch generated document IDs? #33049
Pinging @elastic/es-search-aggs |
It would be useful to know the indexing speed for each of these options too, since we would want to be clear with the user whether choosing a particular option influences indexing speed as well as heap usage and index size. |
@colings86 I can set up and run this challenge against an Elastic Cloud cluster for comparison. It indexes the same amount of data using the following 3 main ID schemes:
Details about the ID generation can be found here. |
@colings86 I ran the challenge overnight and got the following average indexing throughput when indexing 100 million documents (~20GB shard size) against a single 32GB Elastic Cloud node:
I used these document ID types as an example and am not suggesting we necessarily use one of them. The ideal solution would be a variation of the currently used scheme that has a heap usage profile similar to these alternative ID types while still retaining the characteristics that make autogenerated IDs so fast to index. |
One of these characteristics is that we know the provenance of an autogenerated ID, so we know whether we can skip the checks for the existence of another document with the same ID. With externally-provided IDs we always have to check. Importantly, this is independent of the format of the ID. |
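A minimal sketch of the reasoning above, with hypothetical names (this is not Elasticsearch's actual indexing code):
// Hypothetical helper: only externally supplied ids pay for the uniqueness lookup.
static boolean mustCheckForExistingDoc(boolean idWasAutogenerated) {
    // A freshly autogenerated id cannot already be present in the index, so the existence
    // check (a term lookup on _id) can be skipped; externally provided ids must always be
    // checked. This holds regardless of the byte layout of the autogenerated id.
    return idWasAutogenerated == false;
}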
@DaveCTurner Excellent. I remember that but was not sure whether it was still in place. I assume that means that if we created an alternate autogenerated ID format with a different heap usage profile, it would by default also be faster than my benchmarks, since we would be able to skip the checks for this format as well? |
In theory yes, although nothing is certain in these situations without measurement. However, it puzzles me that autogenerated IDs take up approximately twice the terms memory of the other formats. A nice round multiple like that makes me wonder if we are perhaps double-counting something, or mistakenly generating twice as many terms or something like that? |
The 2x factor is probably a coincidence. However it's expected that patterns that compress well also require more memory. Memory usage is due to the FST that records shared prefixes of terms per block of 25-40 terms, so the number of prefixes that are recorded in the FST doesn't depend too much on the pattern, but the length of the recorded prefixes does. Even in the worst case, these numbers are pretty good in my opinion: 40MB of terms dictionary still gives a memory/disk ratio of 1.6%. Though I agree it is a bit disappointing to spend most memory on the |
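As a back-of-the-envelope illustration of that reasoning (the document count, block size and per-prefix cost below are assumptions chosen only to show the shape of the estimate, not Lucene's exact accounting):
public class IdTermsIndexEstimate {
    public static void main(String[] args) {
        long numIds = 100_000_000L;      // one _id term per document
        int termsPerBlock = 32;          // Lucene records one prefix per block of ~25-40 terms
        long recordedPrefixes = numIds / termsPerBlock; // ~3.1M prefixes, largely independent of the id pattern
        // What the pattern changes is how long each recorded prefix has to be: ids that share
        // long prefixes (like the current time-prefixed format) force longer recorded prefixes,
        // while random-ish ids diverge after a few bytes.
        int avgPrefixBytes = 13;         // hypothetical value
        long roughBytes = recordedPrefixes * avgPrefixBytes; // on the order of 40 MB, ignoring node sharing and FST overhead
        System.out.println(roughBytes + " bytes");
    }
}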
@DaveCTurner helped me set up a build where I could test a few different permutations of the current autogenerated ID format. Of the 6 variations I tested, the one that performed best was generated by simply rearranging the order of the various components that make up the current ID:
Another version that also performed quite well was
|
I have a general question: are you comparing the memory consumption of optimized indices? If not, I think the numbers are misleading. It would be good to clarify this. |
No, these numbers are not for optimized indices. I have just indexed ~25GB of data and refreshed before recording statistics. I agree this is one of the tests we should run, but I also feel force-merging is not always a realistic option when running with quite large shards as per our best practices for logging and metrics use cases, so gains without force-merging are also of interest, even if they can vary more. |
This is a misconception, IMO. We are trying to compare the memory consumption of a certain ID format. If one case ends up with a lot more segments due to some indexing pattern, while the other ID ends up with only half the segments, you might think it takes half the memory when in fact it is the same or even more. If you want to compare the numbers you have to compare apples to apples, and therefore force merge to a single segment. |
Agreed. I will rerun it and record stats before and after a forcemerge. |
I ran it again for some of the ID types and recorded stats before and after the forcemerge. I also recorded the number of segments before the forcemerge for comparison: The results seem to largely correlate with the previous run, so I think it is worth investigating a bit further. If it is consistent, we could save a good amount of heap. If, on the other hand, it is indeed a red herring and not consistently reproducible, the spread of values itself suggests to me that we might have an area where some optimization to reduce the spread could bring significant gains. |
I tweaked UUIDTests to run some simulations with various patterns, for example:
// common bytes first
uuidBytes[i++] = (byte) (timestamp >>> 40); // changes every 35 years
uuidBytes[i++] = (byte) (timestamp >>> 32); // changes every ~50 days
assert macAddress.length == 6;
System.arraycopy(macAddress, 0, uuidBytes, i, macAddress.length);
i += macAddress.length;
uuidBytes[i++] = (byte) (timestamp >>> 24); // changes every ~4.5h
uuidBytes[i++] = (byte) (timestamp >>> 16); // changes every ~65 secs
uuidBytes[i++] = (byte) (sequenceId >>> 16); // changes every ~65k docs
uuidBytes[i++] = (byte) (timestamp >>> 8);
uuidBytes[i++] = (byte) (sequenceId >>> 8);
uuidBytes[i++] = (byte) timestamp;
uuidBytes[i++] = (byte) sequenceId;
Indices were forced-merged before computing disk and memory usage.
50,000,000 documents indexed at 8k docs/s:
43,200,000 documents indexed at 500 docs/s (simulates an entire day of indexing since 24*60*60*500=43,200,000):
It's interesting to note how differently these patterns perform depending on the configuration. For instance, the current format looks bad in the first test regarding memory usage, but is on par with most other formats in the second test while providing better disk usage than all but one. The current pattern for
I'm happy to test more formats if needed. |
@jpountz That is very interesting. It would also be interesting to see if/how changing the MAC address (primary shard relocation) during the run affects the results. The difference in size on disk looks large, but as this only covers ID storage (only a small part of shard disk usage), would it be correct to assume the difference as a percentage would be much smaller in a real use case? |
Here are some more numbers. 50M documents are indexed in every case. I also ran the test with 5 different MAC addresses, which gives numbers for a shrunk index and probably a worst-case scenario for a shard that gets relocated while indexing.
Yes. |
Based on this, the |
One additional use case that would be useful to simulate is when we have a long retention period and overshard during indexing (low indexing rate and multiple MAC addresses active during the same time period) and then use the shrink index API to merge these into fewer shards. |
One observation: given that the terms index (the part that is in memory) is built from one prefix per block of 25 to 40 terms, its memory usage per term tends to increase upon merging when ids are interleaved (the average shared prefix length increases), and to remain the same if ids are not interleaved at all (the average prefix length remains the same), e.g. if all ids of one segment are greater than all ids of the other segment. So if we want an ID pattern whose memory usage doesn't increase as more documents are added, we should try to pick a pattern that produces increasing ids. |
This program may be used to simulate indexing/disk usage. It only needs Lucene on the classpath.
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.LongSupplier;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
public class DiskMemIdUsage {
private static final int NUM_INDEXING_THREADS = 8;
private static final int MAC_ADDRESS_LENGTH = 6;
private static final int ID_LENGTH = 15;
private static final int BATCH_SIZE = 10_000;
private static interface IDFormat {
public void setId(long timestamp, byte[] macAddress, int sequenceId, byte[] idBytes);
}
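// Flake-style layout: high-order timestamp bytes first, then the MAC address, then the
// sequence id, so generated ids are (mostly) increasing in sort order.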
private static final IDFormat FLAKE_IDS = new IDFormat() {
@Override
public void setId(long timestamp, byte[] macAddress, int sequenceId, byte[] idBytes) {
int i = 0;
idBytes[i++] = (byte) (timestamp >>> 40); // changes every 35 years
idBytes[i++] = (byte) (timestamp >>> 32); // changes every ~50 days
idBytes[i++] = (byte) (timestamp >>> 24); // changes every ~4.5h
idBytes[i++] = (byte) (timestamp >>> 16); // changes every ~65 secs
idBytes[i++] = (byte) (timestamp >>> 8);
idBytes[i++] = (byte) timestamp;
assert macAddress.length == 6;
System.arraycopy(macAddress, 0, idBytes, i, macAddress.length);
i += macAddress.length;
idBytes[i++] = (byte) (sequenceId >>> 16); // changes every ~65k docs
idBytes[i++] = (byte) (sequenceId >>> 8);
idBytes[i++] = (byte) sequenceId;
assert i == ID_LENGTH;
}
@Override
public String toString() {
return "FLAKE";
}
};
private static final IDFormat CURRENT_IDS = new IDFormat() {
@Override
public void setId(long timestamp, byte[] macAddress, int sequenceId, byte[] idBytes) {
int i = 0;
idBytes[i++] = (byte) sequenceId;
// changes every 65k docs, so potentially every second if you have a steady indexing rate
idBytes[i++] = (byte) (sequenceId >>> 16);
// Now we start focusing on compression and put bytes that should not change too often.
idBytes[i++] = (byte) (timestamp >>> 16); // changes every ~65 secs
idBytes[i++] = (byte) (timestamp >>> 24); // changes every ~4.5h
idBytes[i++] = (byte) (timestamp >>> 32); // changes every ~50 days
idBytes[i++] = (byte) (timestamp >>> 40); // changes every 35 years
assert macAddress.length == 6;
System.arraycopy(macAddress, 0, idBytes, i, macAddress.length);
i += macAddress.length;
// Finally we put the remaining bytes, which will likely not be compressed at all.
idBytes[i++] = (byte) (timestamp >>> 8);
idBytes[i++] = (byte) (sequenceId >>> 8);
idBytes[i++] = (byte) timestamp;
assert i == ID_LENGTH;
}
@Override
public String toString() {
return "CURRENT";
}
};
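// Simulates document timestamps: values are non-decreasing and spaced so that, on average,
// docsPerSecond documents fall into each second.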
private static LongSupplier timestampSupplier(Random random, double docsPerSecond) {
final double averageIntervalBetweenDocsMs = 1000 / docsPerSecond;
return new LongSupplier() {
long initialTimestamp = System.currentTimeMillis();
double addend = 0;
@Override
public long getAsLong() {
long value = initialTimestamp + (long) addend;
addend += random.nextDouble() * 2 * averageIntervalBetweenDocsMs;
return value;
}
};
}
private static class ResourceUsage {
final double diskUsagePerDoc;
final double memoryUsagePerDoc;
ResourceUsage(double diskUsagePerDoc, double memoryUsagePerDoc) {
this.diskUsagePerDoc = diskUsagePerDoc;
this.memoryUsagePerDoc = memoryUsagePerDoc;
}
}
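// Thread-safe id source: each call combines a non-decreasing timestamp, the caller's MAC
// address and an incrementing sequence id, and writes the result into idBytes using the
// given format.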
private static BiConsumer<byte[], byte[]> idGenerator(IDFormat format, double docsPerSecond) {
final Random random = new Random(0);
return new BiConsumer<byte[], byte[]>() {
final LongSupplier timestampSupplier = timestampSupplier(random, docsPerSecond);
long maxTimestamp = Long.MIN_VALUE;
int sequenceId = random.nextInt();
@Override
public synchronized void accept(byte[] macAddress, byte[] idBytes) {
assert macAddress.length == 6;
assert idBytes.length == ID_LENGTH;
long timestamp = Math.max(maxTimestamp, timestampSupplier.getAsLong());
if (++sequenceId == 0) {
timestamp++;
}
maxTimestamp = timestamp;
format.setId(timestamp, macAddress, sequenceId, idBytes);
}
};
}
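// Indexes numDocs ids (each document only carries an _id StringField) across numNodes
// simulated MAC addresses, force-merges to a single segment, and reports per-document
// disk usage and segment memory usage.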
private static ResourceUsage simulateResourceUsage(int numDocs, BiConsumer<byte[], byte[]> idGenerator, int numNodes) throws IOException, InterruptedException {
final byte[][] macAddresses = new byte[numNodes][];
for (int i = 0; i < macAddresses.length; ++i) {
macAddresses[i] = new byte[MAC_ADDRESS_LENGTH];
ThreadLocalRandom.current().nextBytes(macAddresses[i]);
}
Path path = Files.createTempDirectory("bench_id");
IndexWriterConfig config = new IndexWriterConfig()
.setRAMBufferSizeMB(512);
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(NUM_INDEXING_THREADS - 1, NUM_INDEXING_THREADS - 1,
10, TimeUnit.MINUTES, new ArrayBlockingQueue<>(NUM_INDEXING_THREADS), new CallerRunsPolicy());
try (Directory dir = FSDirectory.open(path); IndexWriter w = new IndexWriter(dir, config)) {
for (int remainingDocs = numDocs; remainingDocs > 0; ) {
final int batchSize = Math.min(remainingDocs, BATCH_SIZE);
final byte[] macAddress = macAddresses[ThreadLocalRandom.current().nextInt(numNodes)];
threadPool.submit(new Runnable() {
@Override
public void run() {
byte[] id = new byte[ID_LENGTH];
StringField field = new StringField("_id", new BytesRef(id), Store.NO);
Document doc = new Document();
doc.add(field);
try {
for (int i = 0; i < batchSize; ++i) {
idGenerator.accept(macAddress, id);
w.addDocument(doc);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
});
remainingDocs -= batchSize;
}
threadPool.shutdown();
threadPool.awaitTermination(10, TimeUnit.MINUTES);
w.forceMerge(1);
long diskSize = 0;
for (String file : dir.listAll()) {
diskSize += dir.fileLength(file);
}
final long memorySize;
try (IndexReader reader = DirectoryReader.open(w)) {
if (reader.numDocs() != numDocs || reader.leaves().size() != 1) {
throw new Error();
}
SegmentReader onlySegmentReader = (SegmentReader) reader.leaves().get(0).reader();
memorySize = onlySegmentReader.ramBytesUsed();
}
return new ResourceUsage((double) diskSize / numDocs, (double) memorySize / numDocs);
} finally {
try {
IOUtils.rm(path);
} finally {
threadPool.shutdownNow();
}
}
}
public static void main(String[] args) throws Exception {
System.out.println("num_docs\tnum_nodes\tdocs/s\tformat\tdisk/doc\tmemory/doc");
for (int numDocs : new int[] { 50_000_000 }) {
for (int numNodes : new int[] { 1, 5 }) {
for (int docsPerSecond: new int[] { 100, 1000, 10000 }) {
for (IDFormat idFormat : new IDFormat[] { FLAKE_IDS, CURRENT_IDS }) {
BiConsumer<byte[], byte[]> idGenerator = idGenerator(idFormat, docsPerSecond);
ResourceUsage resourceUsage = simulateResourceUsage(numDocs, idGenerator, numNodes);
System.out.println(String.format("%d\t%d\t%d\t%s\t%2f\t%2f",
numDocs, numNodes, docsPerSecond, idFormat.toString(),
resourceUsage.diskUsagePerDoc, resourceUsage.memoryUsagePerDoc));
}
}
}
}
}
}
|
A smaller memory footprint for the
Here is a small program which directly uses the finite state transducer classes from Lucene and returns their memory size for 100,000 identifiers with different formats and sequences. Unsurprisingly, shared prefixes are efficiently compressed (344 bytes for the padded format with pure incremental ids), whereas high-cardinality prefixes take a lot of memory (3MB with random ids!).
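A minimal sketch of that kind of measurement (not the exact program used; it assumes the pre-9.x Lucene FST API, in which org.apache.lucene.util.fst.Builder was later renamed to FSTCompiler, and the class name and id formats below are only illustrative):
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IntsRefBuilder;
import org.apache.lucene.util.fst.Builder;
import org.apache.lucene.util.fst.FST;
import org.apache.lucene.util.fst.NoOutputs;
import org.apache.lucene.util.fst.Util;
public class FstIdMemory {
    // Builds an FST over the given ids and returns its in-memory size in bytes.
    static long fstSizeBytes(byte[][] ids) throws Exception {
        // The FST builder requires its inputs in sorted order.
        Arrays.sort(ids, (a, b) -> new BytesRef(a).compareTo(new BytesRef(b)));
        NoOutputs outputs = NoOutputs.getSingleton();
        Builder<Object> builder = new Builder<>(FST.INPUT_TYPE.BYTE1, outputs);
        IntsRefBuilder scratch = new IntsRefBuilder();
        for (byte[] id : ids) {
            builder.add(Util.toIntsRef(new BytesRef(id), scratch), outputs.getNoOutput());
        }
        FST<Object> fst = builder.finish();
        return fst.ramBytesUsed();
    }
    public static void main(String[] args) throws Exception {
        int numIds = 100_000;
        Random random = new Random(0);
        // Zero-padded incremental ids: long shared prefixes, tiny FST.
        byte[][] padded = new byte[numIds][];
        for (int i = 0; i < numIds; i++) {
            padded[i] = String.format("%020d", i).getBytes(StandardCharsets.UTF_8);
        }
        // Random ids: prefixes diverge after a couple of bytes, much larger FST.
        byte[][] randomIds = new byte[numIds][];
        for (int i = 0; i < numIds; i++) {
            randomIds[i] = new byte[15];
            random.nextBytes(randomIds[i]);
        }
        System.out.println("padded incremental: " + fstSizeBytes(padded) + " bytes");
        System.out.println("random:             " + fstSizeBytes(randomIds) + " bytes");
    }
}
Trying other layouts (flake-like, the current format, etc.) is then just a matter of generating the corresponding byte patterns and passing them to the same measurement.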
|
Pinging @elastic/es-core-infra |
Closed via #52405. |
I have recently run some benchmarks simulating dense nodes, and noticed that heap usage related to the storage of document IDs is the main driver of overall heap usage for at least some types of data. To see how different types of document IDs perform and behave, I created a Rally challenge based on the rally-eventdata-track to index ~25GB of data into single-shard indices using a few different types of document IDs.
I imported the metrics into Elasticsearch and got the following when visualizing them in Kibana:
1. Heap usage per 120M documents by type of document ID
2. Index size on disk for 120M documents by type of document ID
Most of the alternative ID types tested contain a large random portion (UUID4, SHA*). It is interesting to see that the autogenerated IDs seem to take up less space on disk but use more than twice the amount of heap compared to the other types of IDs.
As using autogenerated IDs gives a speed advantage at indexing time, I think it would be great to make the format/structure of autogenerated IDs configurable per index so the user can choose between optimizing for heap or disk usage. This does not need to support arbitrary formats, but one option that is more efficient with respect to heap usage would be useful.