@@ -2931,39 +2931,106 @@ public void testSequenceIDs() throws Exception {
2931
2931
searchResult .close ();
2932
2932
}
2933
2933
2934
- public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpen () throws IOException {
2934
+ public void testSequenceNumberAdvancesToMaxSeqOnEngineOpenOnPrimary () throws BrokenBarrierException , InterruptedException , IOException {
2935
2935
engine .close ();
2936
- final long maxSeqNo = randomIntBetween (16 , 32 );
2937
- final long localCheckpoint =
2938
- rarely () ? SequenceNumbersService .NO_OPS_PERFORMED : randomIntBetween (0 , Math .toIntExact (maxSeqNo ));
2939
- final long globalCheckpoint =
2940
- localCheckpoint == SequenceNumbersService .NO_OPS_PERFORMED || rarely ()
2941
- ? SequenceNumbersService .UNASSIGNED_SEQ_NO
2942
- : Math .toIntExact (localCheckpoint );
2943
- Engine initialEngine = null ;
2936
+ final int docs = randomIntBetween (1 , 32 );
2937
+ InternalEngine initialEngine = null ;
2944
2938
try {
2945
- initialEngine = createEngine (
2946
- defaultSettings ,
2947
- store ,
2948
- primaryTranslogDir ,
2949
- newMergePolicy (),
2950
- null ,
2951
- () -> new SequenceNumbersService (shardId , defaultSettings , maxSeqNo , localCheckpoint , globalCheckpoint ) {
2939
+ final CountDownLatch latch = new CountDownLatch (1 );
2940
+ final CyclicBarrier barrier = new CyclicBarrier (2 );
2941
+ final AtomicBoolean skip = new AtomicBoolean ();
2942
+ final AtomicLong expectedLocalCheckpoint = new AtomicLong (SequenceNumbersService .NO_OPS_PERFORMED );
2943
+ final List <Thread > threads = new ArrayList <>();
2944
+ final SequenceNumbersService seqNoService =
2945
+ new SequenceNumbersService (
2946
+ shardId ,
2947
+ defaultSettings ,
2948
+ SequenceNumbersService .NO_OPS_PERFORMED ,
2949
+ SequenceNumbersService .NO_OPS_PERFORMED ,
2950
+ SequenceNumbersService .UNASSIGNED_SEQ_NO ) {
2952
2951
@ Override
2953
2952
public long generateSeqNo () {
2954
- if (rarely ()) {
2955
- // force skipping a sequence number
2956
- super .generateSeqNo ();
2953
+ final long seqNo = super .generateSeqNo ();
2954
+ if (skip .get ()) {
2955
+ try {
2956
+ barrier .await ();
2957
+ latch .await ();
2958
+ } catch (BrokenBarrierException | InterruptedException e ) {
2959
+ throw new RuntimeException (e );
2960
+ }
2961
+ } else {
2962
+ if (expectedLocalCheckpoint .get () + 1 == seqNo ) {
2963
+ expectedLocalCheckpoint .set (seqNo );
2964
+ }
2957
2965
}
2958
- return super . generateSeqNo () ;
2966
+ return seqNo ;
2959
2967
}
2960
- });
2968
+ };
2969
+ initialEngine = createEngine (defaultSettings , store , primaryTranslogDir , newMergePolicy (), null , () -> seqNoService );
2970
+ final InternalEngine finalInitialEngine = initialEngine ;
2971
+ for (int i = 0 ; i < docs ; i ++) {
2972
+ final String id = Integer .toString (i );
2973
+ final Term uid = newUid (id );
2974
+ final ParsedDocument doc = testParsedDocument (id , id , "test" , null , testDocumentWithTextField (), SOURCE , null );
2975
+
2976
+ skip .set (randomBoolean ());
2977
+ final Thread thread = new Thread (() -> finalInitialEngine .index (new Engine .Index (uid , doc )));
2978
+ thread .start ();
2979
+ if (skip .get ()) {
2980
+ threads .add (thread );
2981
+ barrier .await ();
2982
+ } else {
2983
+ thread .join ();
2984
+ }
2985
+ }
2986
+
2987
+ assertThat (initialEngine .seqNoService ().getLocalCheckpoint (), equalTo (expectedLocalCheckpoint .get ()));
2988
+ assertThat (initialEngine .seqNoService ().getMaxSeqNo (), equalTo ((long ) (docs - 1 )));
2989
+ initialEngine .flush (true , true );
2990
+
2991
+ latch .countDown ();
2992
+ for (final Thread thread : threads ) {
2993
+ thread .join ();
2994
+ }
2995
+ } finally {
2996
+ IOUtils .close (initialEngine );
2997
+ }
2998
+
2999
+ try (final Engine recoveringEngine =
3000
+ new InternalEngine (copy (initialEngine .config (), EngineConfig .OpenMode .OPEN_INDEX_AND_TRANSLOG ))) {
3001
+ // there might be gaps in the translog; the best that we can say is that the local checkpoint is at least as far the max
3002
+ // sequence number
3003
+ assertThat (recoveringEngine .seqNoService ().getLocalCheckpoint (), greaterThanOrEqualTo ((long ) (docs - 1 )));
3004
+ }
3005
+ }
3006
+
3007
+ public void testSequenceNumberAdvancesToMaxSeqNoOnEngineOpenOnReplica () throws IOException {
3008
+ final long v = Versions .MATCH_ANY ;
3009
+ final VersionType t = VersionType .INTERNAL ;
3010
+ final long ts = IndexRequest .UNSET_AUTO_GENERATED_TIMESTAMP ;
3011
+ final int docs = randomIntBetween (1 , 32 );
3012
+ InternalEngine initialEngine = null ;
3013
+ try {
3014
+ initialEngine = engine ;
3015
+ for (int i = 0 ; i < docs ; i ++) {
3016
+ final String id = Integer .toString (i );
3017
+ final Term uid = newUid (id );
3018
+ final ParsedDocument doc = testParsedDocument (id , id , "test" , null , testDocumentWithTextField (), SOURCE , null );
3019
+ // create a gap at sequence number 3 * i + 1
3020
+ initialEngine .index (new Engine .Index (uid , doc , 3 * i , 1 , v , t , REPLICA , System .nanoTime (), ts , false ));
3021
+ initialEngine .delete (new Engine .Delete ("type" , id , uid , 3 * i + 2 , 1 , v , t , REPLICA , System .nanoTime ()));
3022
+ }
3023
+
3024
+ // bake the commit with the local checkpoint stuck at 0 and gaps all along the way up to the max sequence number
3025
+ assertThat (initialEngine .seqNoService ().getLocalCheckpoint (), equalTo ((long ) 0 ));
3026
+ assertThat (initialEngine .seqNoService ().getMaxSeqNo (), equalTo ((long ) (3 * (docs - 1 ) + 2 )));
2961
3027
initialEngine .flush (true , true );
2962
- final int docs = randomIntBetween ( 1 , 32 );
3028
+
2963
3029
for (int i = 0 ; i < docs ; i ++) {
2964
3030
final String id = Integer .toString (i );
3031
+ final Term uid = newUid (id );
2965
3032
final ParsedDocument doc = testParsedDocument (id , id , "test" , null , testDocumentWithTextField (), SOURCE , null );
2966
- initialEngine .index (new Engine .Index (newUid ( id ), doc ));
3033
+ initialEngine .index (new Engine .Index (uid , doc , 3 * i + 1 , 1 , v , t , REPLICA , System . nanoTime ( ), ts , false ));
2967
3034
}
2968
3035
} finally {
2969
3036
IOUtils .close (initialEngine );
@@ -2973,8 +3040,7 @@ public long generateSeqNo() {
2973
3040
new InternalEngine (copy (initialEngine .config (), EngineConfig .OpenMode .OPEN_INDEX_AND_TRANSLOG ))) {
2974
3041
// there might be gaps in the translog; the best that we can say is that the local checkpoint is at least as far the max
2975
3042
// sequence number
2976
- // TODO: tighten this assertion after gaps in the translog are addressed
2977
- assertThat (recoveringEngine .seqNoService ().getLocalCheckpoint (), greaterThanOrEqualTo (maxSeqNo ));
3043
+ assertThat (recoveringEngine .seqNoService ().getLocalCheckpoint (), greaterThanOrEqualTo ((long ) (3 * docs + 2 - 1 )));
2978
3044
}
2979
3045
}
2980
3046
0 commit comments