Skip to content

Commit 8073997

Browse files
committed
Push primary term to replication tracker (#38044)
This commit pushes the primary term into the replication tracker. This is a precursor to using the primary term to resolving ordering problems for retention leases. Namely, it can be that out-of-order retention lease sync requests arrive on a replica. To resolve this, we need a tuple of (primary term, version). For this to be, the primary term needs to be accessible in the replication tracker. As the primary term is part of the replication group anyway, this change conceptually makes sense.
1 parent fe0a52e commit 8073997

File tree

6 files changed

+72
-33
lines changed

6 files changed

+72
-33
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.elasticsearch.common.io.stream.Writeable;
3434
import org.elasticsearch.index.IndexSettings;
3535
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
36+
import org.elasticsearch.index.shard.IndexShard;
3637
import org.elasticsearch.index.shard.ReplicationGroup;
3738
import org.elasticsearch.index.shard.ShardId;
3839

@@ -92,6 +93,12 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
9293
*/
9394
volatile boolean primaryMode;
9495

96+
/**
97+
* The current operation primary term. Management of this value is done through {@link IndexShard} and must only be done when safe. See
98+
* {@link #setOperationPrimaryTerm(long)}.
99+
*/
100+
private volatile long operationPrimaryTerm;
101+
95102
/**
96103
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff}
97104
* and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the
@@ -408,6 +415,25 @@ public boolean isPrimaryMode() {
408415
return primaryMode;
409416
}
410417

418+
/**
419+
* Returns the current operation primary term.
420+
*
421+
* @return the primary term
422+
*/
423+
public long getOperationPrimaryTerm() {
424+
return operationPrimaryTerm;
425+
}
426+
427+
/**
428+
* Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That
429+
* is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance.
430+
*
431+
* @param operationPrimaryTerm the new operation primary term
432+
*/
433+
public void setOperationPrimaryTerm(final long operationPrimaryTerm) {
434+
this.operationPrimaryTerm = operationPrimaryTerm;
435+
}
436+
411437
/**
412438
* Returns whether the replication tracker has relocated away to another shard copy.
413439
*/
@@ -527,13 +553,15 @@ private static long inSyncCheckpointStates(
527553
* @param shardId the shard ID
528554
* @param allocationId the allocation ID
529555
* @param indexSettings the index settings
556+
* @param operationPrimaryTerm the current primary term
530557
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
531558
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
532559
*/
533560
public ReplicationTracker(
534561
final ShardId shardId,
535562
final String allocationId,
536563
final IndexSettings indexSettings,
564+
final long operationPrimaryTerm,
537565
final long globalCheckpoint,
538566
final LongConsumer onGlobalCheckpointUpdated,
539567
final LongSupplier currentTimeMillisSupplier,
@@ -542,6 +570,7 @@ public ReplicationTracker(
542570
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
543571
this.shardAllocationId = allocationId;
544572
this.primaryMode = false;
573+
this.operationPrimaryTerm = operationPrimaryTerm;
545574
this.handoffInProgress = false;
546575
this.appliedClusterStateVersion = -1L;
547576
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 33 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
202202
protected volatile ShardRouting shardRouting;
203203
protected volatile IndexShardState state;
204204
private volatile long pendingPrimaryTerm; // see JavaDocs for getPendingPrimaryTerm
205-
private volatile long operationPrimaryTerm;
206205
protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
207206
final EngineFactory engineFactory;
208207

@@ -310,17 +309,21 @@ public IndexShard(
310309
}
311310
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
312311
final String aId = shardRouting.allocationId().getId();
312+
final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
313+
this.pendingPrimaryTerm = primaryTerm;
313314
this.globalCheckpointListeners =
314315
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
315-
this.replicationTracker =
316+
final ReplicationTracker replicationTracker =
316317
new ReplicationTracker(
317318
shardId,
318319
aId,
319320
indexSettings,
321+
primaryTerm,
320322
UNASSIGNED_SEQ_NO,
321323
globalCheckpointListeners::globalCheckpointUpdated,
322324
threadPool::absoluteTimeInMillis,
323325
retentionLeaseSyncer);
326+
this.replicationTracker = replicationTracker;
324327

325328
// the query cache is a node-level thing, however we want the most popular filters
326329
// to be computed on a per-shard basis
@@ -340,8 +343,6 @@ public boolean shouldCache(Query query) {
340343
}
341344
indexShardOperationPermits = new IndexShardOperationPermits(shardId, logger, threadPool);
342345
searcherWrapper = indexSearcherWrapper;
343-
pendingPrimaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
344-
operationPrimaryTerm = pendingPrimaryTerm;
345346
refreshListeners = buildRefreshListeners();
346347
persistMetadata(path, indexSettings, shardRouting, null, logger);
347348
}
@@ -402,7 +403,7 @@ public long getPendingPrimaryTerm() {
402403

403404
/** Returns the primary term that is currently being used to assign to operations */
404405
public long getOperationPrimaryTerm() {
405-
return this.operationPrimaryTerm;
406+
return replicationTracker.getOperationPrimaryTerm();
406407
}
407408

408409
/**
@@ -529,7 +530,7 @@ public void updateShardState(final ShardRouting newRouting,
529530
assert pendingPrimaryTerm == newPrimaryTerm :
530531
"shard term changed on primary. expected [" + newPrimaryTerm + "] but was [" + pendingPrimaryTerm + "]" +
531532
", current routing: " + currentRouting + ", new routing: " + newRouting;
532-
assert operationPrimaryTerm == newPrimaryTerm;
533+
assert getOperationPrimaryTerm() == newPrimaryTerm;
533534
try {
534535
replicationTracker.activatePrimaryMode(getLocalCheckpoint());
535536
/*
@@ -731,23 +732,23 @@ private IndexShardState changeState(IndexShardState newState, String reason) {
731732
public Engine.IndexResult applyIndexOperationOnPrimary(long version, VersionType versionType, SourceToParse sourceToParse,
732733
long ifSeqNo, long ifPrimaryTerm, long autoGeneratedTimestamp,
733734
boolean isRetry) throws IOException {
734-
return applyIndexOperation(getEngine(), SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, versionType,
735+
return applyIndexOperation(getEngine(), SequenceNumbers.UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, versionType,
735736
ifSeqNo, ifPrimaryTerm, autoGeneratedTimestamp, isRetry, Engine.Operation.Origin.PRIMARY, sourceToParse);
736737
}
737738

738739
public Engine.IndexResult applyIndexOperationOnReplica(long seqNo, long version, VersionType versionType,
739740
long autoGeneratedTimeStamp, boolean isRetry, SourceToParse sourceToParse)
740741
throws IOException {
741-
return applyIndexOperation(getEngine(), seqNo, operationPrimaryTerm, version, versionType, UNASSIGNED_SEQ_NO, 0,
742+
return applyIndexOperation(getEngine(), seqNo, getOperationPrimaryTerm(), version, versionType, UNASSIGNED_SEQ_NO, 0,
742743
autoGeneratedTimeStamp, isRetry, Engine.Operation.Origin.REPLICA, sourceToParse);
743744
}
744745

745746
private Engine.IndexResult applyIndexOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, VersionType versionType,
746747
long ifSeqNo, long ifPrimaryTerm,
747748
long autoGeneratedTimeStamp, boolean isRetry, Engine.Operation.Origin origin,
748749
SourceToParse sourceToParse) throws IOException {
749-
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
750-
+ "]";
750+
assert opPrimaryTerm <= getOperationPrimaryTerm()
751+
: "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
751752
assert versionType.validateVersionForWrites(version);
752753
ensureWriteAllowed(origin);
753754
Engine.Index operation;
@@ -811,13 +812,13 @@ private Engine.IndexResult index(Engine engine, Engine.Index index) throws IOExc
811812
}
812813

813814
public Engine.NoOpResult markSeqNoAsNoop(long seqNo, String reason) throws IOException {
814-
return markSeqNoAsNoop(getEngine(), seqNo, operationPrimaryTerm, reason, Engine.Operation.Origin.REPLICA);
815+
return markSeqNoAsNoop(getEngine(), seqNo, getOperationPrimaryTerm(), reason, Engine.Operation.Origin.REPLICA);
815816
}
816817

817818
private Engine.NoOpResult markSeqNoAsNoop(Engine engine, long seqNo, long opPrimaryTerm, String reason,
818819
Engine.Operation.Origin origin) throws IOException {
819-
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
820-
+ "]";
820+
assert opPrimaryTerm <= getOperationPrimaryTerm()
821+
: "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
821822
long startTime = System.nanoTime();
822823
ensureWriteAllowed(origin);
823824
final Engine.NoOp noOp = new Engine.NoOp(seqNo, opPrimaryTerm, origin, startTime, reason);
@@ -833,31 +834,33 @@ private Engine.NoOpResult noOp(Engine engine, Engine.NoOp noOp) {
833834
}
834835

835836
public Engine.IndexResult getFailedIndexResult(Exception e, long version) {
836-
return new Engine.IndexResult(e, version, operationPrimaryTerm);
837+
return new Engine.IndexResult(e, version, getOperationPrimaryTerm());
837838
}
838839

839840
public Engine.DeleteResult getFailedDeleteResult(Exception e, long version) {
840-
return new Engine.DeleteResult(e, version, operationPrimaryTerm);
841+
return new Engine.DeleteResult(e, version, getOperationPrimaryTerm());
841842
}
842843

843844
public Engine.DeleteResult applyDeleteOperationOnPrimary(long version, String type, String id, VersionType versionType,
844845
long ifSeqNo, long ifPrimaryTerm)
845846
throws IOException {
846-
return applyDeleteOperation(getEngine(), SequenceNumbers.UNASSIGNED_SEQ_NO, operationPrimaryTerm, version, type, id, versionType,
847-
ifSeqNo, ifPrimaryTerm, Engine.Operation.Origin.PRIMARY);
847+
return applyDeleteOperation(
848+
getEngine(), SequenceNumbers.UNASSIGNED_SEQ_NO, getOperationPrimaryTerm(), version, type, id, versionType, ifSeqNo,
849+
ifPrimaryTerm, Engine.Operation.Origin.PRIMARY);
848850
}
849851

850852
public Engine.DeleteResult applyDeleteOperationOnReplica(long seqNo, long version, String type, String id,
851853
VersionType versionType) throws IOException {
852-
return applyDeleteOperation(getEngine(), seqNo, operationPrimaryTerm, version, type, id, versionType, UNASSIGNED_SEQ_NO, 0,
854+
return applyDeleteOperation(getEngine(), seqNo, getOperationPrimaryTerm(), version, type, id, versionType, UNASSIGNED_SEQ_NO, 0,
853855
Engine.Operation.Origin.REPLICA);
856+
854857
}
855858

856859
private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long opPrimaryTerm, long version, String type, String id,
857860
VersionType versionType, long ifSeqNo, long ifPrimaryTerm,
858861
Engine.Operation.Origin origin) throws IOException {
859-
assert opPrimaryTerm <= this.operationPrimaryTerm : "op term [ " + opPrimaryTerm + " ] > shard term [" + this.operationPrimaryTerm
860-
+ "]";
862+
assert opPrimaryTerm <= getOperationPrimaryTerm()
863+
: "op term [ " + opPrimaryTerm + " ] > shard term [" + getOperationPrimaryTerm() + "]";
861864
assert versionType.validateVersionForWrites(version);
862865
ensureWriteAllowed(origin);
863866
if (indexSettings().isSingleType()) {
@@ -874,7 +877,7 @@ private Engine.DeleteResult applyDeleteOperation(Engine engine, long seqNo, long
874877
return new Engine.DeleteResult(update);
875878
}
876879
} catch (MapperParsingException | IllegalArgumentException | TypeMissingException e) {
877-
return new Engine.DeleteResult(e, version, operationPrimaryTerm, seqNo, false);
880+
return new Engine.DeleteResult(e, version, getOperationPrimaryTerm(), seqNo, false);
878881
}
879882
}
880883
final Term uid = extractUidForDelete(type, id);
@@ -1304,7 +1307,7 @@ public void prepareForIndexRecovery() {
13041307
}
13051308

13061309
public void trimOperationOfPreviousPrimaryTerms(long aboveSeqNo) {
1307-
getEngine().trimOperationsFromTranslog(operationPrimaryTerm, aboveSeqNo);
1310+
getEngine().trimOperationsFromTranslog(getOperationPrimaryTerm(), aboveSeqNo);
13081311
}
13091312

13101313
/**
@@ -2421,7 +2424,7 @@ private EngineConfig newEngineConfig() {
24212424
Collections.singletonList(refreshListeners),
24222425
Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
24232426
indexSort, circuitBreakerService, replicationTracker, replicationTracker::getRetentionLeases,
2424-
() -> operationPrimaryTerm, tombstoneDocSupplier());
2427+
() -> getOperationPrimaryTerm(), tombstoneDocSupplier());
24252428
}
24262429

24272430
/**
@@ -2501,7 +2504,7 @@ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
25012504
@Nullable ActionListener<Releasable> combineWithAction) {
25022505
assert Thread.holdsLock(mutex);
25032506
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
2504-
assert operationPrimaryTerm <= pendingPrimaryTerm;
2507+
assert getOperationPrimaryTerm() <= pendingPrimaryTerm;
25052508
final CountDownLatch termUpdated = new CountDownLatch(1);
25062509
asyncBlockOperations(new ActionListener<Releasable>() {
25072510
@Override
@@ -2527,12 +2530,12 @@ private void innerFail(final Exception e) {
25272530
public void onResponse(final Releasable releasable) {
25282531
final RunOnce releaseOnce = new RunOnce(releasable::close);
25292532
try {
2530-
assert operationPrimaryTerm <= pendingPrimaryTerm;
2533+
assert getOperationPrimaryTerm() <= pendingPrimaryTerm;
25312534
termUpdated.await();
25322535
// indexShardOperationPermits doesn't guarantee that async submissions are executed
25332536
// in the order submitted. We need to guard against another term bump
2534-
if (operationPrimaryTerm < newPrimaryTerm) {
2535-
operationPrimaryTerm = newPrimaryTerm;
2537+
if (getOperationPrimaryTerm() < newPrimaryTerm) {
2538+
replicationTracker.setOperationPrimaryTerm(newPrimaryTerm);
25362539
onBlocked.run();
25372540
}
25382541
} catch (final Exception e) {
@@ -2618,14 +2621,14 @@ private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,
26182621
final ActionListener<Releasable> operationListener = new ActionListener<Releasable>() {
26192622
@Override
26202623
public void onResponse(final Releasable releasable) {
2621-
if (opPrimaryTerm < operationPrimaryTerm) {
2624+
if (opPrimaryTerm < getOperationPrimaryTerm()) {
26222625
releasable.close();
26232626
final String message = String.format(
26242627
Locale.ROOT,
26252628
"%s operation primary term [%d] is too old (current [%d])",
26262629
shardId,
26272630
opPrimaryTerm,
2628-
operationPrimaryTerm);
2631+
getOperationPrimaryTerm());
26292632
onPermitAcquired.onFailure(new IllegalStateException(message));
26302633
} else {
26312634
assert assertReplicationTarget();
@@ -2686,7 +2689,7 @@ public void onFailure(final Exception e) {
26862689
}
26872690

26882691
private boolean requirePrimaryTermUpdate(final long opPrimaryTerm, final boolean allPermits) {
2689-
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > operationPrimaryTerm);
2692+
return (opPrimaryTerm > pendingPrimaryTerm) || (allPermits && opPrimaryTerm > getOperationPrimaryTerm());
26902693
}
26912694

26922695
public int getActiveOperationsCount() {

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void testAddOrRenewRetentionLease() {
5353
new ShardId("test", "_na", 0),
5454
allocationId.getId(),
5555
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
56+
randomNonNegativeLong(),
5657
UNASSIGNED_SEQ_NO,
5758
value -> {},
5859
() -> 0L,
@@ -88,6 +89,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() {
8889
new ShardId("test", "_na", 0),
8990
allocationId.getId(),
9091
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
92+
randomNonNegativeLong(),
9193
UNASSIGNED_SEQ_NO,
9294
value -> {},
9395
() -> 0L,
@@ -143,6 +145,7 @@ private void runExpirationTest(final boolean primaryMode) {
143145
new ShardId("test", "_na", 0),
144146
allocationId.getId(),
145147
IndexSettingsModule.newIndexSettings("test", settings),
148+
randomNonNegativeLong(),
146149
UNASSIGNED_SEQ_NO,
147150
value -> {},
148151
currentTimeMillis::get,
@@ -215,6 +218,7 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
215218
new ShardId("test", "_na", 0),
216219
allocationId.getId(),
217220
IndexSettingsModule.newIndexSettings("test", settings),
221+
randomNonNegativeLong(),
218222
UNASSIGNED_SEQ_NO,
219223
value -> {},
220224
currentTimeMillis::get,

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ ReplicationTracker newTracker(
4545
new ShardId("test", "_na_", 0),
4646
allocationId.getId(),
4747
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
48+
randomNonNegativeLong(),
4849
UNASSIGNED_SEQ_NO,
4950
updatedGlobalCheckpoint,
5051
currentTimeMillisSupplier,

0 commit comments

Comments
 (0)