Skip to content

Commit f9eb059

Browse files
authored
Centralize common code for footer checksum read in BaseSearchableSnapshotIndexInput class (#68902) (#69015)
This commit factors out into the BaseSearchableSnapshotIndexInput class the common code used to read the footer checksum of searchable snapshot IndexInput implementations. This change keeps the code that reads the footer checksum in one place and enables this optimisation for DirectBlobContainerIndexInput as well. It also makes it possible to assert, in a single place, that the cache fetch async thread pool is not used when reading from an index input. Finally, the index input unit tests have been adapted to account for the footer checksum read optimisation: they now generate binary content that contains a valid footer checksum. Backport of #68902
1 parent 4817afa commit f9eb059

File tree

12 files changed

+210
-185
lines changed

12 files changed

+210
-185
lines changed

test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java

+5
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,11 @@ public static byte randomByte() {
668668
return (byte) random().nextInt();
669669
}
670670

671+
public static byte randomNonNegativeByte() {
672+
byte randomByte = randomByte();
673+
return (byte) (randomByte == Byte.MIN_VALUE ? 0 : Math.abs(randomByte));
674+
}
675+
671676
/**
672677
* Helper method to create a byte array of a given length populated with random byte values
673678
*

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotIndexInput.java

+78-16
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
*/
77
package org.elasticsearch.index.store;
88

9+
import org.apache.logging.log4j.Logger;
10+
import org.apache.lucene.codecs.CodecUtil;
911
import org.apache.lucene.store.BufferedIndexInput;
1012
import org.apache.lucene.store.IOContext;
1113
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -16,11 +18,16 @@
1618

1719
import java.io.IOException;
1820
import java.io.InputStream;
21+
import java.nio.ByteBuffer;
1922
import java.util.Objects;
2023
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.function.Predicate;
25+
26+
import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray;
2127

2228
public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {
2329

30+
protected final Logger logger;
2431
protected final BlobContainer blobContainer;
2532
protected final FileInfo fileInfo;
2633
protected final IOContext context;
@@ -33,6 +40,7 @@ public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInpu
3340
private AtomicBoolean closed;
3441

3542
public BaseSearchableSnapshotIndexInput(
43+
Logger logger,
3644
String resourceDesc,
3745
BlobContainer blobContainer,
3846
FileInfo fileInfo,
@@ -42,6 +50,7 @@ public BaseSearchableSnapshotIndexInput(
4250
long length
4351
) {
4452
super(resourceDesc, context);
53+
this.logger = Objects.requireNonNull(logger);
4554
this.blobContainer = Objects.requireNonNull(blobContainer);
4655
this.fileInfo = Objects.requireNonNull(fileInfo);
4756
this.context = Objects.requireNonNull(context);
@@ -54,25 +63,60 @@ public BaseSearchableSnapshotIndexInput(
5463
this.isClone = false;
5564
}
5665

57-
public BaseSearchableSnapshotIndexInput(
58-
String resourceDesc,
59-
BlobContainer blobContainer,
60-
FileInfo fileInfo,
61-
IOContext context,
62-
IndexInputStats stats,
63-
long offset,
64-
long length,
65-
int bufferSize
66-
) {
67-
this(resourceDesc, blobContainer, fileInfo, context, stats, offset, length);
68-
setBufferSize(bufferSize);
69-
}
70-
7166
@Override
7267
public final long length() {
7368
return length;
7469
}
7570

71+
@Override
72+
protected final void readInternal(ByteBuffer b) throws IOException {
73+
assert assertCurrentThreadIsNotCacheFetchAsync();
74+
75+
// We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often
76+
// executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer.
77+
if (maybeReadChecksumFromFileInfo(b)) {
78+
logger.trace("read footer of file [{}], bypassing all caches", fileInfo.physicalName());
79+
assert b.remaining() == 0L : b.remaining();
80+
return;
81+
}
82+
83+
doReadInternal(b);
84+
}
85+
86+
/**
 * Performs the actual read into the given buffer. This method is called by {@link #readInternal(ByteBuffer)}
 * when the read could not be completed using the footer checksum stored in the {@link FileInfo}.
 *
 * @param b the buffer to fill
 * @throws IOException if the read fails
 */
protected abstract void doReadInternal(ByteBuffer b) throws IOException;
87+
88+
/**
89+
* Detects read operations that are executed on the last 16 bytes of the index input which is where Lucene stores the footer checksum
90+
* of Lucene files. If such a read is detected this method tries to complete the read operation by reading the checksum from the
91+
* {@link FileInfo} in memory rather than reading the bytes from the {@link BufferedIndexInput} because that could trigger more cache
92+
* operations.
93+
*
94+
* @return true if the footer checksum has been read from the {@link FileInfo}
95+
*/
96+
private boolean maybeReadChecksumFromFileInfo(ByteBuffer b) throws IOException {
97+
final int remaining = b.remaining();
98+
if (remaining != CodecUtil.footerLength()) {
99+
return false;
100+
}
101+
final long position = getFilePointer() + this.offset;
102+
if (position != fileInfo.length() - CodecUtil.footerLength()) {
103+
return false;
104+
}
105+
if (isClone) {
106+
return false;
107+
}
108+
boolean success = false;
109+
try {
110+
b.put(checksumToBytesArray(fileInfo.checksum()));
111+
success = true;
112+
} catch (NumberFormatException e) {
113+
// tests disable this optimisation by passing an invalid checksum
114+
} finally {
115+
assert b.remaining() == (success ? 0L : remaining) : b.remaining() + " remaining bytes but success is " + success;
116+
}
117+
return success;
118+
}
119+
76120
/**
77121
* Opens an {@link InputStream} for the given range of bytes which reads the data directly from the blob store. If the requested range
78122
* spans multiple blobs then this stream will request them in turn.
@@ -173,11 +217,18 @@ public final void close() throws IOException {
173217
if (isClone == false) {
174218
stats.incrementCloseCount();
175219
}
176-
innerClose();
220+
doClose();
177221
}
178222
}
179223

180-
public abstract void innerClose() throws IOException;
224+
/**
 * Hook invoked by {@link #close()} that lets concrete index inputs perform their own cleanup.
 *
 * @throws IOException if the cleanup fails
 */
public abstract void doClose() throws IOException;
225+
226+
protected void ensureContext(Predicate<IOContext> predicate) throws IOException {
227+
if (predicate.test(context) == false) {
228+
assert false : "this method should not be used with this context " + context;
229+
throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']');
230+
}
231+
}
181232

182233
protected final boolean assertCurrentThreadMayAccessBlobStore() {
183234
final String threadName = Thread.currentThread().getName();
@@ -199,4 +250,15 @@ protected final boolean assertCurrentThreadMayAccessBlobStore() {
199250
return true;
200251
}
201252

253+
protected static boolean isCacheFetchAsyncThread(final String threadName) {
254+
return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']');
255+
}
256+
257+
protected static boolean assertCurrentThreadIsNotCacheFetchAsync() {
258+
final String threadName = Thread.currentThread().getName();
259+
assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread ["
260+
+ threadName
261+
+ "] to belong to the cache fetch async thread pool";
262+
return true;
263+
}
202264
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

+3-58
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.apache.logging.log4j.LogManager;
1111
import org.apache.logging.log4j.Logger;
1212
import org.apache.logging.log4j.message.ParameterizedMessage;
13-
import org.apache.lucene.codecs.CodecUtil;
1413
import org.apache.lucene.store.AlreadyClosedException;
1514
import org.apache.lucene.store.IOContext;
1615
import org.apache.lucene.store.IndexInput;
@@ -29,7 +28,6 @@
2928
import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput;
3029
import org.elasticsearch.index.store.IndexInputStats;
3130
import org.elasticsearch.index.store.SearchableSnapshotDirectory;
32-
import org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsConstants;
3331
import org.elasticsearch.xpack.searchablesnapshots.cache.ByteRange;
3432

3533
import java.io.EOFException;
@@ -42,10 +40,8 @@
4240
import java.util.concurrent.atomic.AtomicLong;
4341
import java.util.concurrent.atomic.AtomicReference;
4442
import java.util.function.Consumer;
45-
import java.util.function.Predicate;
4643
import java.util.stream.IntStream;
4744

48-
import static org.elasticsearch.index.store.checksum.ChecksumBlobContainerIndexInput.checksumToBytesArray;
4945
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshotsUtils.toIntBytes;
5046

5147
public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexInput {
@@ -106,7 +102,7 @@ private CachedBlobContainerIndexInput(
106102
int rangeSize,
107103
int recoveryRangeSize
108104
) {
109-
super(resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
105+
super(logger, resourceDesc, directory.blobContainer(), fileInfo, context, stats, offset, length);
110106
this.directory = directory;
111107
this.cacheFileReference = cacheFileReference;
112108
this.lastReadPosition = this.offset;
@@ -116,19 +112,12 @@ private CachedBlobContainerIndexInput(
116112
}
117113

118114
@Override
119-
public void innerClose() {
115+
public void doClose() {
120116
if (isClone == false) {
121117
cacheFileReference.releaseOnClose();
122118
}
123119
}
124120

125-
private void ensureContext(Predicate<IOContext> predicate) throws IOException {
126-
if (predicate.test(context) == false) {
127-
assert false : "this method should not be used with this context " + context;
128-
throw new IOException("Cannot read the index input using context [context=" + context + ", input=" + this + ']');
129-
}
130-
}
131-
132121
private long getDefaultRangeSize() {
133122
return (context != CACHE_WARMING_CONTEXT)
134123
? (directory.isRecoveryFinalized() ? defaultRangeSize : recoveryRangeSize)
@@ -143,24 +132,12 @@ private ByteRange computeRange(long position) {
143132
}
144133

145134
@Override
146-
protected void readInternal(ByteBuffer b) throws IOException {
135+
protected void doReadInternal(ByteBuffer b) throws IOException {
147136
ensureContext(ctx -> ctx != CACHE_WARMING_CONTEXT);
148-
assert assertCurrentThreadIsNotCacheFetchAsync();
149137
final long position = getFilePointer() + this.offset;
150138
final int length = b.remaining();
151139

152-
// We can detect that we're going to read the last 16 bytes (that contains the footer checksum) of the file. Such reads are often
153-
// executed when opening a Directory and since we have the checksum in the snapshot metadata we can use it to fill the ByteBuffer.
154-
if (length == CodecUtil.footerLength() && isClone == false && position == fileInfo.length() - length) {
155-
if (readChecksumFromFileInfo(b)) {
156-
logger.trace("read footer of file [{}] at position [{}], bypassing all caches", fileInfo.physicalName(), position);
157-
return;
158-
}
159-
assert b.remaining() == length;
160-
}
161-
162140
logger.trace("readInternal: read [{}-{}] ([{}] bytes) from [{}]", position, position + length, length, this);
163-
164141
try {
165142
final CacheFile cacheFile = cacheFileReference.get();
166143

@@ -401,26 +378,6 @@ private int readDirectlyIfAlreadyClosed(long position, ByteBuffer b, Exception e
401378
throw new IOException("failed to read data from cache", e);
402379
}
403380

404-
private boolean readChecksumFromFileInfo(ByteBuffer b) throws IOException {
405-
assert isClone == false;
406-
byte[] footer;
407-
try {
408-
footer = checksumToBytesArray(fileInfo.checksum());
409-
} catch (NumberFormatException e) {
410-
// tests disable this optimisation by passing an invalid checksum
411-
footer = null;
412-
}
413-
if (footer == null) {
414-
return false;
415-
}
416-
417-
b.put(footer);
418-
assert b.remaining() == 0L;
419-
return true;
420-
421-
// TODO we should add this to DirectBlobContainerIndexInput too.
422-
}
423-
424381
/**
425382
* Prefetches a complete part and writes it in cache. This method is used to prewarm the cache.
426383
* @return a tuple with {@code Tuple<Persistent Cache Length, Prefetched Length>} values
@@ -737,23 +694,11 @@ private static boolean assertFileChannelOpen(FileChannel fileChannel) {
737694
return true;
738695
}
739696

740-
private static boolean isCacheFetchAsyncThread(final String threadName) {
741-
return threadName.contains('[' + SearchableSnapshotsConstants.CACHE_FETCH_ASYNC_THREAD_POOL_NAME + ']');
742-
}
743-
744697
private static boolean assertCurrentThreadMayWriteCacheFile() {
745698
final String threadName = Thread.currentThread().getName();
746699
assert isCacheFetchAsyncThread(threadName) : "expected the current thread ["
747700
+ threadName
748701
+ "] to belong to the cache fetch async thread pool";
749702
return true;
750703
}
751-
752-
private static boolean assertCurrentThreadIsNotCacheFetchAsync() {
753-
final String threadName = Thread.currentThread().getName();
754-
assert false == isCacheFetchAsyncThread(threadName) : "expected the current thread ["
755-
+ threadName
756-
+ "] to belong to the cache fetch async thread pool";
757-
return true;
758-
}
759704
}

0 commit comments

Comments
 (0)