Skip to content

Fix InternalEngineTests#assertOpsOnPrimary #37746

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1792,10 +1792,13 @@ public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception {
}
});
refreshThread.start();
latch.await();
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
running.set(false);
refreshThread.join();
try {
latch.await();
assertOpsOnPrimary(ops, Versions.NOT_FOUND, true, engine);
} finally {
running.set(false);
refreshThread.join();
}
}

private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion, boolean docDeleted, InternalEngine engine)
Expand All @@ -1805,7 +1808,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
long lastOpVersion = currentOpVersion;
long lastOpSeqNo = UNASSIGNED_SEQ_NO;
long lastOpTerm = UNASSIGNED_PRIMARY_TERM;
final AtomicLong currentTerm = new AtomicLong(1);
PrimaryTermSupplier currentTerm = (PrimaryTermSupplier) engine.engineConfig.getPrimaryTermSupplier();
BiFunction<Long, Engine.Index, Engine.Index> indexWithVersion = (version, index) -> new Engine.Index(index.uid(), index.parsedDoc(),
UNASSIGNED_SEQ_NO, currentTerm.get(), version, index.versionType(), index.origin(), index.startTime(),
index.getAutoGeneratedIdTimestamp(), index.isRetry(), UNASSIGNED_SEQ_NO, 0);
Expand All @@ -1818,6 +1821,12 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
TriFunction<Long, Long, Engine.Delete, Engine.Delete> delWithSeq = (seqNo, term, delete) -> new Engine.Delete(delete.type(),
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
delete.startTime(), seqNo, term);
Function<Engine.Index, Engine.Index> indexWithCurrentTerm = index -> new Engine.Index(index.uid(),
index.parsedDoc(), UNASSIGNED_SEQ_NO, currentTerm.get(), index.version(), index.versionType(), index.origin(),
index.startTime(), index.getAutoGeneratedIdTimestamp(), index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());
Function<Engine.Delete, Engine.Delete> deleteWithCurrentTerm = delete -> new Engine.Delete(delete.type(),
delete.id(), delete.uid(), UNASSIGNED_SEQ_NO, currentTerm.get(), delete.version(), delete.versionType(), delete.origin(),
delete.startTime(), delete.getIfSeqNo(), delete.getIfPrimaryTerm());
for (Engine.Operation op : ops) {
final boolean versionConflict = rarely();
final boolean versionedOp = versionConflict || randomBoolean();
Expand All @@ -1829,7 +1838,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
lastOpSeqNo;
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean() ? lastOpTerm + 1 : lastOpTerm;
if (rarely()) {
currentTerm.incrementAndGet();
currentTerm.set(currentTerm.get() + 1L);
engine.rollTranslogGeneration();
}
final long correctVersion = docDeleted && randomBoolean() ? Versions.MATCH_DELETED : lastOpVersion;
logger.info("performing [{}]{}{}",
Expand Down Expand Up @@ -1860,7 +1870,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
result = engine.index(indexWithVersion.apply(correctVersion, index));
}
} else {
result = engine.index(index);
result = engine.index(indexWithCurrentTerm.apply(index));
}
assertThat(result.isCreated(), equalTo(docDeleted));
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
Expand Down Expand Up @@ -1894,16 +1904,16 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
} else if (versionedOp) {
result = engine.delete(delWithVersion.apply(correctVersion, delete));
} else {
result = engine.delete(delete);
result = engine.delete(deleteWithCurrentTerm.apply(delete));
}
assertThat(result.isFound(), equalTo(docDeleted == false));
assertThat(result.getVersion(), equalTo(Math.max(lastOpVersion + 1, 1)));
assertThat(result.getResultType(), equalTo(Engine.Result.Type.SUCCESS));
assertThat(result.getFailure(), nullValue());
docDeleted = true;
lastOpVersion = result.getVersion();
lastOpSeqNo = UNASSIGNED_SEQ_NO;
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
lastOpSeqNo = result.getSeqNo();
lastOpTerm = result.getTerm();
opsPerformed++;
}
}
Expand Down Expand Up @@ -1931,6 +1941,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
engine.clearDeletedTombstones();
if (docDeleted) {
lastOpVersion = Versions.NOT_FOUND;
lastOpSeqNo = UNASSIGNED_SEQ_NO;
lastOpTerm = UNASSIGNED_PRIMARY_TERM;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public abstract class EngineTestCase extends ESTestCase {
protected Path primaryTranslogDir;
protected Path replicaTranslogDir;
// A default primary term is used by engine instances created in this test.
protected AtomicLong primaryTerm = new AtomicLong();
protected final PrimaryTermSupplier primaryTerm = new PrimaryTermSupplier(0L);

protected static void assertVisibleCount(Engine engine, int numDocs) throws IOException {
assertVisibleCount(engine, numDocs, true);
Expand Down Expand Up @@ -682,7 +682,7 @@ public EngineConfig config(
breakerService,
globalCheckpointSupplier,
retentionLeasesSupplier,
primaryTerm::get,
primaryTerm,
tombstoneDocSupplier());
}

Expand Down Expand Up @@ -1081,4 +1081,25 @@ public static Translog getTranslog(Engine engine) {
InternalEngine internalEngine = (InternalEngine) engine;
return internalEngine.getTranslog();
}

public static final class PrimaryTermSupplier implements LongSupplier {
private final AtomicLong term;

PrimaryTermSupplier(long initialTerm) {
this.term = new AtomicLong(initialTerm);
}

public long get() {
return term.get();
}

public void set(long newTerm) {
this.term.set(newTerm);
}

@Override
public long getAsLong() {
return get();
}
}
}