Skip to content

Commit d167eaf

Browse files
dnhatntalevy
authored andcommitted
Fix InternalEngineTests#assertOpsOnPrimary (elastic#37746)
The assertion `assertOpsOnPrimary` does not store seq_no and primary term of successful deletes to the `lastOpSeqNo` and `lastOpTerm`. This leads to failures of the subsequence CAS deletes or indexes with seq_no and term. Moreover, this assertion trips a translog assertion because it bumps the primary term of some operations but not the primary term of the engine. Relates elastic#36467 Closes elastic#37684
1 parent 22ccd7a commit d167eaf

File tree

2 files changed

+45
-12
lines changed

2 files changed

+45
-12
lines changed

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1811,10 +1811,13 @@ public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception {
18111811
}
18121812
});
18131813
refreshThread.start();
1814-
latch.await();
1815-
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
1816-
running.set(false);
1817-
refreshThread.join();
1814+
try {
1815+
latch.await();
1816+
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
1817+
} finally {
1818+
running.set(false);
1819+
refreshThread.join();
1820+
}
18181821
}
18191822

18201823
private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
@@ -1824,7 +1827,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18241827
long lastOpVersion = currentOpVersion;
18251828
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
18261829
long lastOpTerm = UNASSIGNED_PRIMARY_TERM;
1827-
final AtomicLong currentTerm = new AtomicLong(1);
1830+
PrimaryTermSupplier currentTerm = (PrimaryTermSupplier) engine.engineConfig.getPrimaryTermSupplier();
18281831
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
18291832
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
18301833
index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0);
@@ -1837,6 +1840,12 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18371840
TriFunction<Long, Long, Engine.Delete, Engine.Delete> delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(),
18381841
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
18391842
delete.startTime(), seqNo, term);
1843+
Function<Engine.Index, Engine.Index> indexWithCurrentTerm = index -> new Engine.Index(index.uid(),
1844+
index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(),
1845+
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
1846+
Function<Engine.Delete, Engine.Delete> deleteWithCurrentTerm = delete -> new Engine.Delete(delete.type(),
1847+
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
1848+
delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm());
18401849
for (Engine.Operation op : ops) {
18411850
final boolean versionConflict = rarely();
18421851
final boolean versionedOp = versionConflict || randomBoolean();
@@ -1848,7 +1857,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18481857
lastOpSeqNo;
18491858
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
18501859
if (rarely()) {
1851-
currentTerm.incrementAndGet();
1860+
currentTerm.set(currentTerm.get() + 1L);
1861+
engine.rollTranslogGeneration();
18521862
}
18531863
final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
18541864
logger.info("performing [{}]{}{}",
@@ -1879,7 +1889,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18791889
result = engine.index(indexWithVersion.apply(correctVersion, index));
18801890
}
18811891
} else {
1882-
result = engine.index(index);
1892+
result = engine.index(indexWithCurrentTerm.apply(index));
18831893
}
18841894
assertThat(result.isCreated(), equalTo(docDeleted));
18851895
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
@@ -1913,16 +1923,16 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
19131923
} else if (versionedOp) {
19141924
result = engine.delete(delWithVersion.apply(correctVersion, delete));
19151925
} else {
1916-
result = engine.delete(delete);
1926+
result = engine.delete(deleteWithCurrentTerm.apply(delete));
19171927
}
19181928
assertThat(result.isFound(), equalTo(docDeleted == false));
19191929
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
19201930
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
19211931
assertThat(result.getFailure(), nullValue());
19221932
docDeleted = true;
19231933
lastOpVersion = result.getVersion();
1924-
lastOpSeqNo = UNASSIGNED_SEQ_NO;
1925-
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
1934+
lastOpSeqNo = result.getSeqNo();
1935+
lastOpTerm = result.getTerm();
19261936
opsPerformed++;
19271937
}
19281938
}
@@ -1950,6 +1960,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
19501960
engine.clearDeletedTombstones();
19511961
if (docDeleted) {
19521962
lastOpVersion = Versions.NOT_FOUND;
1963+
lastOpSeqNo = UNASSIGNED_SEQ_NO;
1964+
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
19531965
}
19541966
}
19551967
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public abstract class EngineTestCase extends ESTestCase {
146146
protected Path primaryTranslogDir;
147147
protected Path replicaTranslogDir;
148148
// A default primary term is used by engine instances created in this test.
149-
protected AtomicLong primaryTerm = new AtomicLong();
149+
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);
150150

151151
protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
152152
assertVisibleCount(engine, numDocs, true);
@@ -601,7 +601,7 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
601601
breakerService,
602602
globalCheckpointSupplier == null ?
603603
new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :
604-
globalCheckpointSupplier, primaryTerm::get, tombstoneDocSupplier());
604+
globalCheckpointSupplier, primaryTerm, tombstoneDocSupplier());
605605
return config;
606606
}
607607

@@ -1013,4 +1013,25 @@ public static Translog getTranslog(Engine engine) {
10131013
InternalEngine internalEngine = (InternalEngine) engine;
10141014
return internalEngine.getTranslog();
10151015
}
1016+
1017+
public static final class PrimaryTermSupplier implements LongSupplier {
1018+
private final AtomicLong term;
1019+
1020+
PrimaryTermSupplier(long initialTerm) {
1021+
this.term = new AtomicLong(initialTerm);
1022+
}
1023+
1024+
public long get() {
1025+
return term.get();
1026+
}
1027+
1028+
public void set(long newTerm) {
1029+
this.term.set(newTerm);
1030+
}
1031+
1032+
@Override
1033+
public long getAsLong() {
1034+
return get();
1035+
}
1036+
}
10161037
}

0 commit comments

Comments
 (0)