Skip to content

Commit 59657ad

Browse files
authored
Lazy initialize checkpoint tracker bit sets
This local checkpoint tracker uses collections of bit sets to track which sequence numbers are complete, eventually removing these bit sets when the local checkpoint advances. However, these bit sets were eagerly allocated so that if a sequence number far ahead of the checkpoint was marked as completed, all bit sets between the "last" bit set and the bit set needed to track the marked sequence number were allocated. If this sequence number was too far ahead, the memory requirements could be excessive. This commit opts for a different strategy for holding on to these bit sets and enables them to be lazily allocated. Relates #27179
1 parent 90d6317 commit 59657ad

File tree

2 files changed

+84
-53
lines changed

2 files changed

+84
-53
lines changed

core/src/main/java/org/elasticsearch/index/seqno/LocalCheckpointTracker.java

Lines changed: 48 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import com.carrotsearch.hppc.LongObjectHashMap;
2223
import org.apache.lucene.util.FixedBitSet;
2324
import org.elasticsearch.common.SuppressForbidden;
24-
import org.elasticsearch.common.settings.Setting;
25-
import org.elasticsearch.index.IndexSettings;
26-
27-
import java.util.LinkedList;
2825

2926
/**
3027
* This class generates sequences numbers and keeps track of the so-called "local checkpoint" which is the highest number for which all
@@ -33,21 +30,16 @@
3330
public class LocalCheckpointTracker {
3431

3532
/**
36-
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple arrays allocating them on
37-
* demand and cleaning up while completed. This constant controls the size of the arrays.
33+
* We keep a bit for each sequence number that is still pending. To optimize allocation, we do so in multiple sets allocating them on
34+
* demand and cleaning up while completed. This constant controls the size of the sets.
3835
*/
39-
static final int BIT_ARRAYS_SIZE = 1024;
36+
static final int BIT_SET_SIZE = 1024;
4037

4138
/**
42-
* An ordered list of bit arrays representing pending sequence numbers. The list is "anchored" in {@link #firstProcessedSeqNo} which
43-
* marks the sequence number the fist bit in the first array corresponds to.
39+
* A collection of bit sets representing pending sequence numbers. Each sequence number is mapped to a bit set by dividing by the
40+
* bit set size.
4441
*/
45-
final LinkedList<FixedBitSet> processedSeqNo = new LinkedList<>();
46-
47-
/**
48-
* The sequence number that the first bit in the first array corresponds to.
49-
*/
50-
long firstProcessedSeqNo;
42+
final LongObjectHashMap<FixedBitSet> processedSeqNo = new LongObjectHashMap<>();
5143

5244
/**
5345
* The current local checkpoint, i.e., all sequence numbers no more than this number have been completed.
@@ -77,7 +69,6 @@ public LocalCheckpointTracker(final long maxSeqNo, final long localCheckpoint) {
7769
throw new IllegalArgumentException(
7870
"max seq. no. must be non-negative or [" + SequenceNumbers.NO_OPS_PERFORMED + "] but was [" + maxSeqNo + "]");
7971
}
80-
firstProcessedSeqNo = localCheckpoint == SequenceNumbers.NO_OPS_PERFORMED ? 0 : localCheckpoint + 1;
8172
nextSeqNo = maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED ? 0 : maxSeqNo + 1;
8273
checkpoint = localCheckpoint;
8374
}
@@ -122,7 +113,6 @@ synchronized void resetCheckpoint(final long checkpoint) {
122113
assert checkpoint != SequenceNumbers.UNASSIGNED_SEQ_NO;
123114
assert checkpoint <= this.checkpoint;
124115
processedSeqNo.clear();
125-
firstProcessedSeqNo = checkpoint + 1;
126116
this.checkpoint = checkpoint;
127117
}
128118

@@ -175,24 +165,28 @@ synchronized void waitForOpsToComplete(final long seqNo) throws InterruptedExcep
175165
@SuppressForbidden(reason = "Object#notifyAll")
176166
private void updateCheckpoint() {
177167
assert Thread.holdsLock(this);
178-
assert checkpoint < firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1 :
179-
"checkpoint should be below the end of the first bit set (o.w. current bit set is completed and shouldn't be there)";
180-
assert getBitSetForSeqNo(checkpoint + 1) == processedSeqNo.getFirst() :
181-
"checkpoint + 1 doesn't point to the first bit set (o.w. current bit set is completed and shouldn't be there)";
182168
assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)) :
183169
"updateCheckpoint is called but the bit following the checkpoint is not set";
184170
try {
185171
// keep it simple for now, get the checkpoint one by one; in the future we can optimize and read words
186-
FixedBitSet current = processedSeqNo.getFirst();
172+
long bitSetKey = getBitSetKey(checkpoint);
173+
FixedBitSet current = processedSeqNo.get(bitSetKey);
174+
if (current == null) {
175+
// the bit set corresponding to the checkpoint has already been removed, set ourselves up for the next bit set
176+
assert checkpoint % BIT_SET_SIZE == BIT_SET_SIZE - 1;
177+
current = processedSeqNo.get(++bitSetKey);
178+
}
187179
do {
188180
checkpoint++;
189-
// the checkpoint always falls in the first bit set or just before. If it falls
190-
// on the last bit of the current bit set, we can clean it.
191-
if (checkpoint == firstProcessedSeqNo + BIT_ARRAYS_SIZE - 1) {
192-
processedSeqNo.removeFirst();
193-
firstProcessedSeqNo += BIT_ARRAYS_SIZE;
194-
assert checkpoint - firstProcessedSeqNo < BIT_ARRAYS_SIZE;
195-
current = processedSeqNo.peekFirst();
181+
/*
182+
* The checkpoint always falls in the current bit set or we have already cleaned it; if it falls on the last bit of the
183+
* current bit set, we can clean it.
184+
*/
185+
if (checkpoint == lastSeqNoInBitSet(bitSetKey)) {
186+
assert current != null;
187+
final FixedBitSet removed = processedSeqNo.remove(bitSetKey);
188+
assert removed == current;
189+
current = processedSeqNo.get(++bitSetKey);
196190
}
197191
} while (current != null && current.get(seqNoToBitSetOffset(checkpoint + 1)));
198192
} finally {
@@ -201,37 +195,45 @@ assert getBitSetForSeqNo(checkpoint + 1).get(seqNoToBitSetOffset(checkpoint + 1)
201195
}
202196
}
203197

198+
private long lastSeqNoInBitSet(final long bitSetKey) {
199+
return (1 + bitSetKey) * BIT_SET_SIZE - 1;
200+
}
201+
204202
/**
205-
* Return the bit array for the provided sequence number, possibly allocating a new array if needed.
203+
* Return the bit set for the provided sequence number, possibly allocating a new set if needed.
206204
*
207-
* @param seqNo the sequence number to obtain the bit array for
208-
* @return the bit array corresponding to the provided sequence number
205+
* @param seqNo the sequence number to obtain the bit set for
206+
* @return the bit set corresponding to the provided sequence number
209207
*/
208+
private long getBitSetKey(final long seqNo) {
209+
assert Thread.holdsLock(this);
210+
return seqNo / BIT_SET_SIZE;
211+
}
212+
210213
private FixedBitSet getBitSetForSeqNo(final long seqNo) {
211214
assert Thread.holdsLock(this);
212-
assert seqNo >= firstProcessedSeqNo : "seqNo: " + seqNo + " firstProcessedSeqNo: " + firstProcessedSeqNo;
213-
final long bitSetOffset = (seqNo - firstProcessedSeqNo) / BIT_ARRAYS_SIZE;
214-
if (bitSetOffset > Integer.MAX_VALUE) {
215-
throw new IndexOutOfBoundsException(
216-
"sequence number too high; got [" + seqNo + "], firstProcessedSeqNo [" + firstProcessedSeqNo + "]");
217-
}
218-
while (bitSetOffset >= processedSeqNo.size()) {
219-
processedSeqNo.add(new FixedBitSet(BIT_ARRAYS_SIZE));
215+
final long bitSetKey = getBitSetKey(seqNo);
216+
final int index = processedSeqNo.indexOf(bitSetKey);
217+
final FixedBitSet bitSet;
218+
if (processedSeqNo.indexExists(index)) {
219+
bitSet = processedSeqNo.indexGet(index);
220+
} else {
221+
bitSet = new FixedBitSet(BIT_SET_SIZE);
222+
processedSeqNo.indexInsert(index, bitSetKey, bitSet);
220223
}
221-
return processedSeqNo.get((int) bitSetOffset);
224+
return bitSet;
222225
}
223226

224227
/**
225-
* Obtain the position in the bit array corresponding to the provided sequence number. The bit array corresponding to the sequence
226-
* number can be obtained via {@link #getBitSetForSeqNo(long)}.
228+
* Obtain the position in the bit set corresponding to the provided sequence number. The bit set corresponding to the sequence number
229+
* can be obtained via {@link #getBitSetForSeqNo(long)}.
227230
*
228231
* @param seqNo the sequence number to obtain the position for
229-
* @return the position in the bit array corresponding to the provided sequence number
232+
* @return the position in the bit set corresponding to the provided sequence number
230233
*/
231234
private int seqNoToBitSetOffset(final long seqNo) {
232235
assert Thread.holdsLock(this);
233-
assert seqNo >= firstProcessedSeqNo;
234-
return ((int) (seqNo - firstProcessedSeqNo)) % BIT_ARRAYS_SIZE;
236+
return Math.toIntExact(seqNo % BIT_SET_SIZE);
235237
}
236238

237239
}

core/src/test/java/org/elasticsearch/index/seqno/LocalCheckpointTrackerTests.java

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,14 @@
1919

2020
package org.elasticsearch.index.seqno;
2121

22+
import com.carrotsearch.hppc.LongObjectHashMap;
23+
import org.apache.lucene.util.FixedBitSet;
2224
import org.elasticsearch.ElasticsearchException;
2325
import org.elasticsearch.common.Randomness;
2426
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
2527
import org.elasticsearch.test.ESTestCase;
28+
import org.hamcrest.BaseMatcher;
29+
import org.hamcrest.Description;
2630
import org.junit.Before;
2731

2832
import java.util.ArrayList;
@@ -36,8 +40,7 @@
3640
import java.util.stream.Collectors;
3741
import java.util.stream.IntStream;
3842

39-
import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_ARRAYS_SIZE;
40-
import static org.hamcrest.Matchers.empty;
43+
import static org.elasticsearch.index.seqno.LocalCheckpointTracker.BIT_SET_SIZE;
4144
import static org.hamcrest.Matchers.equalTo;
4245
import static org.hamcrest.Matchers.isOneOf;
4346

@@ -83,10 +86,19 @@ public void testSimpleReplica() {
8386
assertThat(tracker.getCheckpoint(), equalTo(2L));
8487
}
8588

89+
public void testLazyInitialization() {
90+
/*
91+
* Previously this would allocate the entire chain of bit sets to the one for the sequence number being marked; for very large
92+
* sequence numbers this could lead to excessive memory usage resulting in out of memory errors.
93+
*/
94+
tracker.markSeqNoAsCompleted(randomNonNegativeLong());
95+
assertThat(tracker.processedSeqNo.size(), equalTo(1));
96+
}
97+
8698
public void testSimpleOverFlow() {
8799
List<Integer> seqNoList = new ArrayList<>();
88100
final boolean aligned = randomBoolean();
89-
final int maxOps = BIT_ARRAYS_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_ARRAYS_SIZE - 1));
101+
final int maxOps = BIT_SET_SIZE * randomIntBetween(1, 5) + (aligned ? 0 : randomIntBetween(1, BIT_SET_SIZE - 1));
90102

91103
for (int i = 0; i < maxOps; i++) {
92104
seqNoList.add(i);
@@ -97,7 +109,9 @@ public void testSimpleOverFlow() {
97109
}
98110
assertThat(tracker.checkpoint, equalTo(maxOps - 1L));
99111
assertThat(tracker.processedSeqNo.size(), equalTo(aligned ? 0 : 1));
100-
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
112+
if (aligned == false) {
113+
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
114+
}
101115
}
102116

103117
public void testConcurrentPrimary() throws InterruptedException {
@@ -138,7 +152,9 @@ protected void doRun() throws Exception {
138152
tracker.markSeqNoAsCompleted(unFinishedSeq);
139153
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
140154
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
141-
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
155+
if (tracker.processedSeqNo.size() == 1) {
156+
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
157+
}
142158
}
143159

144160
public void testConcurrentReplica() throws InterruptedException {
@@ -186,7 +202,10 @@ protected void doRun() throws Exception {
186202
assertThat(tracker.getCheckpoint(), equalTo(unFinishedSeq - 1L));
187203
tracker.markSeqNoAsCompleted(unFinishedSeq);
188204
assertThat(tracker.getCheckpoint(), equalTo(maxOps - 1L));
189-
assertThat(tracker.firstProcessedSeqNo, equalTo(((long) maxOps / BIT_ARRAYS_SIZE) * BIT_ARRAYS_SIZE));
205+
assertThat(tracker.processedSeqNo.size(), isOneOf(0, 1));
206+
if (tracker.processedSeqNo.size() == 1) {
207+
assertThat(tracker.processedSeqNo.keys().iterator().next().value, equalTo(tracker.checkpoint / BIT_SET_SIZE));
208+
}
190209
}
191210

192211
public void testWaitForOpsToComplete() throws BrokenBarrierException, InterruptedException {
@@ -241,7 +260,17 @@ public void testResetCheckpoint() {
241260
tracker.resetCheckpoint(localCheckpoint);
242261
assertThat(tracker.getCheckpoint(), equalTo((long) localCheckpoint));
243262
assertThat(tracker.getMaxSeqNo(), equalTo((long) maxSeqNo));
244-
assertThat(tracker.processedSeqNo, empty());
263+
assertThat(tracker.processedSeqNo, new BaseMatcher<LongObjectHashMap<FixedBitSet>>() {
264+
@Override
265+
public boolean matches(Object item) {
266+
return (item instanceof LongObjectHashMap && ((LongObjectHashMap) item).isEmpty());
267+
}
268+
269+
@Override
270+
public void describeTo(Description description) {
271+
description.appendText("empty");
272+
}
273+
});
245274
assertThat(tracker.generateSeqNo(), equalTo((long) (maxSeqNo + 1)));
246275
}
247276
}

0 commit comments

Comments
 (0)