Skip to content

Commit 05bf9dc

Browse files
authored
Add contains method to LocalCheckpointTracker (#33871)
This change adds "contains" method to LocalCheckpointTracker. One of the use cases is to check if a given operation has been processed in an engine or not by looking up its seq_no in LocalCheckpointTracker. Relates #33656
1 parent 4767a01 commit 05bf9dc

File tree

2 files changed

+65
-6
lines changed

2 files changed

+65
-6
lines changed

server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,25 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt
158158
}
159159
}
160160

161+
/**
162+
* Checks if the given sequence number was marked as completed in this tracker.
163+
*/
164+
public boolean contains(final long seqNo) {
165+
assert seqNo >= 0 : "invalid seq_no=" + seqNo;
166+
if (seqNo >= nextSeqNo) {
167+
return false;
168+
}
169+
if (seqNo <= checkpoint) {
170+
return true;
171+
}
172+
final long bitSetKey = getBitSetKey(seqNo);
173+
final CountedBitSet bitSet;
174+
synchronized (this) {
175+
bitSet = processedSeqNo.get(bitSetKey);
176+
}
177+
return bitSet != null && bitSet.get(seqNoToBitSetOffset(seqNo));
178+
}
179+
161180
/**
162181
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
163182
* current checkpoint is processed.
@@ -206,7 +225,6 @@ private long lastSeqNoInBitSet(final long bitSetKey) {
206225
* @return the bit set corresponding to the provided sequence number
207226
*/
208227
private long getBitSetKey(final long seqNo) {
209-
assert Thread.holdsLock(this);
210228
return seqNo / BIT_SET_SIZE;
211229
}
212230

@@ -232,7 +250,6 @@ private CountedBitSet getBitSetForSeqNo(final long seqNo) {
232250
* @return the position in the bit set corresponding to the provided sequence number
233251
*/
234252
private int seqNoToBitSetOffset(final long seqNo) {
235-
assert Thread.holdsLock(this);
236253
return Math.toIntExact(seqNo % BIT_SET_SIZE);
237254
}
238255

server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,52 +65,71 @@ public void testSimplePrimary() {
6565
assertThat(seqNo1, equalTo(0L));
6666
tracker.markSeqNoAsCompleted(seqNo1);
6767
assertThat(tracker.getCheckpoint(), equalTo(0L));
68+
assertThat(tracker.contains(0L), equalTo(true));
69+
assertThat(tracker.contains(atLeast(1)), equalTo(false));
6870
seqNo1 = tracker.generateSeqNo();
6971
seqNo2 = tracker.generateSeqNo();
7072
assertThat(seqNo1, equalTo(1L));
7173
assertThat(seqNo2, equalTo(2L));
7274
tracker.markSeqNoAsCompleted(seqNo2);
7375
assertThat(tracker.getCheckpoint(), equalTo(0L));
76+
assertThat(tracker.contains(seqNo1), equalTo(false));
77+
assertThat(tracker.contains(seqNo2), equalTo(true));
7478
tracker.markSeqNoAsCompleted(seqNo1);
7579
assertThat(tracker.getCheckpoint(), equalTo(2L));
80+
assertThat(tracker.contains(between(0, 2)), equalTo(true));
81+
assertThat(tracker.contains(atLeast(3)), equalTo(false));
7682
}
7783

7884
public void testSimpleReplica() {
7985
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
86+
assertThat(tracker.contains(randomNonNegativeLong()), equalTo(false));
8087
tracker.markSeqNoAsCompleted(0L);
8188
assertThat(tracker.getCheckpoint(), equalTo(0L));
89+
assertThat(tracker.contains(0), equalTo(true));
8290
tracker.markSeqNoAsCompleted(2L);
8391
assertThat(tracker.getCheckpoint(), equalTo(0L));
92+
assertThat(tracker.contains(1L), equalTo(false));
93+
assertThat(tracker.contains(2L), equalTo(true));
8494
tracker.markSeqNoAsCompleted(1L);
8595
assertThat(tracker.getCheckpoint(), equalTo(2L));
96+
assertThat(tracker.contains(between(0, 2)), equalTo(true));
97+
assertThat(tracker.contains(atLeast(3)), equalTo(false));
8698
}
8799

88100
public void testLazyInitialization() {
89101
/*
90102
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
91103
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
92104
*/
93-
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
105+
long seqNo = randomNonNegativeLong();
106+
tracker.markSeqNoAsCompleted(seqNo);
107+
assertThat(tracker.processedSeqNo.size(), equalTo(1));
108+
assertThat(tracker.contains(seqNo), equalTo(true));
109+
assertThat(tracker.contains(randomValueOtherThan(seqNo, ESTestCase::randomNonNegativeLong)), equalTo(false));
94110
assertThat(tracker.processedSeqNo.size(), equalTo(1));
95111
}
96112

97113
public void testSimpleOverFlow() {
98-
List<Integer> seqNoList = new ArrayList<>();
114+
List<Long> seqNoList = new ArrayList<>();
99115
final boolean aligned = randomBoolean();
100116
final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1));
101117

102-
for (int i = 0; i < maxOps; i++) {
118+
for (long i = 0; i < maxOps; i++) {
103119
seqNoList.add(i);
104120
}
105121
Collections.shuffle(seqNoList, random());
106-
for (Integer seqNo : seqNoList) {
122+
for (Long seqNo : seqNoList) {
107123
tracker.markSeqNoAsCompleted(seqNo);
108124
}
109125
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
110126
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
111127
if (aligned == false) {
112128
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
113129
}
130+
assertThat(tracker.contains(randomFrom(seqNoList)), equalTo(true));
131+
final long notCompletedSeqNo = randomValueOtherThanMany(seqNoList::contains, ESTestCase::randomNonNegativeLong);
132+
assertThat(tracker.contains(notCompletedSeqNo), equalTo(false));
114133
}
115134

116135
public void testConcurrentPrimary() throws InterruptedException {
@@ -199,8 +218,12 @@ protected void doRun() throws Exception {
199218
}
200219
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
201220
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
221+
assertThat(tracker.contains(randomValueOtherThan(unFinishedSeq, () -> (long) randomFrom(seqNos))), equalTo(true));
222+
assertThat(tracker.contains(unFinishedSeq), equalTo(false));
202223
tracker.markSeqNoAsCompleted(unFinishedSeq);
203224
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
225+
assertThat(tracker.contains(unFinishedSeq), equalTo(true));
226+
assertThat(tracker.contains(randomLongBetween(maxOps, Long.MAX_VALUE)), equalTo(false));
204227
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
205228
if (tracker.processedSeqNo.size() == 1) {
206229
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
@@ -272,4 +295,23 @@ public void describeTo(Description description) {
272295
});
273296
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
274297
}
298+
299+
public void testContains() {
300+
final long maxSeqNo = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, 100);
301+
final long localCheckpoint = randomLongBetween(SequenceNumbers.NO_OPS_PERFORMED, maxSeqNo);
302+
final LocalCheckpointTracker tracker = new LocalCheckpointTracker(maxSeqNo, localCheckpoint);
303+
if (localCheckpoint >= 0) {
304+
assertThat(tracker.contains(randomLongBetween(0, localCheckpoint)), equalTo(true));
305+
}
306+
assertThat(tracker.contains(randomLongBetween(localCheckpoint + 1, Long.MAX_VALUE)), equalTo(false));
307+
final int numOps = between(1, 100);
308+
final List<Long> seqNos = new ArrayList<>();
309+
for (int i = 0; i < numOps; i++) {
310+
long seqNo = randomLongBetween(0, 1000);
311+
seqNos.add(seqNo);
312+
tracker.markSeqNoAsCompleted(seqNo);
313+
}
314+
final long seqNo = randomNonNegativeLong();
315+
assertThat(tracker.contains(seqNo), equalTo(seqNo <= localCheckpoint || seqNos.contains(seqNo)));
316+
}
275317
}

0 commit comments

Comments
 (0)