Skip to content

Commit 97b9d2b

Browse files
martijnvgdnhatn
authored andcommitted
[CCR] Read changes from Lucene instead of translog (#30120)
This commit adds an API to read translog snapshot from Lucene, then cut-over from the existing translog to the new API in CCR. Relates #30086 Relates #29530
1 parent 07a5aeb commit 97b9d2b

File tree

12 files changed

+833
-182
lines changed

12 files changed

+833
-182
lines changed

server/src/main/java/org/elasticsearch/index/engine/Engine.java

+7
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.elasticsearch.common.unit.TimeValue;
5959
import org.elasticsearch.common.util.concurrent.ReleasableLock;
6060
import org.elasticsearch.index.VersionType;
61+
import org.elasticsearch.index.mapper.MapperService;
6162
import org.elasticsearch.index.mapper.Mapping;
6263
import org.elasticsearch.index.mapper.ParseContext.Document;
6364
import org.elasticsearch.index.mapper.ParsedDocument;
@@ -609,6 +610,12 @@ public Translog.Location getTranslogLastWriteLocation() {
609610
return getTranslog().getLastWriteLocation();
610611
}
611612

613+
/**
614+
* Creates a new "translog" snapshot from Lucene for reading operations whose seqno in the requesting seqno range
615+
*/
616+
public abstract Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
617+
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException;
618+
612619
protected final void ensureOpen(Exception suppressed) {
613620
if (isClosed.get()) {
614621
AlreadyClosedException ace = new AlreadyClosedException(shardId + " engine is closed", failedEngine.get());

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

+45
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
import org.elasticsearch.index.IndexSettings;
7070
import org.elasticsearch.index.VersionType;
7171
import org.elasticsearch.index.mapper.IdFieldMapper;
72+
import org.elasticsearch.index.mapper.MapperService;
7273
import org.elasticsearch.index.mapper.ParseContext;
7374
import org.elasticsearch.index.mapper.ParsedDocument;
7475
import org.elasticsearch.index.mapper.UidFieldMapper;
@@ -152,6 +153,7 @@ public class InternalEngine extends Engine {
152153
private final CounterMetric numDocUpdates = new CounterMetric();
153154
private final NumericDocValuesField softDeleteField = Lucene.newSoftDeleteField();
154155
private final boolean softDeleteEnabled;
156+
private final LastRefreshedCheckpointListener lastRefreshedCheckpointListener;
155157

156158
/**
157159
* How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh. IndexingMemoryController polls this
@@ -229,6 +231,8 @@ public InternalEngine(EngineConfig engineConfig) {
229231
for (ReferenceManager.RefreshListener listener: engineConfig.getInternalRefreshListener()) {
230232
this.internalSearcherManager.addListener(listener);
231233
}
234+
this.lastRefreshedCheckpointListener = new LastRefreshedCheckpointListener(localCheckpointTracker.getCheckpoint());
235+
this.internalSearcherManager.addListener(lastRefreshedCheckpointListener);
232236
success = true;
233237
} finally {
234238
if (success == false) {
@@ -2402,6 +2406,23 @@ long getNumDocUpdates() {
24022406
return numDocUpdates.count();
24032407
}
24042408

2409+
public Translog.Snapshot newLuceneChangesSnapshot(String source, MapperService mapperService,
2410+
long minSeqNo, long maxSeqNo, boolean requiredFullRange) throws IOException {
2411+
// TODO: Should we defer the refresh until we really need it?
2412+
ensureOpen();
2413+
if (lastRefreshedCheckpoint() < maxSeqNo) {
2414+
refresh(source, SearcherScope.INTERNAL);
2415+
}
2416+
Searcher searcher = acquireSearcher(source, SearcherScope.INTERNAL);
2417+
try {
2418+
LuceneChangesSnapshot snapshot = new LuceneChangesSnapshot(searcher, mapperService, minSeqNo, maxSeqNo, requiredFullRange);
2419+
searcher = null;
2420+
return snapshot;
2421+
} finally {
2422+
IOUtils.close(searcher);
2423+
}
2424+
}
2425+
24052426
@Override
24062427
public boolean isRecovering() {
24072428
return pendingTranslogRecovery.get();
@@ -2448,4 +2469,28 @@ public long softUpdateDocuments(Term term, Iterable<? extends Iterable<? extends
24482469
return super.softUpdateDocuments(term, docs, softDeletes);
24492470
}
24502471
}
2472+
2473+
/**
2474+
* Returned the last local checkpoint value has been refreshed internally.
2475+
*/
2476+
final long lastRefreshedCheckpoint() {
2477+
return lastRefreshedCheckpointListener.refreshedCheckpoint.get();
2478+
}
2479+
private final class LastRefreshedCheckpointListener implements ReferenceManager.RefreshListener {
2480+
final AtomicLong refreshedCheckpoint;
2481+
private long pendingCheckpoint;
2482+
LastRefreshedCheckpointListener(long initialLocalCheckpoint) {
2483+
this.refreshedCheckpoint = new AtomicLong(initialLocalCheckpoint);
2484+
}
2485+
@Override
2486+
public void beforeRefresh() {
2487+
pendingCheckpoint = localCheckpointTracker.getCheckpoint(); // All change until this point should be visible after refresh
2488+
}
2489+
@Override
2490+
public void afterRefresh(boolean didRefresh) {
2491+
if (didRefresh) {
2492+
refreshedCheckpoint.set(pendingCheckpoint);
2493+
}
2494+
}
2495+
}
24512496
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.index.engine;
21+
22+
import org.apache.lucene.document.LongPoint;
23+
import org.apache.lucene.index.LeafReader;
24+
import org.apache.lucene.index.LeafReaderContext;
25+
import org.apache.lucene.index.NumericDocValues;
26+
import org.apache.lucene.index.ReaderUtil;
27+
import org.apache.lucene.index.Term;
28+
import org.apache.lucene.search.DocIdSetIterator;
29+
import org.apache.lucene.search.IndexSearcher;
30+
import org.apache.lucene.search.Query;
31+
import org.apache.lucene.search.Sort;
32+
import org.apache.lucene.search.SortField;
33+
import org.apache.lucene.search.SortedNumericSortField;
34+
import org.apache.lucene.search.TopDocs;
35+
import org.elasticsearch.common.bytes.BytesReference;
36+
import org.elasticsearch.common.lucene.Lucene;
37+
import org.elasticsearch.index.VersionType;
38+
import org.elasticsearch.index.fieldvisitor.FieldsVisitor;
39+
import org.elasticsearch.index.mapper.IdFieldMapper;
40+
import org.elasticsearch.index.mapper.MapperService;
41+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
42+
import org.elasticsearch.index.mapper.Uid;
43+
import org.elasticsearch.index.mapper.VersionFieldMapper;
44+
import org.elasticsearch.index.translog.Translog;
45+
46+
import java.io.Closeable;
47+
import java.io.IOException;
48+
import java.util.List;
49+
import java.util.Objects;
50+
51+
/**
52+
* A {@link Translog.Snapshot} from changes in a Lucene index
53+
*/
54+
final class LuceneChangesSnapshot implements Translog.Snapshot {
55+
private final long fromSeqNo, toSeqNo;
56+
private long lastSeenSeqNo;
57+
private int skippedOperations;
58+
private final boolean requiredFullRange;
59+
60+
private final IndexSearcher indexSearcher;
61+
private final MapperService mapperService;
62+
private int docIndex = 0;
63+
private final TopDocs topDocs;
64+
65+
private final Closeable onClose;
66+
private final CombinedDocValues[] docValues; // Cache of DocValues
67+
68+
/**
69+
* Creates a new "translog" snapshot from Lucene for reading operations whose seq# in the specified range.
70+
*
71+
* @param engineSearcher the internal engine searcher which will be taken over if the snapshot is opened successfully
72+
* @param mapperService the mapper service which will be mainly used to resolve the document's type and uid
73+
* @param fromSeqNo the min requesting seq# - inclusive
74+
* @param toSeqNo the maximum requesting seq# - inclusive
75+
* @param requiredFullRange if true, the snapshot will strictly check for the existence of operations between fromSeqNo and toSeqNo
76+
*/
77+
LuceneChangesSnapshot(Engine.Searcher engineSearcher, MapperService mapperService,
78+
long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
79+
if (fromSeqNo < 0 || toSeqNo < 0 || fromSeqNo > toSeqNo) {
80+
throw new IllegalArgumentException("Invalid range; from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "]");
81+
}
82+
this.mapperService = mapperService;
83+
this.fromSeqNo = fromSeqNo;
84+
this.toSeqNo = toSeqNo;
85+
this.lastSeenSeqNo = fromSeqNo - 1;
86+
this.requiredFullRange = requiredFullRange;
87+
this.indexSearcher = new IndexSearcher(Lucene.wrapAllDocsLive(engineSearcher.getDirectoryReader()));
88+
this.indexSearcher.setQueryCache(null);
89+
this.topDocs = searchOperations(indexSearcher);
90+
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
91+
this.docValues = new CombinedDocValues[leaves.size()];
92+
for (LeafReaderContext leaf : leaves) {
93+
this.docValues[leaf.ord] = new CombinedDocValues(leaf.reader());
94+
}
95+
this.onClose = engineSearcher;
96+
}
97+
98+
@Override
99+
public void close() throws IOException {
100+
onClose.close();
101+
}
102+
103+
@Override
104+
public int totalOperations() {
105+
return Math.toIntExact(topDocs.totalHits);
106+
}
107+
108+
@Override
109+
public int overriddenOperations() {
110+
return skippedOperations;
111+
}
112+
113+
@Override
114+
public Translog.Operation next() throws IOException {
115+
Translog.Operation op = null;
116+
for (int docId = nextDocId(); docId != DocIdSetIterator.NO_MORE_DOCS; docId = nextDocId()) {
117+
op = readDocAsOp(docId);
118+
if (op != null) {
119+
break;
120+
}
121+
}
122+
if (requiredFullRange) {
123+
rangeCheck(op);
124+
}
125+
if (op != null) {
126+
lastSeenSeqNo = op.seqNo();
127+
}
128+
return op;
129+
}
130+
131+
private void rangeCheck(Translog.Operation op) {
132+
if (op == null) {
133+
if (lastSeenSeqNo < toSeqNo) {
134+
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
135+
"and max_seqno [" + toSeqNo + "] found; prematurely terminated last_seen_seqno [" + lastSeenSeqNo + "]");
136+
}
137+
} else {
138+
final long expectedSeqNo = lastSeenSeqNo + 1;
139+
if (op.seqNo() != expectedSeqNo) {
140+
throw new IllegalStateException("Not all operations between min_seqno [" + fromSeqNo + "] " +
141+
"and max_seqno [" + toSeqNo + "] found; expected seqno [" + expectedSeqNo + "]; found [" + op + "]");
142+
}
143+
}
144+
}
145+
146+
private int nextDocId() {
147+
if (docIndex < topDocs.scoreDocs.length) {
148+
final int docId = topDocs.scoreDocs[docIndex].doc;
149+
docIndex++;
150+
return docId;
151+
} else {
152+
return DocIdSetIterator.NO_MORE_DOCS;
153+
}
154+
}
155+
156+
private TopDocs searchOperations(IndexSearcher searcher) throws IOException {
157+
final Query rangeQuery = LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, fromSeqNo, toSeqNo);
158+
final Sort sortedBySeqNoThenByTerm = new Sort(
159+
new SortedNumericSortField(SeqNoFieldMapper.NAME, SortField.Type.LONG),
160+
new SortedNumericSortField(SeqNoFieldMapper.PRIMARY_TERM_NAME, SortField.Type.LONG, true)
161+
);
162+
return searcher.search(rangeQuery, Integer.MAX_VALUE, sortedBySeqNoThenByTerm);
163+
}
164+
165+
private Translog.Operation readDocAsOp(int docID) throws IOException {
166+
final List<LeafReaderContext> leaves = indexSearcher.getIndexReader().leaves();
167+
final LeafReaderContext leaf = leaves.get(ReaderUtil.subIndex(docID, leaves));
168+
final int segmentDocID = docID - leaf.docBase;
169+
final long primaryTerm = docValues[leaf.ord].docPrimaryTerm(segmentDocID);
170+
// We don't have to read the nested child documents - those docs don't have primary terms.
171+
if (primaryTerm == -1) {
172+
skippedOperations++;
173+
return null;
174+
}
175+
final long seqNo = docValues[leaf.ord].docSeqNo(segmentDocID);
176+
// Only pick the first seen seq#
177+
if (seqNo == lastSeenSeqNo) {
178+
skippedOperations++;
179+
return null;
180+
}
181+
final long version = docValues[leaf.ord].docVersion(segmentDocID);
182+
final FieldsVisitor fields = new FieldsVisitor(true);
183+
indexSearcher.doc(docID, fields);
184+
fields.postProcess(mapperService);
185+
186+
final Translog.Operation op;
187+
final boolean isTombstone = docValues[leaf.ord].isTombstone(segmentDocID);
188+
if (isTombstone && fields.uid() == null) {
189+
op = new Translog.NoOp(seqNo, primaryTerm, ""); // TODO: store reason in ignored fields?
190+
assert version == 1L : "Noop tombstone should have version 1L; actual version [" + version + "]";
191+
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Noop but soft_deletes field is not set [" + op + "]";
192+
} else {
193+
final String id = fields.uid().id();
194+
final String type = fields.uid().type();
195+
final Term uid = new Term(IdFieldMapper.NAME, Uid.encodeId(id));
196+
if (isTombstone) {
197+
op = new Translog.Delete(type, id, uid, seqNo, primaryTerm, version, VersionType.INTERNAL);
198+
assert assertDocSoftDeleted(leaf.reader(), segmentDocID) : "Delete op but soft_deletes field is not set [" + op + "]";
199+
} else {
200+
final BytesReference source = fields.source();
201+
// TODO: pass the latest timestamp from engine.
202+
final long autoGeneratedIdTimestamp = -1;
203+
op = new Translog.Index(type, id, seqNo, primaryTerm, version, VersionType.INTERNAL,
204+
source.toBytesRef().bytes, fields.routing(), null, autoGeneratedIdTimestamp);
205+
}
206+
}
207+
assert fromSeqNo <= op.seqNo() && op.seqNo() <= toSeqNo && lastSeenSeqNo < op.seqNo() : "Unexpected operation; " +
208+
"last_seen_seqno [" + lastSeenSeqNo + "], from_seqno [" + fromSeqNo + "], to_seqno [" + toSeqNo + "], op [" + op + "]";
209+
return op;
210+
}
211+
212+
private boolean assertDocSoftDeleted(LeafReader leafReader, int segmentDocId) throws IOException {
213+
final NumericDocValues ndv = leafReader.getNumericDocValues(Lucene.SOFT_DELETE_FIELD);
214+
if (ndv == null || ndv.advanceExact(segmentDocId) == false) {
215+
throw new IllegalStateException("DocValues for field [" + Lucene.SOFT_DELETE_FIELD + "] is not found");
216+
}
217+
return ndv.longValue() == 1;
218+
}
219+
220+
private static final class CombinedDocValues {
221+
private final LeafReader leafReader;
222+
private NumericDocValues versionDV;
223+
private NumericDocValues seqNoDV;
224+
private NumericDocValues primaryTermDV;
225+
private NumericDocValues tombstoneDV;
226+
227+
CombinedDocValues(LeafReader leafReader) throws IOException {
228+
this.leafReader = leafReader;
229+
this.versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
230+
this.seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
231+
this.primaryTermDV = Objects.requireNonNull(
232+
leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME), "PrimaryTermDV is missing");
233+
this.tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
234+
}
235+
236+
long docVersion(int segmentDocId) throws IOException {
237+
if (versionDV.docID() > segmentDocId) {
238+
versionDV = Objects.requireNonNull(leafReader.getNumericDocValues(VersionFieldMapper.NAME), "VersionDV is missing");
239+
}
240+
if (versionDV.advanceExact(segmentDocId) == false) {
241+
throw new IllegalStateException("DocValues for field [" + VersionFieldMapper.NAME + "] is not found");
242+
}
243+
return versionDV.longValue();
244+
}
245+
246+
long docSeqNo(int segmentDocId) throws IOException {
247+
if (seqNoDV.docID() > segmentDocId) {
248+
seqNoDV = Objects.requireNonNull(leafReader.getNumericDocValues(SeqNoFieldMapper.NAME), "SeqNoDV is missing");
249+
}
250+
if (seqNoDV.advanceExact(segmentDocId) == false) {
251+
throw new IllegalStateException("DocValues for field [" + SeqNoFieldMapper.NAME + "] is not found");
252+
}
253+
return seqNoDV.longValue();
254+
}
255+
256+
long docPrimaryTerm(int segmentDocId) throws IOException {
257+
if (primaryTermDV == null) {
258+
return -1L;
259+
}
260+
if (primaryTermDV.docID() > segmentDocId) {
261+
primaryTermDV = leafReader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
262+
}
263+
// Use -1 for docs which don't have primary term. The caller considers those docs as nested docs.
264+
if (primaryTermDV.advanceExact(segmentDocId) == false) {
265+
return -1;
266+
}
267+
return primaryTermDV.longValue();
268+
}
269+
270+
boolean isTombstone(int segmentDocId) throws IOException {
271+
if (tombstoneDV == null) {
272+
return false;
273+
}
274+
if (tombstoneDV.docID() > segmentDocId) {
275+
tombstoneDV = leafReader.getNumericDocValues(SeqNoFieldMapper.TOMBSTONE_NAME);
276+
}
277+
return tombstoneDV.advanceExact(segmentDocId) && tombstoneDV.longValue() > 0;
278+
}
279+
}
280+
}

0 commit comments

Comments
 (0)