Skip to content

Commit 86f868d

Browse files
committed
Fix InternalEngineTests#assertOpsOnPrimary
The assertion `assertOpsOnPrimary` does not store seq_no and primary term of successful deletes to the `lastOpSeqNo` and `lastOpTerm`. This leads to failures of the subsequence CAS deletes or indexes with seq_no and term. Moreover, this assertion trips a translog assertion because it bumps the primary term of some operations but not the primary term of the engine. Relates elastic#36467 Closes elastic#37684
1 parent 3f27233 commit 86f868d

File tree

2 files changed

+45
-12
lines changed

2 files changed

+45
-12
lines changed

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1792,10 +1792,13 @@ public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception {
17921792
}
17931793
});
17941794
refreshThread.start();
1795-
latch.await();
1796-
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
1797-
running.set(false);
1798-
refreshThread.join();
1795+
try {
1796+
latch.await();
1797+
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
1798+
} finally {
1799+
running.set(false);
1800+
refreshThread.join();
1801+
}
17991802
}
18001803

18011804
private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
@@ -1805,7 +1808,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18051808
long lastOpVersion = currentOpVersion;
18061809
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
18071810
long lastOpTerm = UNASSIGNED_PRIMARY_TERM;
1808-
final AtomicLong currentTerm = new AtomicLong(1);
1811+
PrimaryTermSupplier currentTerm = (PrimaryTermSupplier) engine.engineConfig.getPrimaryTermSupplier();
18091812
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
18101813
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
18111814
index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0);
@@ -1818,6 +1821,12 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18181821
TriFunction<Long, Long, Engine.Delete, Engine.Delete> delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(),
18191822
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
18201823
delete.startTime(), seqNo, term);
1824+
Function<Engine.Index, Engine.Index> indexWithCurrentTerm = index -> new Engine.Index(index.uid(),
1825+
index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(),
1826+
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
1827+
Function<Engine.Delete, Engine.Delete> deleteWithCurrentTerm = delete -> new Engine.Delete(delete.type(),
1828+
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
1829+
delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm());
18211830
for (Engine.Operation op : ops) {
18221831
final boolean versionConflict = rarely();
18231832
final boolean versionedOp = versionConflict || randomBoolean();
@@ -1829,7 +1838,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18291838
lastOpSeqNo;
18301839
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
18311840
if (rarely()) {
1832-
currentTerm.incrementAndGet();
1841+
currentTerm.set(currentTerm.get() + 1L);
1842+
engine.rollTranslogGeneration();
18331843
}
18341844
final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
18351845
logger.info("performing [{}]{}{}",
@@ -1860,7 +1870,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18601870
result = engine.index(indexWithVersion.apply(correctVersion, index));
18611871
}
18621872
} else {
1863-
result = engine.index(index);
1873+
result = engine.index(indexWithCurrentTerm.apply(index));
18641874
}
18651875
assertThat(result.isCreated(), equalTo(docDeleted));
18661876
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
@@ -1894,16 +1904,16 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
18941904
} else if (versionedOp) {
18951905
result = engine.delete(delWithVersion.apply(correctVersion, delete));
18961906
} else {
1897-
result = engine.delete(delete);
1907+
result = engine.delete(deleteWithCurrentTerm.apply(delete));
18981908
}
18991909
assertThat(result.isFound(), equalTo(docDeleted == false));
19001910
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
19011911
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
19021912
assertThat(result.getFailure(), nullValue());
19031913
docDeleted = true;
19041914
lastOpVersion = result.getVersion();
1905-
lastOpSeqNo = UNASSIGNED_SEQ_NO;
1906-
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
1915+
lastOpSeqNo = result.getSeqNo();
1916+
lastOpTerm = result.getTerm();
19071917
opsPerformed++;
19081918
}
19091919
}
@@ -1931,6 +1941,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
19311941
engine.clearDeletedTombstones();
19321942
if (docDeleted) {
19331943
lastOpVersion = Versions.NOT_FOUND;
1944+
lastOpSeqNo = UNASSIGNED_SEQ_NO;
1945+
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
19341946
}
19351947
}
19361948
}

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public abstract class EngineTestCase extends ESTestCase {
149149
protected Path primaryTranslogDir;
150150
protected Path replicaTranslogDir;
151151
// A default primary term is used by engine instances created in this test.
152-
protected AtomicLong primaryTerm = new AtomicLong();
152+
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);
153153

154154
protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
155155
assertVisibleCount(engine, numDocs, true);
@@ -682,7 +682,7 @@ public EngineConfig config(
682682
breakerService,
683683
globalCheckpointSupplier,
684684
retentionLeasesSupplier,
685-
primaryTerm::get,
685+
primaryTerm,
686686
tombstoneDocSupplier());
687687
}
688688

@@ -1081,4 +1081,25 @@ public static Translog getTranslog(Engine engine) {
10811081
InternalEngine internalEngine = (InternalEngine) engine;
10821082
return internalEngine.getTranslog();
10831083
}
1084+
1085+
public static final class PrimaryTermSupplier implements LongSupplier {
1086+
private final AtomicLong term;
1087+
1088+
PrimaryTermSupplier(long initialTerm) {
1089+
this.term = new AtomicLong(initialTerm);
1090+
}
1091+
1092+
public long get() {
1093+
return term.get();
1094+
}
1095+
1096+
public void set(long newTerm) {
1097+
this.term.set(newTerm);
1098+
}
1099+
1100+
@Override
1101+
public long getAsLong() {
1102+
return get();
1103+
}
1104+
}
10841105
}

0 commit comments

Comments
 (0)