Skip to content

Commit c9413d7

Browse files
committed
Fix InternalEngineTests#assertOpsOnPrimary (#37746)
The assertion `assertOpsOnPrimary` does not store seq_no and primary term of successful deletes to the `lastOpSeqNo` and `lastOpTerm`. This leads to failures of the subsequence CAS deletes or indexes with seq_no and term. Moreover, this assertion trips a translog assertion because it bumps the primary term of some operations but not the primary term of the engine. Relates #36467 Closes #37684
1 parent af134b5 commit c9413d7

File tree

2 files changed

+45
-12
lines changed

2 files changed

+45
-12
lines changed

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1813,10 +1813,13 @@ public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception {
18131813
}
18141814
});
18151815
refreshThread.start();
1816-
latch.await();
1817-
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
1818-
running.set(false);
1819-
refreshThread.join();
1816+
try {
1817+
latch.await();
1818+
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
1819+
} finally {
1820+
running.set(false);
1821+
refreshThread.join();
1822+
}
18201823
}
18211824

18221825
private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
@@ -1826,7 +1829,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18261829
long lastOpVersion = currentOpVersion;
18271830
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
18281831
long lastOpTerm = UNASSIGNED_PRIMARY_TERM;
1829-
final AtomicLong currentTerm = new AtomicLong(1);
1832+
PrimaryTermSupplier currentTerm = (PrimaryTermSupplier) engine.engineConfig.getPrimaryTermSupplier();
18301833
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
18311834
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
18321835
index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0);
@@ -1839,6 +1842,12 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18391842
TriFunction<Long, Long, Engine.Delete, Engine.Delete> delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(),
18401843
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
18411844
delete.startTime(), seqNo, term);
1845+
Function<Engine.Index, Engine.Index> indexWithCurrentTerm = index -> new Engine.Index(index.uid(),
1846+
index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(),
1847+
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
1848+
Function<Engine.Delete, Engine.Delete> deleteWithCurrentTerm = delete -> new Engine.Delete(delete.type(),
1849+
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
1850+
delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm());
18421851
for (Engine.Operation op : ops) {
18431852
final boolean versionConflict = rarely();
18441853
final boolean versionedOp = versionConflict || randomBoolean();
@@ -1850,7 +1859,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18501859
lastOpSeqNo;
18511860
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
18521861
if (rarely()) {
1853-
currentTerm.incrementAndGet();
1862+
currentTerm.set(currentTerm.get() + 1L);
1863+
engine.rollTranslogGeneration();
18541864
}
18551865
final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
18561866
logger.info("performing [{}]{}{}",
@@ -1881,7 +1891,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18811891
result = engine.index(indexWithVersion.apply(correctVersion, index));
18821892
}
18831893
} else {
1884-
result = engine.index(index);
1894+
result = engine.index(indexWithCurrentTerm.apply(index));
18851895
}
18861896
assertThat(result.isCreated(), equalTo(docDeleted));
18871897
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
@@ -1915,16 +1925,16 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
19151925
} else if (versionedOp) {
19161926
result = engine.delete(delWithVersion.apply(correctVersion, delete));
19171927
} else {
1918-
result = engine.delete(delete);
1928+
result = engine.delete(deleteWithCurrentTerm.apply(delete));
19191929
}
19201930
assertThat(result.isFound(), equalTo(docDeleted == false));
19211931
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
19221932
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
19231933
assertThat(result.getFailure(), nullValue());
19241934
docDeleted = true;
19251935
lastOpVersion = result.getVersion();
1926-
lastOpSeqNo = UNASSIGNED_SEQ_NO;
1927-
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
1936+
lastOpSeqNo = result.getSeqNo();
1937+
lastOpTerm = result.getTerm();
19281938
opsPerformed++;
19291939
}
19301940
}
@@ -1952,6 +1962,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
19521962
engine.clearDeletedTombstones();
19531963
if (docDeleted) {
19541964
lastOpVersion = Versions.NOT_FOUND;
1965+
lastOpSeqNo = UNASSIGNED_SEQ_NO;
1966+
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
19551967
}
19561968
}
19571969
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public abstract class EngineTestCase extends ESTestCase {
149149
protected Path primaryTranslogDir;
150150
protected Path replicaTranslogDir;
151151
// A default primary term is used by engine instances created in this test.
152-
protected AtomicLong primaryTerm = new AtomicLong();
152+
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);
153153

154154
protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
155155
assertVisibleCount(engine, numDocs, true);
@@ -688,7 +688,7 @@ public EngineConfig config(
688688
breakerService,
689689
globalCheckpointSupplier,
690690
retentionLeasesSupplier,
691-
primaryTerm::get,
691+
primaryTerm,
692692
tombstoneDocSupplier());
693693
}
694694

@@ -1100,4 +1100,25 @@ public static Translog getTranslog(Engine engine) {
11001100
InternalEngine internalEngine = (InternalEngine) engine;
11011101
return internalEngine.getTranslog();
11021102
}
1103+
1104+
public static final class PrimaryTermSupplier implements LongSupplier {
1105+
private final AtomicLong term;
1106+
1107+
PrimaryTermSupplier(long initialTerm) {
1108+
this.term = new AtomicLong(initialTerm);
1109+
}
1110+
1111+
public long get() {
1112+
return term.get();
1113+
}
1114+
1115+
public void set(long newTerm) {
1116+
this.term.set(newTerm);
1117+
}
1118+
1119+
@Override
1120+
public long getAsLong() {
1121+
return get();
1122+
}
1123+
}
11031124
}

0 commit comments

Comments
 (0)