Skip to content

Commit 815c861

Browse files
authored
Extract CacheBufferedIndexInput from CacheDirectory (#53879)
Following #53860, this commit extracts the CacheBufferedIndexInput class from the CacheDirectory so that it can be merged with SearchableSnapshotDirectory.
1 parent 8c732d0 commit 815c861

File tree

4 files changed

+309
-286
lines changed

4 files changed

+309
-286
lines changed

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/BaseSearchableSnapshotDirectory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ public BaseSearchableSnapshotDirectory(BlobContainer blobContainer, BlobStoreInd
3333
this.closed = new AtomicBoolean(false);
3434
}
3535

36+
public BlobContainer blobContainer() {
37+
return blobContainer;
38+
}
39+
3640
protected final FileInfo fileInfo(final String name) throws FileNotFoundException {
3741
return snapshot.indexFiles()
3842
.stream()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,295 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
7+
package org.elasticsearch.xpack.searchablesnapshots.cache;
8+
9+
import org.apache.logging.log4j.LogManager;
10+
import org.apache.logging.log4j.Logger;
11+
import org.apache.logging.log4j.message.ParameterizedMessage;
12+
import org.apache.lucene.store.AlreadyClosedException;
13+
import org.apache.lucene.store.IOContext;
14+
import org.apache.lucene.store.IndexInput;
15+
import org.elasticsearch.common.Nullable;
16+
import org.elasticsearch.common.SuppressForbidden;
17+
import org.elasticsearch.common.io.Channels;
18+
import org.elasticsearch.common.util.concurrent.ReleasableLock;
19+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
20+
import org.elasticsearch.index.store.BaseSearchableSnapshotIndexInput;
21+
22+
import java.io.EOFException;
23+
import java.io.IOException;
24+
import java.io.InputStream;
25+
import java.nio.ByteBuffer;
26+
import java.nio.channels.FileChannel;
27+
import java.util.concurrent.atomic.AtomicBoolean;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
public class CacheBufferedIndexInput extends BaseSearchableSnapshotIndexInput {
31+
32+
private static final Logger logger = LogManager.getLogger(CacheBufferedIndexInput.class);
33+
private static final int COPY_BUFFER_SIZE = 8192;
34+
35+
private final CacheDirectory directory;
36+
private final long offset;
37+
private final long end;
38+
private final CacheFileReference cacheFileReference;
39+
private final IndexInputStats stats;
40+
41+
// the following are only mutable so they can be adjusted after cloning
42+
private AtomicBoolean closed;
43+
private boolean isClone;
44+
45+
// last read position is kept around in order to detect (non)contiguous reads for stats
46+
private long lastReadPosition;
47+
// last seek position is kept around in order to detect forward/backward seeks for stats
48+
private long lastSeekPosition;
49+
50+
CacheBufferedIndexInput(CacheDirectory directory, FileInfo fileInfo, IOContext context, IndexInputStats stats) {
51+
this("CachedBufferedIndexInput(" + fileInfo.physicalName() + ")", directory, fileInfo, context, stats, 0L, fileInfo.length(),
52+
false, new CacheFileReference(directory, fileInfo.physicalName(), fileInfo.length()));
53+
stats.incrementOpenCount();
54+
}
55+
56+
private CacheBufferedIndexInput(String resourceDesc, CacheDirectory directory, FileInfo fileInfo, IOContext context,
57+
IndexInputStats stats, long offset, long length, boolean isClone,
58+
CacheFileReference cacheFileReference) {
59+
super(resourceDesc, directory.blobContainer(), fileInfo, context);
60+
this.directory = directory;
61+
this.offset = offset;
62+
this.stats = stats;
63+
this.end = offset + length;
64+
this.closed = new AtomicBoolean(false);
65+
this.isClone = isClone;
66+
this.cacheFileReference = cacheFileReference;
67+
this.lastReadPosition = this.offset;
68+
this.lastSeekPosition = this.offset;
69+
}
70+
71+
@Override
72+
public long length() {
73+
return end - offset;
74+
}
75+
76+
@Override
77+
public void close() {
78+
if (closed.compareAndSet(false, true)) {
79+
if (isClone == false) {
80+
stats.incrementCloseCount();
81+
cacheFileReference.releaseOnClose();
82+
}
83+
}
84+
}
85+
86+
@Override
87+
protected void readInternal(final byte[] buffer, final int offset, final int length) throws IOException {
88+
final long position = getFilePointer() + this.offset;
89+
90+
int totalBytesRead = 0;
91+
while (totalBytesRead < length) {
92+
final long pos = position + totalBytesRead;
93+
final int off = offset + totalBytesRead;
94+
final int len = length - totalBytesRead;
95+
96+
int bytesRead = 0;
97+
try {
98+
final CacheFile cacheFile = cacheFileReference.get();
99+
if (cacheFile == null) {
100+
throw new AlreadyClosedException("Failed to acquire a non-evicted cache file");
101+
}
102+
103+
try (ReleasableLock ignored = cacheFile.fileLock()) {
104+
bytesRead = cacheFile.fetchRange(pos,
105+
(start, end) -> readCacheFile(cacheFile.getChannel(), end, pos, buffer, off, len),
106+
(start, end) -> writeCacheFile(cacheFile.getChannel(), start, end))
107+
.get();
108+
}
109+
} catch (final Exception e) {
110+
if (e instanceof AlreadyClosedException || (e.getCause() != null && e.getCause() instanceof AlreadyClosedException)) {
111+
try {
112+
// cache file was evicted during the range fetching, read bytes directly from source
113+
bytesRead = readDirectly(pos, pos + len, buffer, off);
114+
continue;
115+
} catch (Exception inner) {
116+
e.addSuppressed(inner);
117+
}
118+
}
119+
throw new IOException("Fail to read data from cache", e);
120+
121+
} finally {
122+
totalBytesRead += bytesRead;
123+
}
124+
}
125+
assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]";
126+
stats.incrementBytesRead(lastReadPosition, position, totalBytesRead);
127+
lastReadPosition = position + totalBytesRead;
128+
lastSeekPosition = lastReadPosition;
129+
}
130+
131+
int readCacheFile(FileChannel fc, long end, long position, byte[] buffer, int offset, long length) throws IOException {
132+
assert assertFileChannelOpen(fc);
133+
int bytesRead = Channels.readFromFileChannel(fc, position, buffer, offset, Math.toIntExact(Math.min(length, end - position)));
134+
stats.addCachedBytesRead(bytesRead);
135+
return bytesRead;
136+
}
137+
138+
@SuppressForbidden(reason = "Use positional writes on purpose")
139+
void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
140+
assert assertFileChannelOpen(fc);
141+
final long length = end - start;
142+
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
143+
logger.trace(() -> new ParameterizedMessage("writing range [{}-{}] to cache file [{}]", start, end, cacheFileReference));
144+
145+
int bytesCopied = 0;
146+
final long startTimeNanos = directory.statsCurrentTimeNanos();
147+
try (InputStream input = openInputStream(start, length)) {
148+
stats.incrementInnerOpenCount();
149+
long remaining = end - start;
150+
while (remaining > 0) {
151+
final int len = (remaining < copyBuffer.length) ? Math.toIntExact(remaining) : copyBuffer.length;
152+
int bytesRead = input.read(copyBuffer, 0, len);
153+
fc.write(ByteBuffer.wrap(copyBuffer, 0, bytesRead), start + bytesCopied);
154+
bytesCopied += bytesRead;
155+
remaining -= bytesRead;
156+
}
157+
final long endTimeNanos = directory.statsCurrentTimeNanos();
158+
stats.addCachedBytesWritten(bytesCopied, endTimeNanos - startTimeNanos);
159+
}
160+
}
161+
162+
@Override
163+
protected void seekInternal(long pos) throws IOException {
164+
if (pos > length()) {
165+
throw new EOFException("Reading past end of file [position=" + pos + ", length=" + length() + "] for " + toString());
166+
} else if (pos < 0L) {
167+
throw new IOException("Seeking to negative position [" + pos + "] for " + toString());
168+
}
169+
final long position = pos + this.offset;
170+
stats.incrementSeeks(lastSeekPosition, position);
171+
lastSeekPosition = position;
172+
}
173+
174+
@Override
175+
public CacheBufferedIndexInput clone() {
176+
final CacheBufferedIndexInput clone = (CacheBufferedIndexInput) super.clone();
177+
clone.closed = new AtomicBoolean(false);
178+
clone.isClone = true;
179+
return clone;
180+
}
181+
182+
@Override
183+
public IndexInput slice(String sliceDescription, long offset, long length) {
184+
if (offset < 0 || length < 0 || offset + length > this.length()) {
185+
throw new IllegalArgumentException("slice() " + sliceDescription + " out of bounds: offset=" + offset
186+
+ ",length=" + length + ",fileLength=" + this.length() + ": " + this);
187+
}
188+
return new CacheBufferedIndexInput(getFullSliceDescription(sliceDescription), directory, fileInfo, context, stats,
189+
this.offset + offset, length, true, cacheFileReference);
190+
}
191+
192+
@Override
193+
public String toString() {
194+
return "CacheBufferedIndexInput{" +
195+
"cacheFileReference=" + cacheFileReference +
196+
", offset=" + offset +
197+
", end=" + end +
198+
", length=" + length() +
199+
", position=" + getFilePointer() +
200+
'}';
201+
}
202+
203+
private int readDirectly(long start, long end, byte[] buffer, int offset) throws IOException {
204+
final long length = end - start;
205+
final byte[] copyBuffer = new byte[Math.toIntExact(Math.min(COPY_BUFFER_SIZE, length))];
206+
logger.trace(() ->
207+
new ParameterizedMessage("direct reading of range [{}-{}] for cache file [{}]", start, end, cacheFileReference));
208+
209+
int bytesCopied = 0;
210+
final long startTimeNanos = directory.statsCurrentTimeNanos();
211+
try (InputStream input = openInputStream(start, length)) {
212+
stats.incrementInnerOpenCount();
213+
long remaining = end - start;
214+
while (remaining > 0) {
215+
final int len = (remaining < copyBuffer.length) ? (int) remaining : copyBuffer.length;
216+
int bytesRead = input.read(copyBuffer, 0, len);
217+
System.arraycopy(copyBuffer, 0, buffer, offset + bytesCopied, len);
218+
bytesCopied += bytesRead;
219+
remaining -= bytesRead;
220+
}
221+
final long endTimeNanos = directory.statsCurrentTimeNanos();
222+
stats.addDirectBytesRead(bytesCopied, endTimeNanos - startTimeNanos);
223+
}
224+
return bytesCopied;
225+
}
226+
227+
private static class CacheFileReference implements CacheFile.EvictionListener {
228+
229+
private final long fileLength;
230+
private final CacheKey cacheKey;
231+
private final CacheDirectory directory;
232+
private final AtomicReference<CacheFile> cacheFile = new AtomicReference<>(); // null if evicted or not yet acquired
233+
234+
private CacheFileReference(CacheDirectory directory, String fileName, long fileLength) {
235+
this.cacheKey = directory.createCacheKey(fileName);
236+
this.fileLength = fileLength;
237+
this.directory = directory;
238+
}
239+
240+
@Nullable
241+
CacheFile get() throws Exception {
242+
CacheFile currentCacheFile = cacheFile.get();
243+
if (currentCacheFile != null) {
244+
return currentCacheFile;
245+
}
246+
247+
final CacheFile newCacheFile = directory.getCacheFile(cacheKey, fileLength);
248+
synchronized (this) {
249+
currentCacheFile = cacheFile.get();
250+
if (currentCacheFile != null) {
251+
return currentCacheFile;
252+
}
253+
if (newCacheFile.acquire(this)) {
254+
final CacheFile previousCacheFile = cacheFile.getAndSet(newCacheFile);
255+
assert previousCacheFile == null;
256+
return newCacheFile;
257+
}
258+
}
259+
return null;
260+
}
261+
262+
@Override
263+
public void onEviction(final CacheFile evictedCacheFile) {
264+
synchronized (this) {
265+
if (cacheFile.compareAndSet(evictedCacheFile, null)) {
266+
evictedCacheFile.release(this);
267+
}
268+
}
269+
}
270+
271+
void releaseOnClose() {
272+
synchronized (this) {
273+
final CacheFile currentCacheFile = cacheFile.getAndSet(null);
274+
if (currentCacheFile != null) {
275+
currentCacheFile.release(this);
276+
}
277+
}
278+
}
279+
280+
@Override
281+
public String toString() {
282+
return "CacheFileReference{" +
283+
"cacheKey='" + cacheKey + '\'' +
284+
", fileLength=" + fileLength +
285+
", acquired=" + (cacheFile.get() != null) +
286+
'}';
287+
}
288+
}
289+
290+
private static boolean assertFileChannelOpen(FileChannel fileChannel) {
291+
assert fileChannel != null;
292+
assert fileChannel.isOpen();
293+
return true;
294+
}
295+
}

0 commit comments

Comments
 (0)