
Commit addf09a

Replace exact numDocs by soft-del count in SegmentInfo (#31086)

This change adapts/utilizes recent enhancements in Lucene 7.4:

- Replaces exactNumDocs by the soft-deletes count in SegmentCommitInfo. This enhancement allows us to back out the changes introduced in #30228.
- Always configures the soft-deletes field in the IndexWriterConfig (IWC).

1 parent 8578caf commit addf09a
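The core of the change is visible in `Lucene.getNumDocs` below: as of Lucene 7.4, each `SegmentCommitInfo` records its soft-delete count, so the live-document total can be derived from commit metadata alone, without opening a reader over the commit. A minimal standalone sketch of that counting logic (the class name `LiveDocCounter` is ours, not Elasticsearch's):

```java
import java.io.IOException;

import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;

public final class LiveDocCounter {

    /** Live docs per segment = maxDoc minus hard-deleted minus soft-deleted docs. */
    public static int numDocs(SegmentInfos infos) {
        int numDocs = 0;
        for (SegmentCommitInfo si : infos) {
            numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
        }
        return numDocs;
    }

    /** Convenience: count live docs in the latest commit of an index directory. */
    public static int numDocsInLatestCommit(Directory dir) throws IOException {
        return numDocs(SegmentInfos.readLatestCommit(dir));
    }
}
```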

File tree

11 files changed: +33, -105 lines

server/src/main/java/org/elasticsearch/common/lucene/Lucene.java (+3, -12)

```diff
@@ -44,7 +44,6 @@
 import org.apache.lucene.index.SegmentCommitInfo;
 import org.apache.lucene.index.SegmentInfos;
 import org.apache.lucene.index.SegmentReader;
-import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
 import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Explanation;
 import org.apache.lucene.search.FieldDoc;
@@ -145,21 +144,11 @@ public static Iterable<String> files(SegmentInfos infos) throws IOException {
     public static int getNumDocs(SegmentInfos info) {
         int numDocs = 0;
         for (SegmentCommitInfo si : info) {
-            numDocs += si.info.maxDoc() - si.getDelCount();
+            numDocs += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
         }
         return numDocs;
     }
 
-    /**
-     * Unlike {@link #getNumDocs(SegmentInfos)} this method returns a numDocs that always excludes soft-deleted docs.
-     * This method is expensive thus prefer using {@link #getNumDocs(SegmentInfos)} unless an exact numDocs is required.
-     */
-    public static int getExactNumDocs(IndexCommit commit) throws IOException {
-        try (DirectoryReader reader = DirectoryReader.open(commit)) {
-            return new SoftDeletesDirectoryReaderWrapper(reader, Lucene.SOFT_DELETE_FIELD).numDocs();
-        }
-    }
-
     /**
      * Reads the segments infos from the given commit, failing if it fails to load
      */
@@ -212,6 +201,7 @@ public static SegmentInfos pruneUnreferencedFiles(String segmentsFileName, Direc
         }
         final CommitPoint cp = new CommitPoint(si, directory);
         try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                 .setIndexCommit(cp)
                 .setCommitOnClose(false)
                 .setMergePolicy(NoMergePolicy.INSTANCE)
@@ -235,6 +225,7 @@ public static void cleanLuceneIndex(Directory directory) throws IOException {
             }
         }
         try (IndexWriter writer = new IndexWriter(directory, new IndexWriterConfig(Lucene.STANDARD_ANALYZER)
+                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                 .setMergePolicy(NoMergePolicy.INSTANCE) // no merges
                 .setCommitOnClose(false) // no commits
                 .setOpenMode(IndexWriterConfig.OpenMode.CREATE))) // force creation - don't append...
```
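The last two hunks show the second half of the change: every `IndexWriterConfig` that can touch an index containing soft deletes now names the soft-deletes field, so that a maintenance writer does not treat soft-deleted documents as live or reclaim them during merges. The same one-line addition recurs in StoreRecovery, Store, TruncateTranslogCommand, BlobStoreRepository, and the tests below. A hedged sketch of the pattern; `"__soft_deletes"` is assumed here to match the value of `Lucene.SOFT_DELETE_FIELD` in this Elasticsearch version:

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NoMergePolicy;

public final class WriterConfigs {

    // Assumed to match Lucene.SOFT_DELETE_FIELD in this Elasticsearch version.
    public static final String SOFT_DELETES_FIELD = "__soft_deletes";

    /**
     * Config for "maintenance" writers that open an existing index to prune or
     * re-commit it: the soft-deletes field must be declared even though the
     * writer never indexes a document, otherwise soft-deleted documents are
     * counted as live and may be merged away.
     */
    public static IndexWriterConfig maintenanceConfig() {
        return new IndexWriterConfig(new StandardAnalyzer())
            .setSoftDeletesField(SOFT_DELETES_FIELD)
            .setMergePolicy(NoMergePolicy.INSTANCE) // no merges
            .setCommitOnClose(false);               // no implicit commits
    }
}
```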

server/src/main/java/org/elasticsearch/index/engine/CommitStats.java (+2, -2)

```diff
@@ -39,13 +39,13 @@ public final class CommitStats implements Streamable, ToXContentFragment {
     private String id; // lucene commit id in base 64;
     private int numDocs;
 
-    public CommitStats(SegmentInfos segmentInfos, int numDocs) {
+    public CommitStats(SegmentInfos segmentInfos) {
         // clone the map to protect against concurrent changes
         userData = MapBuilder.<String, String>newMapBuilder().putAll(segmentInfos.getUserData()).immutableMap();
         // lucene calls the current generation, last generation.
         generation = segmentInfos.getLastGeneration();
         id = Base64.getEncoder().encodeToString(segmentInfos.getId());
-        this.numDocs = numDocs;
+        numDocs = Lucene.getNumDocs(segmentInfos);
     }
 
     private CommitStats() {
```
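With the doc count derived inside the constructor, `CommitStats` becomes a pure function of `SegmentInfos`. A sketch of what it now computes, reusing the counting logic from the `LiveDocCounter` sketch above (class and field names here are illustrative, not the Elasticsearch originals):

```java
import java.util.Base64;
import java.util.Map;

import org.apache.lucene.index.SegmentInfos;

public final class CommitSummary {
    public final Map<String, String> userData; // e.g. sync id, history uuid
    public final long generation;
    public final String id;                    // lucene commit id in base 64
    public final int numDocs;

    public CommitSummary(SegmentInfos segmentInfos) {
        userData = segmentInfos.getUserData();
        // lucene calls the current generation the "last" generation
        generation = segmentInfos.getLastGeneration();
        id = Base64.getEncoder().encodeToString(segmentInfos.getId());
        numDocs = LiveDocCounter.numDocs(segmentInfos); // metadata-only count
    }
}
```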

server/src/main/java/org/elasticsearch/index/engine/Engine.java (+1, -3)

```diff
@@ -632,9 +632,7 @@ protected final void ensureOpen() {
 
     /** get commits stats for the last commit */
     public CommitStats commitStats() {
-        try (Engine.Searcher searcher = acquireSearcher("commit_stats", Engine.SearcherScope.INTERNAL)) {
-            return new CommitStats(getLastCommittedSegmentInfos(), searcher.reader().numDocs());
-        }
+        return new CommitStats(getLastCommittedSegmentInfos());
    }
 
    /**
```

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java (+1, -0)

```diff
@@ -156,6 +156,7 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta
         final Directory hardLinkOrCopyTarget = new org.apache.lucene.store.HardlinkCopyDirectoryWrapper(target);
 
         IndexWriterConfig iwc = new IndexWriterConfig(null)
+            .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
             .setCommitOnClose(false)
             // we don't want merges to happen here - we call maybe merge on the engine
             // later once we stared it up otherwise we would need to wait for it here
```

server/src/main/java/org/elasticsearch/index/store/Store.java (+2, -11)

```diff
@@ -864,7 +864,7 @@ static LoadedMetadata loadMetadata(IndexCommit commit, Directory directory, Logg
         Map<String, String> commitUserDataBuilder = new HashMap<>();
         try {
             final SegmentInfos segmentCommitInfos = Store.readSegmentsInfo(commit, directory);
-            numDocs = Lucene.getExactNumDocs(commit != null ? commit : findIndexCommit(directory, segmentCommitInfos));
+            numDocs = Lucene.getNumDocs(segmentCommitInfos);
             commitUserDataBuilder.putAll(segmentCommitInfos.getUserData());
             Version maxVersion = segmentCommitInfos.getMinSegmentLuceneVersion(); // we don't know which version was used to write so we take the max version.
             for (SegmentCommitInfo info : segmentCommitInfos) {
@@ -947,16 +947,6 @@ public static void hashFile(BytesRefBuilder fileHash, InputStream in, long size)
         assert fileHash.length() == len : Integer.toString(fileHash.length()) + " != " + Integer.toString(len);
     }
 
-    private static IndexCommit findIndexCommit(Directory directory, SegmentInfos sis) throws IOException {
-        List<IndexCommit> commits = DirectoryReader.listCommits(directory);
-        for (IndexCommit commit : commits) {
-            if (commit.getSegmentsFileName().equals(sis.getSegmentsFileName())) {
-                return commit;
-            }
-        }
-        throw new IOException("Index commit [" + sis.getSegmentsFileName() + "] is not found");
-    }
-
     @Override
     public Iterator<StoreFileMetaData> iterator() {
         return metadata.values().iterator();
@@ -1629,6 +1619,7 @@ private static IndexWriter newIndexWriter(final IndexWriterConfig.OpenMode openM
         throws IOException {
         assert openMode == IndexWriterConfig.OpenMode.APPEND || commit == null : "can't specify create flag with a commit";
         IndexWriterConfig iwc = new IndexWriterConfig(null)
+            .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
             .setCommitOnClose(false)
             .setIndexCommit(commit)
             // we don't want merges to happen here - we call maybe merge on the engine
```
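Why is the metadata-based count a safe replacement for `getExactNumDocs`? Because, as of Lucene 7.4, the soft-delete count baked into each `SegmentCommitInfo` agrees with what a `SoftDeletesDirectoryReaderWrapper` reports. A runnable sketch demonstrating that equivalence (assumes Lucene 7.4 on the classpath and `"__soft_deletes"` as the field name; not Elasticsearch code):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.index.SoftDeletesDirectoryReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;

public class SoftDeleteCountDemo {
    public static void main(String[] args) throws Exception {
        final String softDeletesField = "__soft_deletes";
        try (Directory dir = new RAMDirectory()) {
            IndexWriterConfig iwc = new IndexWriterConfig(new StandardAnalyzer())
                .setSoftDeletesField(softDeletesField);
            try (IndexWriter writer = new IndexWriter(dir, iwc)) {
                for (int i = 0; i < 3; i++) {
                    Document doc = new Document();
                    doc.add(new StringField("id", "doc-" + i, Field.Store.NO));
                    writer.addDocument(doc);
                }
                // Soft-update doc-0: the old version is marked soft-deleted via a
                // doc-values write instead of being hard-deleted; the new version
                // stays live, so 3 documents remain visible.
                Document update = new Document();
                update.add(new StringField("id", "doc-0", Field.Store.NO));
                writer.softUpdateDocument(new Term("id", "doc-0"), update,
                    new NumericDocValuesField(softDeletesField, 1));
                writer.commit();
            }

            // Count 1: from commit metadata only (the approach this commit adopts).
            int fromMetadata = 0;
            for (SegmentCommitInfo si : SegmentInfos.readLatestCommit(dir)) {
                fromMetadata += si.info.maxDoc() - si.getDelCount() - si.getSoftDelCount();
            }

            // Count 2: from a reader that filters soft-deleted docs (the expensive
            // getExactNumDocs approach this commit removes).
            final int fromReader;
            try (DirectoryReader reader = new SoftDeletesDirectoryReaderWrapper(
                    DirectoryReader.open(dir), softDeletesField)) {
                fromReader = reader.numDocs();
            }

            System.out.println(fromMetadata + " == " + fromReader); // prints: 3 == 3
        }
    }
}
```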

server/src/main/java/org/elasticsearch/index/translog/TruncateTranslogCommand.java (+2, -0)

```diff
@@ -33,6 +33,7 @@
 import org.apache.lucene.store.LockObtainFailedException;
 import org.apache.lucene.store.NativeFSLockFactory;
 import org.apache.lucene.store.OutputStreamDataOutput;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cli.EnvironmentAwareCommand;
@@ -179,6 +180,7 @@ protected void execute(Terminal terminal, OptionSet options, Environment env) th
             terminal.println("Marking index with the new history uuid");
             // commit the new histroy id
             IndexWriterConfig iwc = new IndexWriterConfig(null)
+                .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                 .setCommitOnClose(false)
                 // we don't want merges to happen here - we call maybe merge on the engine
                 // later once we stared it up otherwise we would need to wait for it here
```

server/src/main/java/org/elasticsearch/indices/flush/SyncedFlushService.java (+6, -11)

```diff
@@ -19,7 +19,6 @@
 package org.elasticsearch.indices.flush;
 
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.SegmentInfos;
 import org.elasticsearch.Assertions;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
@@ -42,13 +41,13 @@
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.CountDown;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.shard.IndexEventListener;
 import org.elasticsearch.index.shard.IndexShard;
@@ -468,19 +467,15 @@ public String executor() {
         }
     }
 
-    private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) throws IOException {
+    private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
         IndexShard indexShard = indicesService.indexServiceSafe(request.shardId().getIndex()).getShard(request.shardId().id());
         FlushRequest flushRequest = new FlushRequest().force(false).waitIfOngoing(true);
         logger.trace("{} performing pre sync flush", request.shardId());
         indexShard.flush(flushRequest);
-        try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
-            final SegmentInfos segmentInfos = Lucene.readSegmentInfos(commitRef.getIndexCommit());
-            final int numDocs = Lucene.getExactNumDocs(commitRef.getIndexCommit());
-            final Engine.CommitId commitId = new Engine.CommitId(segmentInfos.getId());
-            final String syncId = segmentInfos.userData.get(Engine.SYNC_COMMIT_ID);
-            logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, numDocs);
-            return new PreSyncedFlushResponse(commitId, numDocs, syncId);
-        }
+        final CommitStats commitStats = indexShard.commitStats();
+        final Engine.CommitId commitId = commitStats.getRawCommitId();
+        logger.trace("{} pre sync flush done. commit id {}, num docs {}", request.shardId(), commitId, commitStats.getNumDocs());
+        return new PreSyncedFlushResponse(commitId, commitStats.getNumDocs(), commitStats.syncId());
     }
 
     private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
```
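After this hunk, the pre-sync-flush handler obtains commit id, doc count, and sync id from the shard's `CommitStats` instead of re-opening the last commit. The sync id itself lives in the commit's user data; a small sketch of that lookup in plain Lucene (`"sync_id"` is assumed here to be the key Elasticsearch stores under `Engine.SYNC_COMMIT_ID`):

```java
import java.io.IOException;

import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;

public final class SyncIdReader {

    /** Reads the sync id from the latest commit's user data; null if absent. */
    public static String readSyncId(Directory dir) throws IOException {
        // Commit user data travels with every commit point, so no IndexReader
        // (and no soft-deletes-aware wrapper) is needed for this lookup.
        return SegmentInfos.readLatestCommit(dir).getUserData().get("sync_id");
    }
}
```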

server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java (+2, -2)

```diff
@@ -289,7 +289,7 @@ public RecoveryResponse newInstance() {
      * @param recoveryTarget the target of the recovery
      * @return a snapshot of the store metadata
      */
-    static Store.MetadataSnapshot getStoreMetadataSnapshot(final Logger logger, final RecoveryTarget recoveryTarget) {
+    private Store.MetadataSnapshot getStoreMetadataSnapshot(final RecoveryTarget recoveryTarget) {
         try {
             return recoveryTarget.indexShard().snapshotStoreMetadata();
         } catch (final org.apache.lucene.index.IndexNotFoundException e) {
@@ -312,7 +312,7 @@ private StartRecoveryRequest getStartRecoveryRequest(final RecoveryTarget recove
         final StartRecoveryRequest request;
         logger.trace("{} collecting local files for [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode());
 
-        final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(logger, recoveryTarget);
+        final Store.MetadataSnapshot metadataSnapshot = getStoreMetadataSnapshot(recoveryTarget);
         logger.trace("{} local file count [{}]", recoveryTarget.shardId(), metadataSnapshot.size());
 
         final long startingSeqNo;
```

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java (+1, -0)

```diff
@@ -1427,6 +1427,7 @@ public void restore() throws IOException {
                 // empty shard would cause exceptions to be thrown. Since there is no data to restore from an empty
                 // shard anyway, we just create the empty shard here and then exit.
                 IndexWriter writer = new IndexWriter(store.directory(), new IndexWriterConfig(null)
+                    .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
                     .setOpenMode(IndexWriterConfig.OpenMode.CREATE)
                     .setCommitOnClose(true));
                 writer.close();
```

server/src/test/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetServiceTests.java (+2, -32)

```diff
@@ -24,18 +24,16 @@
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.IndexWriterConfig;
 import org.apache.lucene.index.NoMergePolicy;
-import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.translog.Translog;
 
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
 
@@ -94,6 +92,7 @@ public void testGetStartingSeqNo() throws Exception {
         replica.close("test", false);
         final List<IndexCommit> commits = DirectoryReader.listCommits(replica.store().directory());
         IndexWriterConfig iwc = new IndexWriterConfig(null)
+            .setSoftDeletesField(Lucene.SOFT_DELETE_FIELD)
             .setCommitOnClose(false)
             .setMergePolicy(NoMergePolicy.INSTANCE)
             .setOpenMode(IndexWriterConfig.OpenMode.APPEND);
@@ -111,33 +110,4 @@ public void testGetStartingSeqNo() throws Exception {
             closeShards(replica);
         }
     }
-
-    public void testExactNumDocsInStoreMetadataSnapshot() throws Exception {
-        final IndexShard replica = newShard(false);
-        recoveryEmptyReplica(replica);
-        long flushedDocs = 0;
-        final int numDocs = scaledRandomIntBetween(1, 20);
-        final Set<String> docIds = new HashSet<>();
-        for (int i = 0; i < numDocs; i++) {
-            String id = Integer.toString(i);
-            docIds.add(id);
-            indexDoc(replica, "_doc", id);
-            if (randomBoolean()) {
-                replica.flush(new FlushRequest());
-                flushedDocs = docIds.size();
-            }
-        }
-        for (String id : randomSubsetOf(docIds)) {
-            deleteDoc(replica, "_doc", id);
-            docIds.remove(id);
-            if (randomBoolean()) {
-                replica.flush(new FlushRequest());
-                flushedDocs = docIds.size();
-            }
-        }
-        final RecoveryTarget recoveryTarget = new RecoveryTarget(replica, null, null, null);
-        assertThat(PeerRecoveryTargetService.getStoreMetadataSnapshot(logger, recoveryTarget).getNumDocs(), equalTo(flushedDocs));
-        recoveryTarget.decRef();
-        closeShards(replica);
-    }
 }
```

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java (+11, -32)

```diff
@@ -26,8 +26,6 @@
 import com.carrotsearch.randomizedtesting.generators.RandomStrings;
 import org.apache.logging.log4j.Logger;
 import org.apache.lucene.store.AlreadyClosedException;
-import org.elasticsearch.common.collect.Tuple;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
@@ -77,10 +75,7 @@
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.CommitStats;
 import org.elasticsearch.index.engine.Engine;
-import org.elasticsearch.index.engine.EngineTestCase;
-import org.elasticsearch.index.shard.IllegalIndexShardStateException;
 import org.elasticsearch.index.shard.IndexShard;
-import org.elasticsearch.index.shard.IndexShardState;
 import org.elasticsearch.index.shard.IndexShardTestCase;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -1108,7 +1103,7 @@ public void beforeIndexDeletion() throws Exception {
         // ElasticsearchIntegrationTest must override beforeIndexDeletion() to avoid failures.
         assertNoPendingIndexOperations();
         //check that shards that have same sync id also contain same number of documents
-            assertSameSyncIdSameDocs();
+        assertSameSyncIdSameDocs();
         assertOpenTranslogReferences();
     }
 
@@ -1119,39 +1114,23 @@ private void assertSameSyncIdSameDocs() {
             IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name);
             for (IndexService indexService : indexServices) {
                 for (IndexShard indexShard : indexService) {
-                    Tuple<String, Integer> commitStats = commitStats(indexShard);
-                    if (commitStats != null) {
-                        String syncId = commitStats.v1();
-                        long liveDocsOnShard = commitStats.v2();
-                        if (docsOnShards.get(syncId) != null) {
-                            assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name +
-                                ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId),
-                                equalTo(liveDocsOnShard));
-                        } else {
-                            docsOnShards.put(syncId, liveDocsOnShard);
+                    CommitStats commitStats = indexShard.commitStats();
+                    if (commitStats != null) { // null if the engine is closed or if the shard is recovering
+                        String syncId = commitStats.getUserData().get(Engine.SYNC_COMMIT_ID);
+                        if (syncId != null) {
+                            long liveDocsOnShard = commitStats.getNumDocs();
+                            if (docsOnShards.get(syncId) != null) {
+                                assertThat("sync id is equal but number of docs does not match on node " + nodeAndClient.name + ". expected " + docsOnShards.get(syncId) + " but got " + liveDocsOnShard, docsOnShards.get(syncId), equalTo(liveDocsOnShard));
+                            } else {
+                                docsOnShards.put(syncId, liveDocsOnShard);
+                            }
                         }
                     }
                 }
             }
         }
     }
 
-    private Tuple<String, Integer> commitStats(IndexShard indexShard) {
-        try (Engine.IndexCommitRef commitRef = indexShard.acquireLastIndexCommit(false)) {
-            final String syncId = commitRef.getIndexCommit().getUserData().get(Engine.SYNC_COMMIT_ID);
-            // Only read if sync_id exists
-            if (Strings.hasText(syncId)) {
-                return Tuple.tuple(syncId, Lucene.getExactNumDocs(commitRef.getIndexCommit()));
-            } else {
-                return null;
-            }
-        } catch (IllegalIndexShardStateException ex) {
-            return null; // Shard is closed or not started yet.
-        } catch (IOException ex) {
-            throw new AssertionError(ex);
-        }
-    }
-
     private void assertNoPendingIndexOperations() throws Exception {
         assertBusy(() -> {
             final Collection<NodeAndClient> nodesAndClients = nodes.values();
```