Skip to content

Commit a5302e1

Browse files
committed
Add contains method to LocalCheckpointTracker
This change adds "contains" method to LocalCheckpointTracker. One of the use cases is to check if a given operation has been processed in an engine or not by looking up its seq_no in LocalCheckpointTracker.
1 parent 6ec12be commit a5302e1

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed

server/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,21 @@ public synchronized void waitForOpsToComplete(final long seqNo) throws Interrupt
158158
}
159159
}
160160

161+
/**
162+
* Checks if the given sequence number was marked as completed in this tracker.
163+
*/
164+
public synchronized boolean contains(long seqNo) {
165+
if (seqNo >= nextSeqNo) {
166+
return false;
167+
}
168+
if (seqNo <= checkpoint) {
169+
return true;
170+
}
171+
final long bitSetKey = getBitSetKey(seqNo);
172+
final CountedBitSet bitSet = processedSeqNo.get(bitSetKey);
173+
return bitSet != null && bitSet.get(seqNoToBitSetOffset(seqNo));
174+
}
175+
161176
/**
162177
* Moves the checkpoint to the last consecutively processed sequence number. This method assumes that the sequence number following the
163178
* current checkpoint is processed.

server/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -65,52 +65,71 @@ public void testSimplePrimary() {
6565
assertThat(seqNo1, equalTo(0L));
6666
tracker.markSeqNoAsCompleted(seqNo1);
6767
assertThat(tracker.getCheckpoint(), equalTo(0L));
68+
assertThat(tracker.contains(0L), equalTo(true));
69+
assertThat(tracker.contains(atLeast(1)), equalTo(false));
6870
seqNo1 = tracker.generateSeqNo();
6971
seqNo2 = tracker.generateSeqNo();
7072
assertThat(seqNo1, equalTo(1L));
7173
assertThat(seqNo2, equalTo(2L));
7274
tracker.markSeqNoAsCompleted(seqNo2);
7375
assertThat(tracker.getCheckpoint(), equalTo(0L));
76+
assertThat(tracker.contains(seqNo1), equalTo(false));
77+
assertThat(tracker.contains(seqNo2), equalTo(true));
7478
tracker.markSeqNoAsCompleted(seqNo1);
7579
assertThat(tracker.getCheckpoint(), equalTo(2L));
80+
assertThat(tracker.contains(between(0, 2)), equalTo(true));
81+
assertThat(tracker.contains(atLeast(3)), equalTo(false));
7682
}
7783

7884
public void testSimpleReplica() {
7985
assertThat(tracker.getCheckpoint(), equalTo(SequenceNumbers.NO_OPS_PERFORMED));
86+
assertThat(tracker.contains(randomNonNegativeLong()), equalTo(false));
8087
tracker.markSeqNoAsCompleted(0L);
8188
assertThat(tracker.getCheckpoint(), equalTo(0L));
89+
assertThat(tracker.contains(0), equalTo(true));
8290
tracker.markSeqNoAsCompleted(2L);
8391
assertThat(tracker.getCheckpoint(), equalTo(0L));
92+
assertThat(tracker.contains(1L), equalTo(false));
93+
assertThat(tracker.contains(2L), equalTo(true));
8494
tracker.markSeqNoAsCompleted(1L);
8595
assertThat(tracker.getCheckpoint(), equalTo(2L));
96+
assertThat(tracker.contains(between(0, 2)), equalTo(true));
97+
assertThat(tracker.contains(atLeast(3)), equalTo(false));
8698
}
8799

88100
public void testLazyInitialization() {
89101
/*
90102
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
91103
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
92104
*/
93-
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
105+
long seqNo = randomNonNegativeLong();
106+
tracker.markSeqNoAsCompleted(seqNo);
107+
assertThat(tracker.processedSeqNo.size(), equalTo(1));
108+
assertThat(tracker.contains(seqNo), equalTo(true));
109+
assertThat(tracker.contains(randomValueOtherThan(seqNo, ESTestCase::randomNonNegativeLong)), equalTo(false));
94110
assertThat(tracker.processedSeqNo.size(), equalTo(1));
95111
}
96112

97113
public void testSimpleOverFlow() {
98-
List<Integer> seqNoList = new ArrayList<>();
114+
List<Long> seqNoList = new ArrayList<>();
99115
final boolean aligned = randomBoolean();
100116
final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1));
101117

102-
for (int i = 0; i < maxOps; i++) {
118+
for (long i = 0; i < maxOps; i++) {
103119
seqNoList.add(i);
104120
}
105121
Collections.shuffle(seqNoList, random());
106-
for (Integer seqNo : seqNoList) {
122+
for (Long seqNo : seqNoList) {
107123
tracker.markSeqNoAsCompleted(seqNo);
108124
}
109125
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
110126
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
111127
if (aligned == false) {
112128
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
113129
}
130+
assertThat(tracker.contains(randomFrom(seqNoList)), equalTo(true));
131+
final long notCompletedSeqNo = randomValueOtherThanMany(seqNoList::contains, ESTestCase::randomNonNegativeLong);
132+
assertThat(tracker.contains(notCompletedSeqNo), equalTo(false));
114133
}
115134

116135
public void testConcurrentPrimary() throws InterruptedException {
@@ -199,8 +218,12 @@ protected void doRun() throws Exception {
199218
}
200219
assertThat(tracker.getMaxSeqNo(), equalTo(maxOps - 1L));
201220
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
221+
assertThat(tracker.contains(randomValueOtherThan(unFinishedSeq, () -> (long) randomFrom(seqNos))), equalTo(true));
222+
assertThat(tracker.contains(unFinishedSeq), equalTo(false));
202223
tracker.markSeqNoAsCompleted(unFinishedSeq);
203224
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
225+
assertThat(tracker.contains(unFinishedSeq), equalTo(true));
226+
assertThat(tracker.contains(randomLongBetween(maxOps, Long.MAX_VALUE)), equalTo(false));
204227
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
205228
if (tracker.processedSeqNo.size() == 1) {
206229
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));

0 commit comments

Comments
 (0)