Skip to content

Commit c049b02

Browse files
authored
Fix searchable snapshot stats fwd/bwd seeking counters (#52760)
This commit fixes the backward/forward seeking counters in searchable snapshot statistics. These counters use the current file pointer position and the new seeking position to compute the seeking stats, but at the time the stat is computed the current file pointer position is already updated to the new position, making the delta always equals to 0.
1 parent c9ac57f commit c049b02

File tree

3 files changed

+156
-6
lines changed

3 files changed

+156
-6
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheDirectory.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,16 @@ public Map<String, IndexInputStats> getStats() {
8686
}
8787

8888
// pkg private for tests
89-
@Nullable IndexInputStats getStats(String name) {
89+
@Nullable
90+
IndexInputStats getStats(String name) {
9091
return stats.get(name);
9192
}
9293

94+
// pkg private so tests can override
95+
IndexInputStats createIndexInputStats(final long fileLength) {
96+
return new IndexInputStats(fileLength);
97+
}
98+
9399
public void close() throws IOException {
94100
super.close();
95101
// Ideally we could let the cache evict/remove cached files by itself after the
@@ -101,7 +107,7 @@ public void close() throws IOException {
101107
public IndexInput openInput(final String name, final IOContext context) throws IOException {
102108
ensureOpen();
103109
final long fileLength = fileLength(name);
104-
return new CacheBufferedIndexInput(name, fileLength, context, stats.computeIfAbsent(name, n -> new IndexInputStats(fileLength)));
110+
return new CacheBufferedIndexInput(name, fileLength, context, stats.computeIfAbsent(name, n -> createIndexInputStats(fileLength)));
105111
}
106112

107113
private class CacheFileReference implements CacheFile.EvictionListener {
@@ -184,6 +190,8 @@ public class CacheBufferedIndexInput extends BufferedIndexInput {
184190

185191
// last read position is kept around in order to detect (non)contiguous reads for stats
186192
private long lastReadPosition;
193+
// last seek position is kept around in order to detect forward/backward seeks for stats
194+
private long lastSeekPosition;
187195

188196
CacheBufferedIndexInput(String fileName, long fileLength, IOContext ioContext, IndexInputStats stats) {
189197
this(new CacheFileReference(fileName, fileLength), ioContext, stats,
@@ -201,6 +209,8 @@ private CacheBufferedIndexInput(CacheFileReference cacheFileReference, IOContext
201209
this.end = offset + length;
202210
this.closed = new AtomicBoolean(false);
203211
this.isClone = isClone;
212+
this.lastReadPosition = this.offset;
213+
this.lastSeekPosition = this.offset;
204214
}
205215

206216
@Override
@@ -260,6 +270,7 @@ protected void readInternal(final byte[] buffer, final int offset, final int len
260270
assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]";
261271
stats.incrementBytesRead(lastReadPosition, position, totalBytesRead);
262272
lastReadPosition = position + totalBytesRead;
273+
lastSeekPosition = lastReadPosition;
263274
}
264275

265276
int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
@@ -302,7 +313,9 @@ protected void seekInternal(long pos) throws IOException {
302313
} else if (pos < 0L) {
303314
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
304315
}
305-
stats.incrementSeeks(getFilePointer(), pos);
316+
final long position = pos + this.offset;
317+
stats.incrementSeeks(lastSeekPosition, position);
318+
lastSeekPosition = position;
306319
}
307320

308321
@Override

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/xpack/searchablesnapshots/cache/IndexInputStats.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public TimedCounter getCachedBytesWritten() {
156156
}
157157

158158
@SuppressForbidden(reason = "Handles Long.MIN_VALUE before using Math.abs()")
159-
private boolean isLargeSeek(long delta) {
159+
boolean isLargeSeek(long delta) {
160160
return delta != Long.MIN_VALUE && Math.abs(delta) > seekingThreshold;
161161
}
162162

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/xpack/searchablesnapshots/cache/CacheBufferedIndexInputStatsTests.java

Lines changed: 139 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,132 @@ public void testReadBytesNonContiguously() throws Exception {
294294
});
295295
}
296296

297+
public void testForwardSeeks() throws Exception {
298+
// use default cache service settings
299+
final CacheService cacheService = new CacheService(Settings.EMPTY);
300+
301+
executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> {
302+
final IOContext ioContext = newIOContext(random());
303+
try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) {
304+
IndexInput input = indexInput;
305+
if (randomBoolean()) {
306+
final long sliceOffset = randomLongBetween(0L, input.length() - 1L);
307+
final long sliceLength = randomLongBetween(1L, input.length() - sliceOffset);
308+
input = input.slice("slice", sliceOffset, sliceLength);
309+
}
310+
if (randomBoolean()) {
311+
input = input.clone();
312+
}
313+
314+
final IndexInputStats inputStats = cacheDirectory.getStats(fileName);
315+
final IndexInputStats.Counter forwardSmallSeeksCounter = inputStats.getForwardSmallSeeks();
316+
assertCounter(forwardSmallSeeksCounter, 0L, 0L, 0L, 0L);
317+
318+
long totalSmallSeeks = 0L;
319+
long countSmallSeeks = 0L;
320+
long minSmallSeeks = Long.MAX_VALUE;
321+
long maxSmallSeeks = Long.MIN_VALUE;
322+
323+
final IndexInputStats.Counter forwardLargeSeeksCounter = inputStats.getForwardLargeSeeks();
324+
assertCounter(forwardLargeSeeksCounter, 0L, 0L, 0L, 0L);
325+
326+
long totalLargeSeeks = 0L;
327+
long countLargeSeeks = 0L;
328+
long minLargeSeeks = Long.MAX_VALUE;
329+
long maxLargeSeeks = Long.MIN_VALUE;
330+
331+
while (input.getFilePointer() < input.length()) {
332+
long moveForward = randomLongBetween(1L, input.length() - input.getFilePointer());
333+
input.seek(input.getFilePointer() + moveForward);
334+
335+
if (inputStats.isLargeSeek(moveForward)) {
336+
minLargeSeeks = (moveForward < minLargeSeeks) ? moveForward : minLargeSeeks;
337+
maxLargeSeeks = (moveForward > maxLargeSeeks) ? moveForward : maxLargeSeeks;
338+
totalLargeSeeks += moveForward;
339+
countLargeSeeks += 1;
340+
341+
assertCounter(forwardLargeSeeksCounter, totalLargeSeeks, countLargeSeeks, minLargeSeeks, maxLargeSeeks);
342+
343+
} else {
344+
minSmallSeeks = (moveForward < minSmallSeeks) ? moveForward : minSmallSeeks;
345+
maxSmallSeeks = (moveForward > maxSmallSeeks) ? moveForward : maxSmallSeeks;
346+
totalSmallSeeks += moveForward;
347+
countSmallSeeks += 1;
348+
349+
assertCounter(forwardSmallSeeksCounter, totalSmallSeeks, countSmallSeeks, minSmallSeeks, maxSmallSeeks);
350+
}
351+
}
352+
} catch (IOException e) {
353+
throw new AssertionError(e);
354+
}
355+
});
356+
}
357+
358+
public void testBackwardSeeks() throws Exception {
359+
// use default cache service settings
360+
final CacheService cacheService = new CacheService(Settings.EMPTY);
361+
362+
executeTestCase(cacheService, (fileName, fileContent, cacheDirectory) -> {
363+
final IOContext ioContext = newIOContext(random());
364+
try (IndexInput indexInput = cacheDirectory.openInput(fileName, ioContext)) {
365+
IndexInput input = indexInput;
366+
if (randomBoolean()) {
367+
final long sliceOffset = randomLongBetween(0L, input.length() - 1L);
368+
final long sliceLength = randomLongBetween(1L, input.length() - sliceOffset);
369+
input = input.slice("slice", sliceOffset, sliceLength);
370+
}
371+
if (randomBoolean()) {
372+
input = input.clone();
373+
}
374+
375+
final IndexInputStats inputStats = cacheDirectory.getStats(fileName);
376+
final IndexInputStats.Counter backwardSmallSeeks = inputStats.getBackwardSmallSeeks();
377+
assertCounter(backwardSmallSeeks, 0L, 0L, 0L, 0L);
378+
379+
long totalSmallSeeks = 0L;
380+
long countSmallSeeks = 0L;
381+
long minSmallSeeks = Long.MAX_VALUE;
382+
long maxSmallSeeks = Long.MIN_VALUE;
383+
384+
final IndexInputStats.Counter backwardLargeSeeks = inputStats.getBackwardLargeSeeks();
385+
assertCounter(backwardLargeSeeks, 0L, 0L, 0L, 0L);
386+
387+
long totalLargeSeeks = 0L;
388+
long countLargeSeeks = 0L;
389+
long minLargeSeeks = Long.MAX_VALUE;
390+
long maxLargeSeeks = Long.MIN_VALUE;
391+
392+
input.seek(input.length());
393+
assertThat(input.getFilePointer(), equalTo(input.length()));
394+
395+
do {
396+
long moveBackward = -1L * randomLongBetween(1L, input.getFilePointer());
397+
input.seek(input.getFilePointer() + moveBackward);
398+
399+
if (inputStats.isLargeSeek(moveBackward)) {
400+
minLargeSeeks = (moveBackward < minLargeSeeks) ? moveBackward : minLargeSeeks;
401+
maxLargeSeeks = (moveBackward > maxLargeSeeks) ? moveBackward : maxLargeSeeks;
402+
totalLargeSeeks += moveBackward;
403+
countLargeSeeks += 1;
404+
405+
assertCounter(backwardLargeSeeks, totalLargeSeeks, countLargeSeeks, minLargeSeeks, maxLargeSeeks);
406+
407+
} else {
408+
minSmallSeeks = (moveBackward < minSmallSeeks) ? moveBackward : minSmallSeeks;
409+
maxSmallSeeks = (moveBackward > maxSmallSeeks) ? moveBackward : maxSmallSeeks;
410+
totalSmallSeeks += moveBackward;
411+
countSmallSeeks += 1;
412+
413+
assertCounter(backwardSmallSeeks, totalSmallSeeks, countSmallSeeks, minSmallSeeks, maxSmallSeeks);
414+
}
415+
416+
} while (input.getFilePointer() > 0L);
417+
} catch (IOException e) {
418+
throw new AssertionError(e);
419+
}
420+
});
421+
}
422+
297423
private static void executeTestCase(CacheService cacheService, TriConsumer<String, byte[], CacheDirectory> test) throws Exception {
298424
final byte[] fileContent = randomUnicodeOfLength(randomIntBetween(10, MAX_FILE_LENGTH)).getBytes(StandardCharsets.UTF_8);
299425
final String fileName = randomAlphaOfLength(10);
@@ -302,10 +428,21 @@ private static void executeTestCase(CacheService cacheService, TriConsumer<Strin
302428
final ShardId shardId = new ShardId("_name", "_uuid", 0);
303429
final AtomicLong fakeClock = new AtomicLong();
304430

431+
final Long seekingThreshold = randomBoolean() ? randomLongBetween(1L, fileContent.length) : null;
432+
305433
try (CacheService ignored = cacheService;
306434
Directory directory = newDirectory();
307-
CacheDirectory cacheDirectory = new CacheDirectory(directory, cacheService, createTempDir(), snapshotId, indexId, shardId,
308-
() -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS))
435+
CacheDirectory cacheDirectory =
436+
new CacheDirectory(directory, cacheService, createTempDir(), snapshotId, indexId, shardId,
437+
() -> fakeClock.addAndGet(FAKE_CLOCK_ADVANCE_NANOS)) {
438+
@Override
439+
IndexInputStats createIndexInputStats(long fileLength) {
440+
if (seekingThreshold == null) {
441+
return super.createIndexInputStats(fileLength);
442+
}
443+
return new IndexInputStats(fileLength, seekingThreshold);
444+
}
445+
}
309446
) {
310447
cacheService.start();
311448
assertThat(cacheDirectory.getStats(fileName), nullValue());

0 commit comments

Comments
 (0)