Commit 0ac89a3
Do not optimize append-only if seen normal op with higher seqno (#28787)
When processing an append-only operation, the primary knows that the operation can only conflict with another instance of the same operation, because its ID was freshly generated. That property does not hold on replicas: as soon as a document with an auto-generated ID has been indexed into the primary, it can be exposed to a search, and users can issue a follow-up operation on it. In extremely rare cases, the follow-up operation can arrive and be processed on a replica before the original append-only request. When that happens, we cannot simply proceed with the append-only request and blindly add it to the index without consulting the version map. The following scenario can cause a difference between primary and replica:

1. The primary indexes a doc with an auto-generated ID (id=X, v=1, s#=20).
2. A refresh cycle happens on the primary.
3. The new doc is picked up and modified, say by a delete-by-query request; the primary gets a delete doc (id=X, v=2, s#=30).
4. The delete doc is processed first on the replica (id=X, v=2, s#=30).
5. The indexing operation arrives on the replica; since it is an auto-generated-ID request and the retry marker is lower, we put it into Lucene without any check. The replica now has a doc the primary doesn't have.

To detect a potential conflict between an append-only operation and a normal operation on replicas, we rely on sequence numbers. This commit maintains the maximum seq# of non-append-only operations seen on the replica, and applies the optimization to an append-only operation only if its seq# is higher than the seq# of every non-append-only operation seen so far.
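For illustration, here is a minimal, self-contained sketch of the guard this commit adds. The names here (ReplicaAppendOnlyGuard and its methods) are invented for the example and are not the actual Elasticsearch API:

import java.util.concurrent.atomic.AtomicLong;

class ReplicaAppendOnlyGuard {
    private static final long NO_OPS_PERFORMED = -1L;
    // highest seq# of any non-append-only operation seen on this replica
    private final AtomicLong maxSeqNoOfNonAppendOnly = new AtomicLong(NO_OPS_PERFORMED);

    /** Record a normal (non-append-only) index or delete operation. */
    void onNonAppendOnly(long seqNo) {
        maxSeqNoOfNonAppendOnly.updateAndGet(curr -> Math.max(seqNo, curr));
    }

    /**
     * An append-only op may skip the version-map lookup only if it cannot have
     * been superseded by a normal op, i.e. its seq# is above the watermark.
     */
    boolean canOptimize(long appendOnlySeqNo, boolean mayHaveBeenIndexedBefore) {
        return mayHaveBeenIndexedBefore == false
            && appendOnlySeqNo > maxSeqNoOfNonAppendOnly.get();
    }
}

In the scenario above, the delete (s#=30) raises the watermark before the late append-only request (s#=20) arrives, so canOptimize returns false and the replica consults the version map, staying consistent with the primary.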
1 parent 7bf9091 commit 0ac89a3

File tree

3 files changed (+150, -9)

server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 36 additions & 9 deletions
@@ -136,6 +136,7 @@ public class InternalEngine extends Engine {
     private final AtomicBoolean pendingTranslogRecovery = new AtomicBoolean(false);
     public static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID = "max_unsafe_auto_id_timestamp";
     private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
+    private final AtomicLong maxSeqNoOfNonAppendOnlyOperations = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
     private final CounterMetric numVersionLookups = new CounterMetric();
     private final CounterMetric numIndexVersionsLookups = new CounterMetric();

     /**
@@ -186,7 +187,7 @@ public InternalEngine(EngineConfig engineConfig) {
             this.combinedDeletionPolicy = new CombinedDeletionPolicy(logger, translogDeletionPolicy,
                 translog::getLastSyncedGlobalCheckpoint, startingCommit);
             writer = createWriter(startingCommit);
-            updateMaxUnsafeAutoIdTimestampFromWriter(writer);
+            bootstrapAppendOnlyInfoFromWriter(writer);
             historyUUID = loadOrGenerateHistoryUUID(writer);
             Objects.requireNonNull(historyUUID, "history uuid should not be null");
             indexWriter = writer;
@@ -345,15 +346,20 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
         }
     }

-    private void updateMaxUnsafeAutoIdTimestampFromWriter(IndexWriter writer) {
-        long commitMaxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
+    private void bootstrapAppendOnlyInfoFromWriter(IndexWriter writer) {
         for (Map.Entry<String, String> entry : writer.getLiveCommitData()) {
-            if (entry.getKey().equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
-                commitMaxUnsafeAutoIdTimestamp = Long.parseLong(entry.getValue());
-                break;
+            final String key = entry.getKey();
+            if (key.equals(MAX_UNSAFE_AUTO_ID_TIMESTAMP_COMMIT_ID)) {
+                assert maxUnsafeAutoIdTimestamp.get() == -1 :
+                    "max unsafe timestamp was assigned already [" + maxUnsafeAutoIdTimestamp.get() + "]";
+                maxUnsafeAutoIdTimestamp.set(Long.parseLong(entry.getValue()));
+            }
+            if (key.equals(SequenceNumbers.MAX_SEQ_NO)) {
+                assert maxSeqNoOfNonAppendOnlyOperations.get() == -1 :
+                    "max unsafe append-only seq# was assigned already [" + maxSeqNoOfNonAppendOnlyOperations.get() + "]";
+                maxSeqNoOfNonAppendOnlyOperations.set(Long.parseLong(entry.getValue()));
             }
         }
-        maxUnsafeAutoIdTimestamp.set(Math.max(maxUnsafeAutoIdTimestamp.get(), commitMaxUnsafeAutoIdTimestamp));
     }

     @Override
@@ -803,11 +809,24 @@ public IndexResult index(Index index) throws IOException {

     private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOException {
         final IndexingStrategy plan;
-        if (canOptimizeAddDocument(index) && mayHaveBeenIndexedBefore(index) == false) {
-            // no need to deal with out of order delivery - we never saw this one
+        final boolean appendOnlyRequest = canOptimizeAddDocument(index);
+        if (appendOnlyRequest && mayHaveBeenIndexedBefore(index) == false && index.seqNo() > maxSeqNoOfNonAppendOnlyOperations.get()) {
+            /*
+             * As soon as an append-only request is indexed into the primary, it can be exposed to a search and users can issue
+             * a follow-up operation on it. In rare cases, the follow-up operation can arrive and be processed on a replica
+             * before the original append-only request. In this case we can't simply proceed with the append-only request
+             * without consulting the version map. If a replica has seen a non-append-only operation with a seqno higher than
+             * the seqno of an append-only request, it may have seen the document of that append-only request. However, if the
+             * seqno of an append-only request is higher than the seqno of every non-append-only request, the replica cannot
+             * have seen the document of that append-only request, so we can apply the optimization.
+             */
             assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
             plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
         } else {
+            if (appendOnlyRequest == false) {
+                maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(index.seqNo(), curr));
+                assert maxSeqNoOfNonAppendOnlyOperations.get() >= index.seqNo() : "max_seqno of non-append-only was not updated; " +
+                    "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of index [" + index.seqNo() + "]";
+            }
             versionMap.enforceSafeAccess();
             // drop out of order operations
             assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
@@ -942,6 +961,11 @@ private boolean mayHaveBeenIndexedBefore(Index index) {
         return mayHaveBeenIndexBefore;
     }

+    // for testing
+    long getMaxSeqNoOfNonAppendOnlyOperations() {
+        return maxSeqNoOfNonAppendOnlyOperations.get();
+    }
+
     private static void index(final List<ParseContext.Document> docs, final IndexWriter indexWriter) throws IOException {
         if (docs.size() > 1) {
             indexWriter.addDocuments(docs);
@@ -1097,6 +1121,9 @@ private DeletionStrategy planDeletionAsNonPrimary(Delete delete) throws IOException {
         assert delete.versionType().versionTypeForReplicationAndRecovery() == delete.versionType() :
             "resolving out of order delivery based on versioning but version type isn't fit for it. got ["
                 + delete.versionType() + "]";
+        maxSeqNoOfNonAppendOnlyOperations.updateAndGet(curr -> Math.max(delete.seqNo(), curr));
+        assert maxSeqNoOfNonAppendOnlyOperations.get() >= delete.seqNo() : "max_seqno of non-append-only was not updated; " +
+            "max_seqno non-append-only [" + maxSeqNoOfNonAppendOnlyOperations.get() + "], seqno of delete [" + delete.seqNo() + "]";
         // unlike the primary, replicas don't really care about the found status of documents
         // this allows to ignore the case where a document was found in the live version maps in
         // a delete state and return true for the found flag in favor of code simplicity
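
To make the check concrete, here is a hypothetical walk-through of steps 4 and 5 of the scenario, using the same updateAndGet/Math.max idiom as the diff above. The seq# values come from the commit message; the class name is illustrative only:

import java.util.concurrent.atomic.AtomicLong;

class OutOfOrderScenario {
    public static void main(String[] args) {
        AtomicLong watermark = new AtomicLong(-1L); // NO_OPS_PERFORMED

        // Step 4 on the replica: the delete for id=X (v=2, s#=30) arrives first
        // and, being a normal operation, raises the watermark.
        watermark.updateAndGet(curr -> Math.max(30L, curr));

        // Step 5 on the replica: the original append-only for id=X (v=1, s#=20)
        // arrives late. Its seq# is not above the watermark, so the optimization
        // is skipped and the operation goes through the version map instead.
        long appendOnlySeqNo = 20L;
        boolean optimize = appendOnlySeqNo > watermark.get();
        System.out.println("optimize append-only? " + optimize); // prints false
    }
}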

server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 92 additions & 0 deletions
@@ -4530,4 +4530,96 @@ public void testPruneOnlyDeletesAtMostLocalCheckpoint() throws Exception {
         }
     }

+    public void testTrackMaxSeqNoOfNonAppendOnlyOperations() throws Exception {
+        IOUtils.close(engine, store);
+        store = createStore();
+        final Path translogPath = createTempDir();
+        final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.NO_OPS_PERFORMED);
+        try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
+            final CountDownLatch latch = new CountDownLatch(1);
+            final Thread appendOnlyIndexer = new Thread(() -> {
+                try {
+                    latch.countDown();
+                    final int numDocs = scaledRandomIntBetween(100, 1000);
+                    for (int i = 0; i < numDocs; i++) {
+                        ParsedDocument doc = testParsedDocument("append-only" + i, null, testDocumentWithTextField(), SOURCE, null);
+                        if (randomBoolean()) {
+                            engine.index(appendOnlyReplica(doc, randomBoolean(), 1, engine.getLocalCheckpointTracker().generateSeqNo()));
+                        } else {
+                            engine.index(appendOnlyPrimary(doc, randomBoolean(), randomNonNegativeLong()));
+                        }
+                    }
+                } catch (Exception ex) {
+                    throw new RuntimeException("Failed to index", ex);
+                }
+            });
+            appendOnlyIndexer.setName("append-only indexer");
+            appendOnlyIndexer.start();
+            latch.await();
+            long maxSeqNoOfNonAppendOnly = SequenceNumbers.NO_OPS_PERFORMED;
+            final int numOps = scaledRandomIntBetween(100, 1000);
+            for (int i = 0; i < numOps; i++) {
+                ParsedDocument parsedDocument = testParsedDocument(Integer.toString(i), null, testDocumentWithTextField(), SOURCE, null);
+                if (randomBoolean()) { // on replica - updates max_seqno of non-append-only operations
+                    final long seqno = engine.getLocalCheckpointTracker().generateSeqNo();
+                    final Engine.Index doc = replicaIndexForDoc(parsedDocument, 1, seqno, randomBoolean());
+                    if (randomBoolean()) {
+                        engine.index(doc);
+                    } else {
+                        engine.delete(new Engine.Delete(doc.type(), doc.id(), doc.uid(), seqno, doc.primaryTerm(),
+                            doc.version(), doc.versionType(), doc.origin(), threadPool.relativeTimeInMillis()));
+                    }
+                    maxSeqNoOfNonAppendOnly = seqno;
+                } else { // on primary - does not update max_seqno of non-append-only operations
+                    if (randomBoolean()) {
+                        engine.index(indexForDoc(parsedDocument));
+                    } else {
+                        engine.delete(new Engine.Delete(parsedDocument.type(), parsedDocument.id(), newUid(parsedDocument.id())));
+                    }
+                }
+            }
+            appendOnlyIndexer.join(120_000);
+            assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(maxSeqNoOfNonAppendOnly));
+            globalCheckpoint.set(engine.getLocalCheckpointTracker().getCheckpoint());
+            engine.syncTranslog();
+            engine.flush();
+        }
+        try (InternalEngine engine = createEngine(store, translogPath, globalCheckpoint::get)) {
+            assertThat("max_seqno of non-append-only operations was not bootstrapped from the safe commit",
+                engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(globalCheckpoint.get()));
+        }
+    }
+
+    public void testSkipOptimizeForExposedAppendOnlyOperations() throws Exception {
+        long lookupTimes = 0L;
+        final LocalCheckpointTracker localCheckpointTracker = engine.getLocalCheckpointTracker();
+        final int initDocs = between(0, 10);
+        for (int i = 0; i < initDocs; i++) {
+            index(engine, i);
+            lookupTimes++;
+        }
+        // doc1 is delayed and arrives after a non-append-only op.
+        final long seqNoAppendOnly1 = localCheckpointTracker.generateSeqNo();
+        final long seqnoNormalOp = localCheckpointTracker.generateSeqNo();
+        if (randomBoolean()) {
+            engine.index(replicaIndexForDoc(
+                testParsedDocument("d", null, testDocumentWithTextField(), SOURCE, null), 1, seqnoNormalOp, false));
+        } else {
+            engine.delete(replicaDeleteForDoc("d", 1, seqnoNormalOp, randomNonNegativeLong()));
+        }
+        lookupTimes++;
+        assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
+        assertThat(engine.getMaxSeqNoOfNonAppendOnlyOperations(), equalTo(seqnoNormalOp));
+
+        // should not optimize for doc1; process it as a regular doc (i.e. look it up in the version map)
+        engine.index(appendOnlyReplica(testParsedDocument("append-only-1", null, testDocumentWithTextField(), SOURCE, null),
+            false, randomNonNegativeLong(), seqNoAppendOnly1));
+        lookupTimes++;
+        assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
+
+        // optimize for append-only-2 (its seqno > max_seqno of non-append-only) - no version map lookup.
+        engine.index(appendOnlyReplica(testParsedDocument("append-only-2", null, testDocumentWithTextField(), SOURCE, null),
+            false, randomNonNegativeLong(), localCheckpointTracker.generateSeqNo()));
+        assertThat(engine.getNumVersionLookups(), equalTo(lookupTimes));
+    }
 }
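
An aside on the concurrency in testTrackMaxSeqNoOfNonAppendOnlyOperations: the watermark is updated via AtomicLong.updateAndGet with Math.max, which retries its compare-and-set loop until it succeeds, so concurrent writers can only ever move the value upward. A small standalone demo of that property (not Elasticsearch code):

import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.LongStream;

class MonotonicMaxDemo {
    public static void main(String[] args) {
        AtomicLong max = new AtomicLong(Long.MIN_VALUE);
        // many threads racing to record seqnos; the final value is the true maximum
        LongStream.range(0, 10_000).parallel()
            .forEach(seqNo -> max.updateAndGet(curr -> Math.max(seqNo, curr)));
        System.out.println(max.get()); // 9999
    }
}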

server/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

Lines changed: 22 additions & 0 deletions
@@ -32,6 +32,7 @@
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.iterable.Iterables;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.engine.Engine;
@@ -419,6 +420,27 @@ private void updateGCDeleteCycle(IndexShard shard, TimeValue interval) {
         shard.onSettingsChanged();
     }

+    /**
+     * This test ensures consistency between primary and replica when a non-append-only operation (e.g. an index request
+     * with an id, or a delete) on the same document is processed before the original append-only request on replicas.
+     * The append-only document can be exposed and deleted on the primary before it is added to the replica. Replicas
+     * should treat a late append-only request as a regular index request.
+     */
+    public void testOutOfOrderDeliveryForAppendOnlyOperations() throws Exception {
+        try (ReplicationGroup shards = createGroup(1)) {
+            shards.startAll();
+            final IndexShard primary = shards.getPrimary();
+            final IndexShard replica = shards.getReplicas().get(0);
+            // Append-only request - without id
+            final BulkShardRequest indexRequest = indexOnPrimary(
+                new IndexRequest(index.getName(), "type", null).source("{}", XContentType.JSON), primary);
+            final String docId = Iterables.get(getShardDocUIDs(primary), 0);
+            final BulkShardRequest deleteRequest = deleteOnPrimary(new DeleteRequest(index.getName(), "type", docId), primary);
+            deleteOnReplica(deleteRequest, shards, replica);
+            indexOnReplica(indexRequest, shards, replica);
+            shards.assertAllEqual(0);
+        }
+    }
+
     /** Throws <code>documentFailure</code> on every indexing operation */
     static class ThrowingDocumentFailureEngineFactory implements EngineFactory {
         final String documentFailureMessage;