Skip to content

Commit f6b5d7e

Browse files
authored
Add sequence numbers based optimistic concurrency control support to Engine (#36467)
This commit add support to engine operations for resolving and verifying the sequence number and primary term of the last modification to a document before performing an operation. This is infrastructure to move our (optimistic concurrency control)[http://en.wikipedia.org/wiki/Optimistic_concurrency_control] API to use sequence numbers instead of internal versioning. Relates #36148 Relates #10708
1 parent cd1bec3 commit f6b5d7e

File tree

16 files changed

+340
-225
lines changed

16 files changed

+340
-225
lines changed

server/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
9494
* using the same cache key. Otherwise we'd have to disable caching
9595
* entirely for these readers.
9696
*/
97-
public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
97+
public DocIdAndVersion lookupVersion(BytesRef id, boolean loadSeqNo, LeafReaderContext context)
9898
throws IOException {
9999
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
100100
"context's reader is not the same as the reader class was initialized on.";
@@ -108,7 +108,28 @@ public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
108108
if (versions.advanceExact(docID) == false) {
109109
throw new IllegalArgumentException("Document [" + docID + "] misses the [" + VersionFieldMapper.NAME + "] field");
110110
}
111-
return new DocIdAndVersion(docID, versions.longValue(), context.reader(), context.docBase);
111+
final long seqNo;
112+
final long term;
113+
if (loadSeqNo) {
114+
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
115+
// remove the null check in 7.0 once we can't read indices with no seq#
116+
if (seqNos != null && seqNos.advanceExact(docID)) {
117+
seqNo = seqNos.longValue();
118+
} else {
119+
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
120+
}
121+
NumericDocValues terms = context.reader().getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
122+
if (terms != null && terms.advanceExact(docID)) {
123+
term = terms.longValue();
124+
} else {
125+
term = 0;
126+
}
127+
128+
} else {
129+
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO;
130+
term = 0;
131+
}
132+
return new DocIdAndVersion(docID, versions.longValue(), seqNo, term, context.reader(), context.docBase);
112133
} else {
113134
return null;
114135
}
@@ -150,6 +171,7 @@ DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOExcep
150171
final NumericDocValues seqNoDV = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
151172
for (; docID != DocIdSetIterator.NO_MORE_DOCS; docID = docsEnum.nextDoc()) {
152173
final long seqNo;
174+
// remove the null check in 7.0 once we can't read indices with no seq#
153175
if (seqNoDV != null && seqNoDV.advanceExact(docID)) {
154176
seqNo = seqNoDV.longValue();
155177
} else {

server/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import java.util.Objects;
3232
import java.util.concurrent.ConcurrentMap;
3333

34-
import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
35-
3634
/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
3735
public final class VersionsAndSeqNoResolver {
3836

@@ -96,12 +94,16 @@ private VersionsAndSeqNoResolver() {
9694
public static class DocIdAndVersion {
9795
public final int docId;
9896
public final long version;
97+
public final long seqNo;
98+
public final long primaryTerm;
9999
public final LeafReader reader;
100100
public final int docBase;
101101

102-
public DocIdAndVersion(int docId, long version, LeafReader reader, int docBase) {
102+
public DocIdAndVersion(int docId, long version, long seqNo, long primaryTerm, LeafReader reader, int docBase) {
103103
this.docId = docId;
104104
this.version = version;
105+
this.seqNo = seqNo;
106+
this.primaryTerm = primaryTerm;
105107
this.reader = reader;
106108
this.docBase = docBase;
107109
}
@@ -129,15 +131,15 @@ public static class DocIdAndSeqNo {
129131
* <li>a doc ID and a version otherwise
130132
* </ul>
131133
*/
132-
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
134+
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term, boolean loadSeqNo) throws IOException {
133135
PerThreadIDVersionAndSeqNoLookup[] lookups = getLookupState(reader, term.field());
134136
List<LeafReaderContext> leaves = reader.leaves();
135137
// iterate backwards to optimize for the frequently updated documents
136138
// which are likely to be in the last segments
137139
for (int i = leaves.size() - 1; i >= 0; i--) {
138140
final LeafReaderContext leaf = leaves.get(i);
139141
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
140-
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf);
142+
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), loadSeqNo, leaf);
141143
if (result != null) {
142144
return result;
143145
}
@@ -175,15 +177,4 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
175177
}
176178
return latest;
177179
}
178-
179-
/**
180-
* Load the version for the uid from the reader, returning<ul>
181-
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
182-
* <li>the version associated with the provided uid otherwise
183-
* </ul>
184-
*/
185-
public static long loadVersion(IndexReader reader, Term term) throws IOException {
186-
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
187-
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
188-
}
189180
}

server/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -606,7 +606,7 @@ protected final GetResult getFromSearcher(Get get, BiFunction<String, SearcherSc
606606
final Searcher searcher = searcherFactory.apply("get", scope);
607607
final DocIdAndVersion docIdAndVersion;
608608
try {
609-
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid());
609+
docIdAndVersion = VersionsAndSeqNoResolver.loadDocIdAndVersion(searcher.reader(), get.uid(), true);
610610
} catch (Exception e) {
611611
Releasables.closeWhileHandlingException(searcher);
612612
//TODO: A better exception goes here
@@ -1345,14 +1345,23 @@ public static class Index extends Operation {
13451345
private final ParsedDocument doc;
13461346
private final long autoGeneratedIdTimestamp;
13471347
private final boolean isRetry;
1348+
private final long ifSeqNoMatch;
1349+
private final long ifPrimaryTermMatch;
13481350

13491351
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
1350-
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
1352+
long startTime, long autoGeneratedIdTimestamp, boolean isRetry, long ifSeqNoMatch, long ifPrimaryTermMatch) {
13511353
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
13521354
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
1355+
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
1356+
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
1357+
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
1358+
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
1359+
"cas operations are only allowed if origin is primary. get [" + origin + "]";
13531360
this.doc = doc;
13541361
this.isRetry = isRetry;
13551362
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
1363+
this.ifSeqNoMatch = ifSeqNoMatch;
1364+
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
13561365
}
13571366

13581367
public Index(Term uid, long primaryTerm, ParsedDocument doc) {
@@ -1361,7 +1370,7 @@ public Index(Term uid, long primaryTerm, ParsedDocument doc) {
13611370

13621371
Index(Term uid, long primaryTerm, ParsedDocument doc, long version) {
13631372
this(uid, doc, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, version, VersionType.INTERNAL,
1364-
Origin.PRIMARY, System.nanoTime(), -1, false);
1373+
Origin.PRIMARY, System.nanoTime(), -1, false, SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
13651374
} // TEST ONLY
13661375

13671376
public ParsedDocument parsedDoc() {
@@ -1417,29 +1426,45 @@ public boolean isRetry() {
14171426
return isRetry;
14181427
}
14191428

1429+
public long getIfSeqNoMatch() {
1430+
return ifSeqNoMatch;
1431+
}
1432+
1433+
public long getIfPrimaryTermMatch() {
1434+
return ifPrimaryTermMatch;
1435+
}
14201436
}
14211437

14221438
public static class Delete extends Operation {
14231439

14241440
private final String type;
14251441
private final String id;
1442+
private final long ifSeqNoMatch;
1443+
private final long ifPrimaryTermMatch;
14261444

14271445
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
1428-
Origin origin, long startTime) {
1446+
Origin origin, long startTime, long ifSeqNoMatch, long ifPrimaryTermMatch) {
14291447
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
14301448
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
1449+
assert ifPrimaryTermMatch >= 0 : "ifPrimaryTermMatch [" + ifPrimaryTermMatch + "] must be non negative";
1450+
assert ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO || ifSeqNoMatch >=0 :
1451+
"ifSeqNoMatch [" + ifSeqNoMatch + "] must be non negative or unset";
1452+
assert (origin == Origin.PRIMARY) || (ifSeqNoMatch == SequenceNumbers.UNASSIGNED_SEQ_NO && ifPrimaryTermMatch == 0) :
1453+
"cas operations are only allowed if origin is primary. get [" + origin + "]";
14311454
this.type = Objects.requireNonNull(type);
14321455
this.id = Objects.requireNonNull(id);
1456+
this.ifSeqNoMatch = ifSeqNoMatch;
1457+
this.ifPrimaryTermMatch = ifPrimaryTermMatch;
14331458
}
14341459

14351460
public Delete(String type, String id, Term uid, long primaryTerm) {
14361461
this(type, id, uid, SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm, Versions.MATCH_ANY, VersionType.INTERNAL,
1437-
Origin.PRIMARY, System.nanoTime());
1462+
Origin.PRIMARY, System.nanoTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
14381463
}
14391464

14401465
public Delete(Delete template, VersionType versionType) {
14411466
this(template.type(), template.id(), template.uid(), template.seqNo(), template.primaryTerm(), template.version(),
1442-
versionType, template.origin(), template.startTime());
1467+
versionType, template.origin(), template.startTime(), SequenceNumbers.UNASSIGNED_SEQ_NO, 0);
14431468
}
14441469

14451470
@Override
@@ -1462,6 +1487,13 @@ public int estimatedSizeInBytes() {
14621487
return (uid().field().length() + uid().text().length()) * 2 + 20;
14631488
}
14641489

1490+
public long getIfSeqNoMatch() {
1491+
return ifSeqNoMatch;
1492+
}
1493+
1494+
public long getIfPrimaryTermMatch() {
1495+
return ifPrimaryTermMatch;
1496+
}
14651497
}
14661498

14671499
public static class NoOp extends Operation {

0 commit comments

Comments
 (0)