Skip to content

Commit a9b12b3

Browse files
authored
Push primary term to replication tracker (#38044)
This commit pushes the primary term into the replication tracker. This is a precursor to using the primary term to resolving ordering problems for retention leases. Namely, it can be that out-of-order retention lease sync requests arrive on a replica. To resolve this, we need a tuple of (primary term, version). For this to be, the primary term needs to be accessible in the replication tracker. As the primary term is part of the replication group anyway, this change conceptually makes sense.
1 parent 622fb78 commit a9b12b3

File tree

6 files changed

+69
-32
lines changed

6 files changed

+69
-32
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.io.stream.Writeable;
3434
import org.elasticsearch.index.IndexSettings;
3535
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
36+
import org.elasticsearch.index.shard.IndexShard;
3637
import org.elasticsearch.index.shard.ReplicationGroup;
3738
import org.elasticsearch.index.shard.ShardId;
3839

@@ -92,6 +93,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
9293
*/
9394
volatile boolean primaryMode;
9495

96+
/**
97+
* The current operation primary term. Management of this value is done through {@link IndexShard} and must only be done when safe. See
98+
* {@link #setOperationPrimaryTerm(long)}.
99+
*/
100+
private volatile long operationPrimaryTerm;
101+
95102
/**
96103
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff}
97104
* and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the
@@ -408,6 +415,25 @@ public boolean isPrimaryMode() {
408415
return primaryMode;
409416
}
410417

418+
/**
419+
* Returns the current operation primary term.
420+
*
421+
* @return the primary term
422+
*/
423+
public long getOperationPrimaryTerm() {
424+
return operationPrimaryTerm;
425+
}
426+
427+
/**
428+
* Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That
429+
* is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance.
430+
*
431+
* @param operationPrimaryTerm the new operation primary term
432+
*/
433+
public void setOperationPrimaryTerm(final long operationPrimaryTerm) {
434+
this.operationPrimaryTerm = operationPrimaryTerm;
435+
}
436+
411437
/**
412438
* Returns whether the replication tracker has relocated away to another shard copy.
413439
*/
@@ -527,13 +553,15 @@ private static long inSyncCheckpointStates(
527553
* @param shardId the shard ID
528554
* @param allocationId the allocation ID
529555
* @param indexSettings the index settings
556+
* @param operationPrimaryTerm the current primary term
530557
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
531558
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
532559
*/
533560
public ReplicationTracker(
534561
final ShardId shardId,
535562
final String allocationId,
536563
final IndexSettings indexSettings,
564+
final long operationPrimaryTerm,
537565
final long globalCheckpoint,
538566
final LongConsumer onGlobalCheckpointUpdated,
539567
final LongSupplier currentTimeMillisSupplier,
@@ -542,6 +570,7 @@ public ReplicationTracker(
542570
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
543571
this.shardAllocationId = allocationId;
544572
this.primaryMode = false;
573+
this.operationPrimaryTerm = operationPrimaryTerm;
545574
this.handoffInProgress = false;
546575
this.appliedClusterStateVersion = -1L;
547576
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
200200
protected volatile ShardRouting shardRouting;
201201
protected volatile IndexShardState state;
202202
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
203-
private volatile long operationPrimaryTerm;
204203
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
205204
final EngineFactory engineFactory;
206205

@@ -307,17 +306,21 @@ public IndexShard(
307306
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
308307
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
309308
final String aId = shardRouting.allocationId().getId();
309+
final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
310+
this.pendingPrimaryTerm = primaryTerm;
310311
this.globalCheckpointListeners =
311312
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
312-
this.replicationTracker =
313+
final ReplicationTracker replicationTracker =
313314
new ReplicationTracker(
314315
shardId,
315316
aId,
316317
indexSettings,
318+
primaryTerm,
317319
UNASSIGNED_SEQ_NO,
318320
globalCheckpointListeners::globalCheckpointUpdated,
319321
threadPool::absoluteTimeInMillis,
320322
retentionLeaseSyncer);
323+
this.replicationTracker = replicationTracker;
321324

322325
// the query cache is a node-level thing, however we want the most popular filters
323326
// to be computed on a per-shard basis
@@ -337,8 +340,6 @@ public boolean shouldCache(Query query) {
337340
}
338341
indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
339342
searcherWrapper = indexSearcherWrapper;
340-
pendingPrimaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
341-
operationPrimaryTerm = pendingPrimaryTerm;
342343
refreshListeners = buildRefreshListeners();
343344
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
344345
persistMetadata(path, indexSettings, shardRouting, null, logger);
@@ -400,7 +401,7 @@ public long getPendingPrimaryTerm() {
400401

401402
/** Returns the primary term that is currently being used to assign to operations */
402403
public long getOperationPrimaryTerm() {
403-
return this.operationPrimaryTerm;
404+
return replicationTracker.getOperationPrimaryTerm();
404405
}
405406

406407
/**
@@ -509,7 +510,7 @@ public void updateShardState(final ShardRouting newRouting,
509510
assert pendingPrimaryTerm == newPrimaryTerm :
510511
"shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" +
511512
", current routing: " + currentRouting + ", new routing: " + newRouting;
512-
assert operationPrimaryTerm == newPrimaryTerm;
513+
assert getOperationPrimaryTerm() == newPrimaryTerm;
513514
try {
514515
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
515516
/*
@@ -705,23 +706,23 @@ public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType
705706
boolean isRetry)
706707
throws IOException {
707708
assert versionType.validateVersionForWrites(version);
708-
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType, ifSeqNo,
709+
return applyIndexOperation(getEngine(), UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, versionType, ifSeqNo,
709710
ifPrimaryTerm, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
710711
}
711712

712713
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, long autoGeneratedTimeStamp,
713714
boolean isRetry, SourceToParse sourceToParse)
714715
throws IOException {
715-
return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, null, UNASSIGNED_SEQ_NO, 0,
716+
return applyIndexOperation(getEngine(), seqNo, getOperationPrimaryTerm(), version, null, UNASSIGNED_SEQ_NO, 0,
716717
autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse);
717718
}
718719

719720
private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version,
720721
@Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
721722
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
722723
SourceToParse sourceToParse) throws IOException {
723-
assert opPrimaryTerm <= this.operationPrimaryTerm: "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
724-
+ "]";
724+
assert opPrimaryTerm <= getOperationPrimaryTerm()
725+
: "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
725726
ensureWriteAllowed(origin);
726727
Engine.Index operation;
727728
try {
@@ -784,13 +785,13 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
784785
}
785786

786787
public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException {
787-
return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
788+
return markSeqNoAsNoop(getEngine(), seqNo, getOperationPrimaryTerm(), reason, Engine.Operation.Origin.REPLICA);
788789
}
789790

790791
private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason,
791792
Engine.Operation.Origin origin) throws IOException {
792-
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
793-
+ "]";
793+
assert opPrimaryTerm <= getOperationPrimaryTerm()
794+
: "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
794795
long startTime = System.nanoTime();
795796
ensureWriteAllowed(origin);
796797
final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason);
@@ -806,31 +807,31 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
806807
}
807808

808809
public Engine.IndexResult getFailedIndexResult(Exception e, long version) {
809-
return new Engine.IndexResult(e, version, operationPrimaryTerm);
810+
return new Engine.IndexResult(e, version, getOperationPrimaryTerm());
810811
}
811812

812813
public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {
813-
return new Engine.DeleteResult(e, version, operationPrimaryTerm);
814+
return new Engine.DeleteResult(e, version, getOperationPrimaryTerm());
814815
}
815816

816817
public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType,
817818
long ifSeqNo, long ifPrimaryTerm)
818819
throws IOException {
819820
assert versionType.validateVersionForWrites(version);
820-
return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
821+
return applyDeleteOperation(getEngine(), UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, type, id, versionType,
821822
ifSeqNo, ifPrimaryTerm, Engine.Operation.Origin.PRIMARY);
822823
}
823824

824825
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id) throws IOException {
825826
return applyDeleteOperation(
826-
getEngine(), seqNo, operationPrimaryTerm, version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA);
827+
getEngine(), seqNo, getOperationPrimaryTerm(), version, type, id, null, UNASSIGNED_SEQ_NO, 0, Engine.Operation.Origin.REPLICA);
827828
}
828829

829830
private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,
830831
@Nullable VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
831832
Engine.Operation.Origin origin) throws IOException {
832-
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
833-
+ "]";
833+
assert opPrimaryTerm <= getOperationPrimaryTerm()
834+
: "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
834835
ensureWriteAllowed(origin);
835836
// When there is a single type, the unique identifier is only composed of the _id,
836837
// so there is no way to differentiate foo#1 from bar#1. This is especially an issue
@@ -846,7 +847,7 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long
846847
return new Engine.DeleteResult(update);
847848
}
848849
} catch (MapperParsingException | IllegalArgumentException | TypeMissingException e) {
849-
return new Engine.DeleteResult(e, version, operationPrimaryTerm, seqNo, false);
850+
return new Engine.DeleteResult(e, version, getOperationPrimaryTerm(), seqNo, false);
850851
}
851852
if (mapperService.resolveDocumentType(type).equals(mapperService.documentMapper().type()) == false) {
852853
// We should never get there due to the fact that we generate mapping updates on deletes,
@@ -1273,7 +1274,7 @@ public void prepareForIndexRecovery() {
12731274
}
12741275

12751276
public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
1276-
getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo);
1277+
getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
12771278
}
12781279

12791280
/**
@@ -2388,7 +2389,7 @@ private EngineConfig newEngineConfig() {
23882389
Collections.singletonList(refreshListeners),
23892390
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
23902391
indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
2391-
() -> operationPrimaryTerm, tombstoneDocSupplier());
2392+
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
23922393
}
23932394

23942395
/**
@@ -2468,7 +2469,7 @@ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
24682469
@Nullable ActionListener<Releasable> combineWithAction) {
24692470
assert Thread.holdsLock(mutex);
24702471
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
2471-
assert operationPrimaryTerm <= pendingPrimaryTerm;
2472+
assert getOperationPrimaryTerm() <= pendingPrimaryTerm;
24722473
final CountDownLatch termUpdated = new CountDownLatch(1);
24732474
asyncBlockOperations(new ActionListener<Releasable>() {
24742475
@Override
@@ -2494,12 +2495,12 @@ private void innerFail(final Exception e) {
24942495
public void onResponse(final Releasable releasable) {
24952496
final RunOnce releaseOnce = new RunOnce(releasable::close);
24962497
try {
2497-
assert operationPrimaryTerm <= pendingPrimaryTerm;
2498+
assert getOperationPrimaryTerm() <= pendingPrimaryTerm;
24982499
termUpdated.await();
24992500
// indexShardOperationPermits doesn't guarantee that async submissions are executed
25002501
// in the order submitted. We need to guard against another term bump
2501-
if (operationPrimaryTerm < newPrimaryTerm) {
2502-
operationPrimaryTerm = newPrimaryTerm;
2502+
if (getOperationPrimaryTerm() < newPrimaryTerm) {
2503+
replicationTracker.setOperationPrimaryTerm(newPrimaryTerm);
25032504
onBlocked.run();
25042505
}
25052506
} catch (final Exception e) {
@@ -2585,14 +2586,14 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
25852586
final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
25862587
@Override
25872588
public void onResponse(final Releasable releasable) {
2588-
if (opPrimaryTerm < operationPrimaryTerm) {
2589+
if (opPrimaryTerm < getOperationPrimaryTerm()) {
25892590
releasable.close();
25902591
final String message = String.format(
25912592
Locale.ROOT,
25922593
"%s operation primary term [%d] is too old (current [%d])",
25932594
shardId,
25942595
opPrimaryTerm,
2595-
operationPrimaryTerm);
2596+
getOperationPrimaryTerm());
25962597
onPermitAcquired.onFailure(new IllegalStateException(message));
25972598
} else {
25982599
assert assertReplicationTarget();
@@ -2653,7 +2654,7 @@ public void onFailure(final Exception e) {
26532654
}
26542655

26552656
private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
2656-
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
2657+
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm());
26572658
}
26582659

26592660
public int getActiveOperationsCount() {

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void testAddOrRenewRetentionLease() {
5353
new ShardId("test", "_na", 0),
5454
allocationId.getId(),
5555
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
56+
randomNonNegativeLong(),
5657
UNASSIGNED_SEQ_NO,
5758
value -> {},
5859
() -> 0L,
@@ -88,6 +89,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() {
8889
new ShardId("test", "_na", 0),
8990
allocationId.getId(),
9091
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
92+
randomNonNegativeLong(),
9193
UNASSIGNED_SEQ_NO,
9294
value -> {},
9395
() -> 0L,
@@ -143,6 +145,7 @@ private void runExpirationTest(final boolean primaryMode) {
143145
new ShardId("test", "_na", 0),
144146
allocationId.getId(),
145147
IndexSettingsModule.newIndexSettings("test", settings),
148+
randomNonNegativeLong(),
146149
UNASSIGNED_SEQ_NO,
147150
value -> {},
148151
currentTimeMillis::get,
@@ -215,6 +218,7 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
215218
new ShardId("test", "_na", 0),
216219
allocationId.getId(),
217220
IndexSettingsModule.newIndexSettings("test", settings),
221+
randomNonNegativeLong(),
218222
UNASSIGNED_SEQ_NO,
219223
value -> {},
220224
currentTimeMillis::get,

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ ReplicationTracker newTracker(
4545
new ShardId("test", "_na_", 0),
4646
allocationId.getId(),
4747
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
48+
randomNonNegativeLong(),
4849
UNASSIGNED_SEQ_NO,
4950
updatedGlobalCheckpoint,
5051
currentTimeMillisSupplier,

0 commit comments

Comments
 (0)