Skip to content

Commit 72e2e7b

Browse files
committed
Feedback
1 parent 0895a5c commit 72e2e7b

File tree

6 files changed

+35
-25
lines changed

6 files changed

+35
-25
lines changed

server/src/main/java/org/elasticsearch/index/seqno/ReplicationTracker.java

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -99,25 +99,6 @@ public class ReplicationTracker extends AbstractIndexShardComponent implements L
9999
*/
100100
private volatile long operationPrimaryTerm;
101101

102-
/**
103-
* Returns the current operation primary term.
104-
*
105-
* @return the primary term
106-
*/
107-
public long getOperationPrimaryTerm() {
108-
return operationPrimaryTerm;
109-
}
110-
111-
/**
112-
* Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That
113-
* is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance.
114-
*
115-
* @param operationPrimaryTerm the new operation primary term
116-
*/
117-
public void setOperationPrimaryTerm(final long operationPrimaryTerm) {
118-
this.operationPrimaryTerm = operationPrimaryTerm;
119-
}
120-
121102
/**
122103
* Boolean flag that indicates if a relocation handoff is in progress. A handoff is started by calling {@link #startRelocationHandoff}
123104
* and is finished by either calling {@link #completeRelocationHandoff} or {@link #abortRelocationHandoff}, depending on whether the
@@ -434,6 +415,25 @@ public boolean isPrimaryMode() {
434415
return primaryMode;
435416
}
436417

418+
/**
419+
* Returns the current operation primary term.
420+
*
421+
* @return the primary term
422+
*/
423+
public long getOperationPrimaryTerm() {
424+
return operationPrimaryTerm;
425+
}
426+
427+
/**
428+
* Sets the current operation primary term. This method should be invoked only when no other operations are possible on the shard. That
429+
* is, either from the constructor of {@link IndexShard} or while holding all permits on the {@link IndexShard} instance.
430+
*
431+
* @param operationPrimaryTerm the new operation primary term
432+
*/
433+
public void setOperationPrimaryTerm(final long operationPrimaryTerm) {
434+
this.operationPrimaryTerm = operationPrimaryTerm;
435+
}
436+
437437
/**
438438
* Returns whether the replication tracker has relocated away to another shard copy.
439439
*/
@@ -553,13 +553,15 @@ private static long inSyncCheckpointStates(
553553
* @param shardId the shard ID
554554
* @param allocationId the allocation ID
555555
* @param indexSettings the index settings
556+
* @param operationPrimaryTerm the current primary term
556557
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbers#UNASSIGNED_SEQ_NO}
557558
* @param onSyncRetentionLeases a callback when a new retention lease is created or an existing retention lease expires
558559
*/
559560
public ReplicationTracker(
560561
final ShardId shardId,
561562
final String allocationId,
562563
final IndexSettings indexSettings,
564+
final long operationPrimaryTerm,
563565
final long globalCheckpoint,
564566
final LongConsumer onGlobalCheckpointUpdated,
565567
final LongSupplier currentTimeMillisSupplier,
@@ -568,6 +570,7 @@ public ReplicationTracker(
568570
assert globalCheckpoint >= SequenceNumbers.UNASSIGNED_SEQ_NO : "illegal initial global checkpoint: " + globalCheckpoint;
569571
this.shardAllocationId = allocationId;
570572
this.primaryMode = false;
573+
this.operationPrimaryTerm = operationPrimaryTerm;
571574
this.handoffInProgress = false;
572575
this.appliedClusterStateVersion = -1L;
573576
this.checkpoints = new HashMap<>(1 + indexSettings.getNumberOfReplicas());

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,13 +306,16 @@ public IndexShard(
306306
this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
307307
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays);
308308
final String aId = shardRouting.allocationId().getId();
309+
final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
310+
this.pendingPrimaryTerm = primaryTerm;
309311
this.globalCheckpointListeners =
310312
new GlobalCheckpointListeners(shardId, threadPool.executor(ThreadPool.Names.LISTENER), threadPool.scheduler(), logger);
311313
final ReplicationTracker replicationTracker =
312314
new ReplicationTracker(
313315
shardId,
314316
aId,
315317
indexSettings,
318+
primaryTerm,
316319
UNASSIGNED_SEQ_NO,
317320
globalCheckpointListeners::globalCheckpointUpdated,
318321
threadPool::absoluteTimeInMillis,
@@ -337,9 +340,6 @@ public boolean shouldCache(Query query) {
337340
}
338341
indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
339342
searcherWrapper = indexSearcherWrapper;
340-
final long primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
341-
this.pendingPrimaryTerm = primaryTerm;
342-
replicationTracker.setOperationPrimaryTerm(primaryTerm);
343343
refreshListeners = buildRefreshListeners();
344344
lastSearcherAccess.set(threadPool.relativeTimeInMillis());
345345
persistMetadata(path, indexSettings, shardRouting, null, logger);

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerRetentionLeaseTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void testAddOrRenewRetentionLease() {
5353
new ShardId("test", "_na", 0),
5454
allocationId.getId(),
5555
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
56+
randomNonNegativeLong(),
5657
UNASSIGNED_SEQ_NO,
5758
value -> {},
5859
() -> 0L,
@@ -88,6 +89,7 @@ public void testAddRetentionLeaseCausesRetentionLeaseSync() {
8889
new ShardId("test", "_na", 0),
8990
allocationId.getId(),
9091
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
92+
randomNonNegativeLong(),
9193
UNASSIGNED_SEQ_NO,
9294
value -> {},
9395
() -> 0L,
@@ -143,6 +145,7 @@ private void runExpirationTest(final boolean primaryMode) {
143145
new ShardId("test", "_na", 0),
144146
allocationId.getId(),
145147
IndexSettingsModule.newIndexSettings("test", settings),
148+
randomNonNegativeLong(),
146149
UNASSIGNED_SEQ_NO,
147150
value -> {},
148151
currentTimeMillis::get,
@@ -215,6 +218,7 @@ public void testRetentionLeaseExpirationCausesRetentionLeaseSync() {
215218
new ShardId("test", "_na", 0),
216219
allocationId.getId(),
217220
IndexSettingsModule.newIndexSettings("test", settings),
221+
randomNonNegativeLong(),
218222
UNASSIGNED_SEQ_NO,
219223
value -> {},
220224
currentTimeMillis::get,

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ ReplicationTracker newTracker(
4545
new ShardId("test", "_na_", 0),
4646
allocationId.getId(),
4747
IndexSettingsModule.newIndexSettings("test", Settings.EMPTY),
48+
randomNonNegativeLong(),
4849
UNASSIGNED_SEQ_NO,
4950
updatedGlobalCheckpoint,
5051
currentTimeMillisSupplier,

server/src/test/java/org/elasticsearch/index/seqno/ReplicationTrackerTests.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -683,15 +683,16 @@ public void testPrimaryContextHandoff() throws IOException {
683683
final ShardId shardId = new ShardId("test", "_na_", 0);
684684

685685
FakeClusterState clusterState = initialState();
686-
final AllocationId primaryAllocationId = clusterState.routingTable.primaryShard().allocationId();
686+
final AllocationId aId = clusterState.routingTable.primaryShard().allocationId();
687687
final LongConsumer onUpdate = updatedGlobalCheckpoint -> {};
688+
final long primaryTerm = randomNonNegativeLong();
688689
final long globalCheckpoint = UNASSIGNED_SEQ_NO;
689690
final BiConsumer<Collection<RetentionLease>, ActionListener<ReplicationResponse>> onNewRetentionLease =
690691
(leases, listener) -> {};
691692
ReplicationTracker oldPrimary = new ReplicationTracker(
692-
shardId, primaryAllocationId.getId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
693+
shardId, aId.getId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
693694
ReplicationTracker newPrimary = new ReplicationTracker(
694-
shardId, primaryAllocationId.getRelocationId(), indexSettings, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
695+
shardId, aId.getRelocationId(), indexSettings, primaryTerm, globalCheckpoint, onUpdate, () -> 0L, onNewRetentionLease);
695696

696697
Set<String> allocationIds = new HashSet<>(Arrays.asList(oldPrimary.shardAllocationId, newPrimary.shardAllocationId));
697698

test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,7 @@ public EngineConfig config(
655655
shardId,
656656
allocationId.getId(),
657657
indexSettings,
658+
randomNonNegativeLong(),
658659
SequenceNumbers.NO_OPS_PERFORMED,
659660
update -> {},
660661
() -> 0L,

0 commit comments

Comments
 (0)