Add a new index setting to skip recovery source when synthetic source is enabled #114618

Merged
merged 55 commits into from
Dec 10, 2024
Changes from 17 commits
Commits
eccfa09
add benchmark for lucene changes snapshot
jimczi Oct 10, 2024
a27cea4
enable logsdb mode
jimczi Oct 10, 2024
a11aa24
iter
jimczi Oct 11, 2024
1b0368e
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Oct 11, 2024
76ea21f
Update docs/changelog/114618.yaml
jimczi Oct 11, 2024
a3932e9
spotless
jimczi Oct 11, 2024
d4969eb
spotless
jimczi Oct 11, 2024
9668f5e
remove leftover
jimczi Oct 15, 2024
0e02eb4
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Oct 28, 2024
a96648d
Plug the new snapshot when needed and add tests
jimczi Nov 6, 2024
c810058
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Nov 6, 2024
edd38bb
spotless
jimczi Nov 6, 2024
b99f66f
remove leftover
jimczi Nov 6, 2024
497e0e0
fix NPE
jimczi Nov 6, 2024
d6a4326
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Nov 6, 2024
f08c190
spotless
jimczi Nov 6, 2024
cf5912e
another NPE in test
jimczi Nov 6, 2024
67ca170
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Nov 13, 2024
6b08b40
Address review comments
jimczi Nov 14, 2024
f7024ef
add a mapper test case for all synthetic field mapper
jimczi Nov 14, 2024
4f246ad
remove micro benchmark
jimczi Nov 14, 2024
20f7c0b
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Nov 18, 2024
f5cf813
apply review comment
jimczi Nov 18, 2024
b02a4bd
fix randomization
jimczi Nov 18, 2024
2124324
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 3, 2024
23fe840
address review comments
jimczi Dec 3, 2024
93c5c08
Update server/src/main/java/org/elasticsearch/index/engine/LuceneChan…
jimczi Dec 4, 2024
b088c4f
missing import
jimczi Dec 4, 2024
fdfb2cb
disallow changing the new setting on resize and restore
jimczi Dec 4, 2024
d7207e4
add a new indices.recovery.chunk_size to control the maximum allowed …
jimczi Dec 4, 2024
813035c
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 4, 2024
cd0747e
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 4, 2024
ea1ede9
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 5, 2024
c8625d8
Address review comments
jimczi Dec 5, 2024
8a20ff8
Restore removed function in Engine class since the engines in serverl…
jimczi Dec 5, 2024
0a5d866
Update change log
jimczi Dec 5, 2024
3ae2475
remove test plugin to set the recovery chunk size now that a setting …
jimczi Dec 5, 2024
52c8966
fix default implementation for the newChangesSnapshot flavour
jimczi Dec 5, 2024
9a32666
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 5, 2024
a3242f5
add missing changes
jimczi Dec 5, 2024
874e876
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 5, 2024
91d399a
fix more IT tests now that the chunk size setting is registered
jimczi Dec 6, 2024
5728aa5
Merge branch 'main' into lucene_changes_synthetic_snapshot
jimczi Dec 6, 2024
64f4a51
Also check index mode when validating the new setting
jimczi Dec 6, 2024
0394eff
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 6, 2024
43c25d8
Merge remote-tracking branch 'origin/lucene_changes_synthetic_snapsho…
jimczi Dec 6, 2024
0374b86
Merge branch 'main' into lucene_changes_synthetic_snapshot
jimczi Dec 6, 2024
df01f6c
Merge branch 'main' into lucene_changes_synthetic_snapshot
jimczi Dec 6, 2024
4538a25
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 6, 2024
e12b054
Merge branch 'main' into lucene_changes_synthetic_snapshot
jimczi Dec 9, 2024
e9f534e
Merge remote-tracking branch 'origin/lucene_changes_synthetic_snapsho…
jimczi Dec 9, 2024
3d290aa
fix default impl for newChangesSnapshot
jimczi Dec 9, 2024
a0fa3fd
ensure that we can buffer at least one document
jimczi Dec 9, 2024
873e265
Merge remote-tracking branch 'upstream/main' into lucene_changes_synt…
jimczi Dec 9, 2024
99eaf67
Merge branch 'main' into lucene_changes_synthetic_snapshot
jimczi Dec 10, 2024
@@ -0,0 +1,244 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.benchmark.index;

import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.SortField;
import org.apache.lucene.search.similarities.BM25Similarity;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.benchmark.index.mapper.MapperServiceFactory;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.logging.LogConfigurator;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.codec.PerFieldMapperCodec;
import org.elasticsearch.index.codec.zstd.Zstd814StoredFieldsFormat;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.LuceneChangesSnapshot;
import org.elasticsearch.index.engine.LuceneSyntheticSourceChangesSnapshot;
import org.elasticsearch.index.engine.SearchBasedChangesSnapshot;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xcontent.XContentParserConfiguration;
import org.elasticsearch.xcontent.XContentType;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Warmup;

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;

import static org.elasticsearch.index.engine.Engine.ROOT_DOC_FIELD_NAME;

@Fork(value = 1)
@Warmup(iterations = 3, time = 3)
@Measurement(iterations = 5, time = 3)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Thread)
public class LuceneChangesSnapshotBenchmark {
    @Param({ "default", "logsdb@1kb", "logsdb@4MB" })
    String mode;

    @Param({ "false", "true" })
    boolean sequential;

    @Param({ "logs-endpoint-events-process", "logs-endpoint-events-security", "logs-kafka-log" })
    String dataset;

    static final int NUM_OPS = 10000;

    Path path;
    MapperService mapperService;
    FSDirectory dir;
    IndexReader reader;
    Engine.Searcher searcher;

    static {
        LogConfigurator.configureESLogging(); // native access requires logging to be initialized
    }

    @Setup
    public void setup() throws IOException {
        this.path = Files.createTempDirectory("snapshot_changes");
        Settings settings = mode.startsWith("logsdb")
            ? Settings.builder()
                .put("index.mode", "logsdb")
                .put(IndexSettings.RECOVERY_SOURCE_SYNTHETIC_ENABLED_SETTING.getKey(), true)
                .build()
            : Settings.EMPTY;
        this.mapperService = MapperServiceFactory.create(settings, readMappings(dataset));
        IndexWriterConfig config = new IndexWriterConfig();
        config.setCodec(
            new PerFieldMapperCodec(Zstd814StoredFieldsFormat.Mode.BEST_COMPRESSION, mapperService, BigArrays.NON_RECYCLING_INSTANCE)
        );
        if (sequential == false) {
            config.setIndexSort(new Sort(new SortField[] { new SortField("rand", SortField.Type.LONG) }));
            config.setParentField(ROOT_DOC_FIELD_NAME);
        }
        try (FSDirectory dir = FSDirectory.open(path); IndexWriter writer = new IndexWriter(dir, config);) {
            try (
                var inputStream = readSampleDocs(dataset);
                XContentParser parser = XContentType.JSON.xContent().createParser(XContentParserConfiguration.EMPTY, inputStream)
            ) {
                int id = 0;
                // find the hits array
                while (parser.nextToken() != XContentParser.Token.START_ARRAY) {
                    parser.nextToken();
                }
                Random rand = new Random();
                while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
                    // skip start object
                    parser.nextToken();
                    // skip _source field name
                    parser.nextToken();
                    XContentBuilder source = XContentBuilder.builder(XContentType.JSON.xContent());
                    source.copyCurrentStructure(parser);
                    var sourceBytes = BytesReference.bytes(source);
                    SourceToParse sourceToParse = new SourceToParse(Integer.toString(id), sourceBytes, XContentType.JSON);
                    ParsedDocument doc = mapperService.documentMapper().parse(sourceToParse);
                    doc.rootDoc().add(new NumericDocValuesField("rand", rand.nextInt()));
                    doc.updateSeqID(id, 0);
                    doc.version().setLongValue(0);
                    writer.addDocuments(doc.docs());
                    id++;
                    parser.nextToken();
                }
            }
        }
        this.dir = FSDirectory.open(path);
        this.reader = ElasticsearchDirectoryReader.wrap(
            DirectoryReader.open(dir),
            new ShardId(mapperService.getIndexSettings().getIndex(), 0)
        );
        long sizeInBytes = 0;
        for (LeafReaderContext readerContext : reader.leaves()) {
            // we go on the segment level here to get accurate numbers
            final SegmentReader segmentReader = Lucene.segmentReader(readerContext.reader());
            SegmentCommitInfo info = segmentReader.getSegmentInfo();
            try {
                sizeInBytes += info.sizeInBytes();
            } catch (IOException e) {}
        }
        System.out.println("Size: " + ByteSizeValue.ofBytes(sizeInBytes));

        this.searcher = new Engine.Searcher("snapshot", reader, new BM25Similarity(), null, new QueryCachingPolicy() {
            @Override
            public void onUse(Query query) {}

            @Override
            public boolean shouldCache(Query query) throws IOException {
                return false;
            }
        }, () -> {});
    }

    @TearDown
    public void tearDown() {
        try {
            for (var file : dir.listAll()) {
                dir.deleteFile(file);
            }
            Files.delete(path);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        IOUtils.closeWhileHandlingException(searcher, reader, dir);
    }

    @Benchmark
    @OperationsPerInvocation(NUM_OPS)
    public long recover() throws IOException {
        String indexMode = mode.split("@")[0];
        Translog.Snapshot snapshot = switch (mapperService.getIndexSettings().getMode()) {
            case LOGSDB:
                assert indexMode.equals("logsdb");
                long maxMemorySize = ByteSizeValue.parseBytesSizeValue(mode.split("@")[1], "").getBytes();
                yield new LuceneSyntheticSourceChangesSnapshot(
                    mapperService.mappingLookup(),
                    searcher,
                    SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
                    maxMemorySize,
                    0,
                    NUM_OPS - 1,
                    true,
                    true,
                    IndexVersion.current()
                );

            default:
                assert indexMode.equals("default");
                yield new LuceneChangesSnapshot(
                    searcher,
                    SearchBasedChangesSnapshot.DEFAULT_BATCH_SIZE,
                    0,
                    NUM_OPS - 1,
                    true,
                    true,
                    true,
                    IndexVersion.current()
                );
        };

        long totalSize = 0;
        try (snapshot) {
            Translog.Operation op;
            while ((op = snapshot.next()) != null) {
                totalSize += op.estimateSize();
            }
        }
        return totalSize;
    }

    private String readMappings(String dataset) throws IOException {
        return Streams.readFully(LuceneChangesSnapshotBenchmark.class.getResourceAsStream(dataset + "-mappings.json")).utf8ToString();
    }

    private InputStream readSampleDocs(String dataset) throws IOException {
        return new GZIPInputStream(LuceneChangesSnapshotBenchmark.class.getResourceAsStream(dataset + "-sample-docs.json.gz"));
    }
}
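
Note on the `mode` parameter: the benchmark packs two values into one JMH `@Param` string, the index mode before the `@` and, for logsdb, the snapshot's memory budget after it. The sketch below shows that decoding in isolation. It is not part of the PR: `ModeParamSketch`, `ModeSpec`, and `parseBytes` are hypothetical names, and `parseBytes` is a simplified stand-in for `ByteSizeValue.parseBytesSizeValue`, assuming case-insensitive binary units (1kb = 1024 bytes).

```java
import java.util.Locale;

public class ModeParamSketch {

    /** Hypothetical holder: index mode plus memory budget in bytes (-1 when the mode has no "@size" suffix). */
    record ModeSpec(String indexMode, long maxMemoryBytes) {}

    /** Simplified stand-in for ByteSizeValue.parseBytesSizeValue; assumes binary units and whole numbers. */
    static long parseBytes(String text) {
        String s = text.trim().toLowerCase(Locale.ROOT);
        long multiplier;
        int unitLength;
        if (s.endsWith("kb")) {
            multiplier = 1024L;
            unitLength = 2;
        } else if (s.endsWith("mb")) {
            multiplier = 1024L * 1024L;
            unitLength = 2;
        } else if (s.endsWith("gb")) {
            multiplier = 1024L * 1024L * 1024L;
            unitLength = 2;
        } else if (s.endsWith("b")) {
            multiplier = 1L;
            unitLength = 1;
        } else {
            throw new IllegalArgumentException("missing unit in [" + text + "]");
        }
        String digits = s.substring(0, s.length() - unitLength).trim();
        return Math.multiplyExact(Long.parseLong(digits), multiplier);
    }

    /** Splits "logsdb@4MB" into ("logsdb", 4194304); "default" has no suffix and yields ("default", -1). */
    static ModeSpec parse(String mode) {
        String[] parts = mode.split("@", 2);
        long maxBytes = parts.length == 2 ? parseBytes(parts[1]) : -1L;
        return new ModeSpec(parts[0], maxBytes);
    }

    public static void main(String[] args) {
        // The three values used by the benchmark's @Param annotation.
        for (String mode : new String[] { "default", "logsdb@1kb", "logsdb@4MB" }) {
            System.out.println(mode + " -> " + parse(mode));
        }
    }
}
```

Under these assumptions, the two logsdb variants compare a very small buffer (1kb) against a large one (4MB), which is presumably why both are benchmarked.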
@@ -43,9 +43,13 @@
import java.util.Map;

public class MapperServiceFactory {

    public static MapperService create(String mappings) {
        return create(Settings.EMPTY, mappings);
    }

    public static MapperService create(Settings userSettings, String mappings) {
        Settings settings = Settings.builder()
            .put(userSettings)
            .put("index.number_of_replicas", 0)
            .put("index.number_of_shards", 1)
            .put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current())