147
147
import java .util .concurrent .atomic .AtomicLong ;
148
148
import java .util .concurrent .atomic .AtomicReference ;
149
149
import java .util .function .BiFunction ;
150
+ import java .util .function .Function ;
150
151
import java .util .function .LongSupplier ;
151
152
import java .util .function .Supplier ;
152
153
import java .util .function .ToLongBiFunction ;
@@ -1529,32 +1530,101 @@ public void testVersionOnPrimaryWithConcurrentRefresh() throws Exception {
1529
1530
}
1530
1531
});
1531
1532
refreshThread .start ();
1532
- latch .await ();
1533
- assertOpsOnPrimary (ops , Versions .NOT_FOUND , true , engine );
1534
- running .set (false );
1535
- refreshThread .join ();
1533
+ try {
1534
+ latch .await ();
1535
+ assertOpsOnPrimary (ops , Versions .NOT_FOUND , true , engine );
1536
+ } finally {
1537
+ running .set (false );
1538
+ refreshThread .join ();
1539
+ }
1536
1540
}
1537
1541
1538
- private int assertOpsOnPrimary (List <Engine .Operation > ops , long currentOpVersion , boolean docDeleted , InternalEngine engine )
1542
+ private int assertOpsOnPrimary (List <Engine .Operation > ops ,
1543
+ long currentOpVersion ,
1544
+ boolean docDeleted ,
1545
+ InternalEngine engine )
1539
1546
throws IOException {
1540
1547
String lastFieldValue = null ;
1541
1548
int opsPerformed = 0 ;
1542
1549
long lastOpVersion = currentOpVersion ;
1543
1550
long lastOpSeqNo = UNASSIGNED_SEQ_NO ;
1544
- long lastOpTerm = 0 ;
1545
- final AtomicLong currentTerm = new AtomicLong (1 );
1546
- BiFunction <Long , Engine .Index , Engine .Index > indexWithVersion = (version , index ) -> new Engine .Index (index .uid (), index .parsedDoc (),
1547
- UNASSIGNED_SEQ_NO , currentTerm .get (), version , index .versionType (), index .origin (), index .startTime (),
1548
- index .getAutoGeneratedIdTimestamp (), index .isRetry (), UNASSIGNED_SEQ_NO , 0 );
1549
- BiFunction <Long , Engine .Delete , Engine .Delete > delWithVersion = (version , delete ) -> new Engine .Delete (delete .type (), delete .id (),
1550
- delete .uid (), UNASSIGNED_SEQ_NO , currentTerm .get (), version , delete .versionType (), delete .origin (), delete .startTime (),
1551
- UNASSIGNED_SEQ_NO , 0 );
1552
- TriFunction <Long , Long , Engine .Index , Engine .Index > indexWithSeq = (seqNo , term , index ) -> new Engine .Index (index .uid (),
1553
- index .parsedDoc (), UNASSIGNED_SEQ_NO , currentTerm .get (), index .version (), index .versionType (), index .origin (),
1554
- index .startTime (), index .getAutoGeneratedIdTimestamp (), index .isRetry (), seqNo , term );
1555
- TriFunction <Long , Long , Engine .Delete , Engine .Delete > delWithSeq = (seqNo , term , delete ) -> new Engine .Delete (delete .type (),
1556
- delete .id (), delete .uid (), UNASSIGNED_SEQ_NO , currentTerm .get (), delete .version (), delete .versionType (), delete .origin (),
1557
- delete .startTime (), seqNo , term );
1551
+ long lastOpTerm = UNASSIGNED_PRIMARY_TERM ;
1552
+ PrimaryTermSupplier currentTerm = (PrimaryTermSupplier ) engine .engineConfig .getPrimaryTermSupplier ();
1553
+ BiFunction <Long , Engine .Index , Engine .Index > indexWithVersion = (version , index ) -> new Engine .Index (
1554
+ index .uid (),
1555
+ index .parsedDoc (),
1556
+ UNASSIGNED_SEQ_NO ,
1557
+ currentTerm .get (),
1558
+ version ,
1559
+ index .versionType (),
1560
+ index .origin (),
1561
+ index .startTime (),
1562
+ index .getAutoGeneratedIdTimestamp (),
1563
+ index .isRetry (),
1564
+ UNASSIGNED_SEQ_NO ,
1565
+ 0 );
1566
+ BiFunction <Long , Engine .Delete , Engine .Delete > delWithVersion = (version , delete ) -> new Engine .Delete (
1567
+ delete .type (),
1568
+ delete .id (),
1569
+ delete .uid (),
1570
+ UNASSIGNED_SEQ_NO ,
1571
+ currentTerm .get (),
1572
+ version ,
1573
+ delete .versionType (),
1574
+ delete .origin (),
1575
+ delete .startTime (),
1576
+ UNASSIGNED_SEQ_NO ,
1577
+ 0 );
1578
+ TriFunction <Long , Long , Engine .Index , Engine .Index > indexWithSeq = (seqNo , term , index ) -> new Engine .Index (
1579
+ index .uid (),
1580
+ index .parsedDoc (),
1581
+ UNASSIGNED_SEQ_NO ,
1582
+ currentTerm .get (),
1583
+ index .version (),
1584
+ index .versionType (),
1585
+ index .origin (),
1586
+ index .startTime (),
1587
+ index .getAutoGeneratedIdTimestamp (),
1588
+ index .isRetry (),
1589
+ seqNo ,
1590
+ term );
1591
+ TriFunction <Long , Long , Engine .Delete , Engine .Delete > delWithSeq = (seqNo , term , delete ) -> new Engine .Delete (
1592
+ delete .type (),
1593
+ delete .id (),
1594
+ delete .uid (),
1595
+ UNASSIGNED_SEQ_NO ,
1596
+ currentTerm .get (),
1597
+ delete .version (),
1598
+ delete .versionType (),
1599
+ delete .origin (),
1600
+ delete .startTime (),
1601
+ seqNo ,
1602
+ term );
1603
+ Function <Engine .Index , Engine .Index > indexWithCurrentTerm = index -> new Engine .Index (
1604
+ index .uid (),
1605
+ index .parsedDoc (),
1606
+ UNASSIGNED_SEQ_NO ,
1607
+ currentTerm .get (),
1608
+ index .version (),
1609
+ index .versionType (),
1610
+ index .origin (),
1611
+ index .startTime (),
1612
+ index .getAutoGeneratedIdTimestamp (),
1613
+ index .isRetry (),
1614
+ index .getIfSeqNoMatch (),
1615
+ index .getIfPrimaryTermMatch ());
1616
+ Function <Engine .Delete , Engine .Delete > deleteWithCurrentTerm = delete -> new Engine .Delete (
1617
+ delete .type (),
1618
+ delete .id (),
1619
+ delete .uid (),
1620
+ UNASSIGNED_SEQ_NO ,
1621
+ currentTerm .get (),
1622
+ delete .version (),
1623
+ delete .versionType (),
1624
+ delete .origin (),
1625
+ delete .startTime (),
1626
+ delete .getIfSeqNoMatch (),
1627
+ delete .getIfPrimaryTermMatch ());
1558
1628
for (Engine .Operation op : ops ) {
1559
1629
final boolean versionConflict = rarely ();
1560
1630
final boolean versionedOp = versionConflict || randomBoolean ();
@@ -1566,7 +1636,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
1566
1636
lastOpSeqNo ;
1567
1637
final long conflictingTerm = conflictingSeqNo == lastOpSeqNo || randomBoolean () ? lastOpTerm + 1 : lastOpTerm ;
1568
1638
if (rarely ()) {
1569
- currentTerm .incrementAndGet ();
1639
+ currentTerm .set (currentTerm .get () + 1L );
1640
+ engine .rollTranslogGeneration ();
1570
1641
}
1571
1642
final long correctVersion = docDeleted && randomBoolean () ? Versions .MATCH_DELETED : lastOpVersion ;
1572
1643
logger .info ("performing [{}]{}{}" ,
@@ -1597,7 +1668,7 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
1597
1668
result = engine .index (indexWithVersion .apply (correctVersion , index ));
1598
1669
}
1599
1670
} else {
1600
- result = engine .index (index );
1671
+ result = engine .index (indexWithCurrentTerm . apply ( index ) );
1601
1672
}
1602
1673
assertThat (result .isCreated (), equalTo (docDeleted ));
1603
1674
assertThat (result .getVersion (), equalTo (Math .max (lastOpVersion + 1 , 1 )));
@@ -1631,16 +1702,16 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
1631
1702
} else if (versionedOp ) {
1632
1703
result = engine .delete (delWithVersion .apply (correctVersion , delete ));
1633
1704
} else {
1634
- result = engine .delete (delete );
1705
+ result = engine .delete (deleteWithCurrentTerm . apply ( delete ) );
1635
1706
}
1636
1707
assertThat (result .isFound (), equalTo (docDeleted == false ));
1637
1708
assertThat (result .getVersion (), equalTo (Math .max (lastOpVersion + 1 , 1 )));
1638
1709
assertThat (result .getResultType (), equalTo (Engine .Result .Type .SUCCESS ));
1639
1710
assertThat (result .getFailure (), nullValue ());
1640
1711
docDeleted = true ;
1641
1712
lastOpVersion = result .getVersion ();
1642
- lastOpSeqNo = UNASSIGNED_SEQ_NO ;
1643
- lastOpTerm = 0 ;
1713
+ lastOpSeqNo = result . getSeqNo () ;
1714
+ lastOpTerm = result . getTerm () ;
1644
1715
opsPerformed ++;
1645
1716
}
1646
1717
}
@@ -1668,6 +1739,8 @@ private int assertOpsOnPrimary(List<Engine.Operation> ops, long currentOpVersion
1668
1739
engine .clearDeletedTombstones ();
1669
1740
if (docDeleted ) {
1670
1741
lastOpVersion = Versions .NOT_FOUND ;
1742
+ lastOpSeqNo = UNASSIGNED_SEQ_NO ;
1743
+ lastOpTerm = UNASSIGNED_PRIMARY_TERM ;
1671
1744
}
1672
1745
}
1673
1746
}
0 commit comments