@@ -956,18 +956,18 @@ private void finish() {
956
956
// our operation should be blocked until the previous operations complete
957
957
assertFalse (onResponse .get ());
958
958
assertNull (onFailure .get ());
959
- assertThat (indexShard .operationPrimaryTerm , equalTo (primaryTerm ));
959
+ assertThat (indexShard .getOperationPrimaryTerm () , equalTo (primaryTerm ));
960
960
assertThat (TestTranslog .getCurrentTerm (getTranslog (indexShard )), equalTo (primaryTerm ));
961
961
Releasables .close (operation1 );
962
962
// our operation should still be blocked
963
963
assertFalse (onResponse .get ());
964
964
assertNull (onFailure .get ());
965
- assertThat (indexShard .operationPrimaryTerm , equalTo (primaryTerm ));
965
+ assertThat (indexShard .getOperationPrimaryTerm () , equalTo (primaryTerm ));
966
966
assertThat (TestTranslog .getCurrentTerm (getTranslog (indexShard )), equalTo (primaryTerm ));
967
967
Releasables .close (operation2 );
968
968
barrier .await ();
969
969
// now lock acquisition should have succeeded
970
- assertThat (indexShard .operationPrimaryTerm , equalTo (newPrimaryTerm ));
970
+ assertThat (indexShard .getOperationPrimaryTerm () , equalTo (newPrimaryTerm ));
971
971
assertThat (indexShard .getPendingPrimaryTerm (), equalTo (newPrimaryTerm ));
972
972
assertThat (TestTranslog .getCurrentTerm (getTranslog (indexShard )), equalTo (newPrimaryTerm ));
973
973
if (engineClosed ) {
@@ -1008,7 +1008,7 @@ public void onFailure(Exception e) {
1008
1008
}
1009
1009
};
1010
1010
1011
- final long oldPrimaryTerm = indexShard .pendingPrimaryTerm - 1 ;
1011
+ final long oldPrimaryTerm = indexShard .getPendingPrimaryTerm () - 1 ;
1012
1012
randomReplicaOperationPermitAcquisition (indexShard , oldPrimaryTerm , indexShard .getGlobalCheckpoint (),
1013
1013
randomNonNegativeLong (), onLockAcquired , "" );
1014
1014
latch .await ();
@@ -1030,7 +1030,7 @@ public void testAcquireReplicaPermitAdvanceMaxSeqNoOfUpdates() throws Exception
1030
1030
1031
1031
long newMaxSeqNoOfUpdates = randomLongBetween (SequenceNumbers .NO_OPS_PERFORMED , Long .MAX_VALUE );
1032
1032
PlainActionFuture <Releasable > fut = new PlainActionFuture <>();
1033
- randomReplicaOperationPermitAcquisition (replica , replica .operationPrimaryTerm , replica .getGlobalCheckpoint (),
1033
+ randomReplicaOperationPermitAcquisition (replica , replica .getOperationPrimaryTerm () , replica .getGlobalCheckpoint (),
1034
1034
newMaxSeqNoOfUpdates , fut , "" );
1035
1035
try (Releasable ignored = fut .actionGet ()) {
1036
1036
assertThat (replica .getMaxSeqNoOfUpdatesOrDeletes (), equalTo (Math .max (currentMaxSeqNoOfUpdates , newMaxSeqNoOfUpdates )));
@@ -1181,7 +1181,7 @@ public void testRollbackReplicaEngineOnPromotion() throws IOException, Interrupt
1181
1181
final Engine beforeRollbackEngine = indexShard .getEngine ();
1182
1182
final long newMaxSeqNoOfUpdates = randomLongBetween (indexShard .getMaxSeqNoOfUpdatesOrDeletes (), Long .MAX_VALUE );
1183
1183
randomReplicaOperationPermitAcquisition (indexShard ,
1184
- indexShard .pendingPrimaryTerm + 1 ,
1184
+ indexShard .getPendingPrimaryTerm () + 1 ,
1185
1185
globalCheckpoint ,
1186
1186
newMaxSeqNoOfUpdates ,
1187
1187
new ActionListener <Releasable >() {
@@ -2105,10 +2105,6 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
2105
2105
new SourceToParse (indexName , "_doc" , "doc-1" , new BytesArray ("{}" ), XContentType .JSON ));
2106
2106
flushShard (shard );
2107
2107
assertThat (getShardDocUIDs (shard ), containsInAnyOrder ("doc-0" , "doc-1" ));
2108
- // Here we try to increase term (i.e. a new primary is promoted) without rolling back a replica so we can keep stale operations
2109
- // in the index commit; then verify that a recovery from store (started with the safe commit) will remove all stale operations.
2110
- shard .pendingPrimaryTerm ++;
2111
- shard .operationPrimaryTerm ++;
2112
2108
shard .getEngine ().rollTranslogGeneration ();
2113
2109
shard .markSeqNoAsNoop (1 , "test" );
2114
2110
shard .applyIndexOperationOnReplica (2 , 1 , IndexRequest .UNSET_AUTO_GENERATED_TIMESTAMP , false ,
@@ -2118,11 +2114,20 @@ public void testRecoverFromStoreRemoveStaleOperations() throws Exception {
2118
2114
closeShard (shard , false );
2119
2115
// Recovering from store should discard doc #1
2120
2116
final ShardRouting replicaRouting = shard .routingEntry ();
2121
- IndexShard newShard = reinitShard (shard ,
2122
- newShardRouting (replicaRouting .shardId (), replicaRouting .currentNodeId (), true , ShardRoutingState .INITIALIZING ,
2123
- RecoverySource .ExistingStoreRecoverySource .INSTANCE ));
2124
- newShard .pendingPrimaryTerm ++;
2125
- newShard .operationPrimaryTerm ++;
2117
+ final IndexMetaData newShardIndexMetadata = IndexMetaData .builder (shard .indexSettings ().getIndexMetaData ())
2118
+ .primaryTerm (replicaRouting .shardId ().id (), shard .getOperationPrimaryTerm () + 1 )
2119
+ .build ();
2120
+ closeShards (shard );
2121
+ IndexShard newShard = newShard (
2122
+ newShardRouting (replicaRouting .shardId (), replicaRouting .currentNodeId (), true , ShardRoutingState .INITIALIZING ,
2123
+ RecoverySource .ExistingStoreRecoverySource .INSTANCE ),
2124
+ shard .shardPath (),
2125
+ newShardIndexMetadata ,
2126
+ null ,
2127
+ null ,
2128
+ shard .getEngineFactory (),
2129
+ shard .getGlobalCheckpointSyncer (),
2130
+ EMPTY_EVENT_LISTENER );
2126
2131
DiscoveryNode localNode = new DiscoveryNode ("foo" , buildNewFakeTransportAddress (), emptyMap (), emptySet (), Version .CURRENT );
2127
2132
newShard .markAsRecovering ("store" , new RecoveryState (newShard .routingEntry (), localNode , null ));
2128
2133
assertTrue (newShard .recoverFromStore ());
0 commit comments