5
5
*/
6
6
package org .elasticsearch .xpack .searchablesnapshots .cache ;
7
7
8
+ import org .apache .logging .log4j .LogManager ;
9
+ import org .apache .logging .log4j .Logger ;
10
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
8
11
import org .apache .lucene .store .AlreadyClosedException ;
9
12
import org .apache .lucene .store .BufferedIndexInput ;
10
13
import org .apache .lucene .store .Directory ;
14
17
import org .elasticsearch .common .Nullable ;
15
18
import org .elasticsearch .common .SuppressForbidden ;
16
19
import org .elasticsearch .common .io .Channels ;
20
+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
17
21
import org .elasticsearch .common .util .concurrent .ReleasableLock ;
18
22
import org .elasticsearch .index .shard .ShardId ;
19
23
import org .elasticsearch .repositories .IndexId ;
25
29
import java .nio .channels .FileChannel ;
26
30
import java .nio .file .Files ;
27
31
import java .nio .file .Path ;
32
+ import java .util .Map ;
28
33
import java .util .Objects ;
29
34
import java .util .concurrent .atomic .AtomicBoolean ;
30
35
import java .util .concurrent .atomic .AtomicReference ;
34
39
*/
35
40
public class CacheDirectory extends FilterDirectory {
36
41
42
+ private static final Logger logger = LogManager .getLogger (CacheDirectory .class );
37
43
private static final int COPY_BUFFER_SIZE = 8192 ;
38
44
45
+ private final Map <String , IndexInputStats > stats ;
39
46
private final CacheService cacheService ;
40
47
private final SnapshotId snapshotId ;
41
48
private final IndexId indexId ;
@@ -45,6 +52,7 @@ public class CacheDirectory extends FilterDirectory {
45
52
public CacheDirectory (Directory in , CacheService cacheService , Path cacheDir , SnapshotId snapshotId , IndexId indexId , ShardId shardId )
46
53
throws IOException {
47
54
super (in );
55
+ this .stats = ConcurrentCollections .newConcurrentMapWithAggressiveConcurrency ();
48
56
this .cacheService = Objects .requireNonNull (cacheService );
49
57
this .cacheDir = Files .createDirectories (cacheDir );
50
58
this .snapshotId = Objects .requireNonNull (snapshotId );
@@ -56,6 +64,11 @@ private CacheKey createCacheKey(String fileName) {
56
64
return new CacheKey (snapshotId , indexId , shardId , fileName );
57
65
}
58
66
67
+ // pkg private for tests
68
+ @ Nullable IndexInputStats getStats (String name ) {
69
+ return stats .get (name );
70
+ }
71
+
59
72
public void close () throws IOException {
60
73
super .close ();
61
74
// Ideally we could let the cache evict/remove cached files by itself after the
@@ -66,7 +79,8 @@ public void close() throws IOException {
66
79
@ Override
67
80
public IndexInput openInput (final String name , final IOContext context ) throws IOException {
68
81
ensureOpen ();
69
- return new CacheBufferedIndexInput (name , fileLength (name ), context );
82
+ final long fileLength = fileLength (name );
83
+ return new CacheBufferedIndexInput (name , fileLength , context , stats .computeIfAbsent (name , n -> new IndexInputStats (fileLength )));
70
84
}
71
85
72
86
private class CacheFileReference implements CacheFile .EvictionListener {
@@ -141,22 +155,28 @@ public class CacheBufferedIndexInput extends BufferedIndexInput {
141
155
private final long offset ;
142
156
private final long end ;
143
157
private final CacheFileReference cacheFileReference ;
158
+ private final IndexInputStats stats ;
144
159
145
160
// the following are only mutable so they can be adjusted after cloning
146
161
private AtomicBoolean closed ;
147
162
private boolean isClone ;
148
163
149
- CacheBufferedIndexInput (String fileName , long fileLength , IOContext ioContext ) {
150
- this (new CacheFileReference (fileName , fileLength ), ioContext ,
164
+ // last read position is kept around in order to detect (non)contiguous reads for stats
165
+ private long lastReadPosition ;
166
+
167
+ CacheBufferedIndexInput (String fileName , long fileLength , IOContext ioContext , IndexInputStats stats ) {
168
+ this (new CacheFileReference (fileName , fileLength ), ioContext , stats ,
151
169
"CachedBufferedIndexInput(" + fileName + ")" , 0L , fileLength , false );
170
+ stats .incrementOpenCount ();
152
171
}
153
172
154
- private CacheBufferedIndexInput (CacheFileReference cacheFileReference , IOContext ioContext , String desc , long offset , long length ,
155
- boolean isClone ) {
173
+ private CacheBufferedIndexInput (CacheFileReference cacheFileReference , IOContext ioContext , IndexInputStats stats ,
174
+ String desc , long offset , long length , boolean isClone ) {
156
175
super (desc , ioContext );
157
176
this .ioContext = ioContext ;
158
177
this .offset = offset ;
159
178
this .cacheFileReference = cacheFileReference ;
179
+ this .stats = stats ;
160
180
this .end = offset + length ;
161
181
this .closed = new AtomicBoolean (false );
162
182
this .isClone = isClone ;
@@ -171,6 +191,7 @@ public long length() {
171
191
public void close () {
172
192
if (closed .compareAndSet (false , true )) {
173
193
if (isClone == false ) {
194
+ stats .incrementCloseCount ();
174
195
cacheFileReference .releaseOnClose ();
175
196
}
176
197
}
@@ -180,20 +201,21 @@ public void close() {
180
201
protected void readInternal (final byte [] buffer , final int offset , final int length ) throws IOException {
181
202
final long position = getFilePointer () + this .offset ;
182
203
183
- int bytesRead = 0 ;
184
- while (bytesRead < length ) {
185
- final long pos = position + bytesRead ;
186
- final int off = offset + bytesRead ;
187
- final int len = length - bytesRead ;
204
+ int totalBytesRead = 0 ;
205
+ while (totalBytesRead < length ) {
206
+ final long pos = position + totalBytesRead ;
207
+ final int off = offset + totalBytesRead ;
208
+ final int len = length - totalBytesRead ;
188
209
210
+ int bytesRead = 0 ;
189
211
try {
190
212
final CacheFile cacheFile = cacheFileReference .get ();
191
213
if (cacheFile == null ) {
192
214
throw new AlreadyClosedException ("Failed to acquire a non-evicted cache file" );
193
215
}
194
216
195
217
try (ReleasableLock ignored = cacheFile .fileLock ()) {
196
- bytesRead + = cacheFile .fetchRange (pos ,
218
+ bytesRead = cacheFile .fetchRange (pos ,
197
219
(start , end ) -> readCacheFile (cacheFile .getChannel (), end , pos , buffer , off , len ),
198
220
(start , end ) -> writeCacheFile (cacheFile .getChannel (), start , end ))
199
221
.get ();
@@ -202,33 +224,42 @@ protected void readInternal(final byte[] buffer, final int offset, final int len
202
224
if (e instanceof AlreadyClosedException || (e .getCause () != null && e .getCause () instanceof AlreadyClosedException )) {
203
225
try {
204
226
// cache file was evicted during the range fetching, read bytes directly from source
205
- bytesRead + = readDirectly (pos , pos + len , buffer , off );
227
+ bytesRead = readDirectly (pos , pos + len , buffer , off );
206
228
continue ;
207
229
} catch (Exception inner ) {
208
230
e .addSuppressed (inner );
209
231
}
210
232
}
211
233
throw new IOException ("Fail to read data from cache" , e );
212
234
235
+ } finally {
236
+ totalBytesRead += bytesRead ;
213
237
}
214
238
}
215
- assert bytesRead == length : "partial read operation, read [" + bytesRead + "] bytes of [" + length + "]" ;
239
+ assert totalBytesRead == length : "partial read operation, read [" + totalBytesRead + "] bytes of [" + length + "]" ;
240
+ stats .incrementBytesRead (lastReadPosition , position , totalBytesRead );
241
+ lastReadPosition = position + totalBytesRead ;
216
242
}
217
243
218
244
int readCacheFile (FileChannel fc , long end , long position , byte [] buffer , int offset , long length ) throws IOException {
219
245
assert assertFileChannelOpen (fc );
220
- return Channels .readFromFileChannel (fc , position , buffer , offset , Math .toIntExact (Math .min (length , end - position )));
246
+ int bytesRead = Channels .readFromFileChannel (fc , position , buffer , offset , Math .toIntExact (Math .min (length , end - position )));
247
+ stats .addCachedBytesRead (bytesRead );
248
+ return bytesRead ;
221
249
}
222
250
223
251
@ SuppressForbidden (reason = "Use positional writes on purpose" )
224
252
void writeCacheFile (FileChannel fc , long start , long end ) throws IOException {
225
253
assert assertFileChannelOpen (fc );
226
254
final byte [] copyBuffer = new byte [Math .toIntExact (Math .min (COPY_BUFFER_SIZE , end - start ))];
255
+ logger .trace (() -> new ParameterizedMessage ("writing range [{}-{}] to cache file [{}]" , start , end , cacheFileReference ));
256
+
257
+ int bytesCopied = 0 ;
227
258
try (IndexInput input = in .openInput (cacheFileReference .getFileName (), ioContext )) {
259
+ stats .incrementInnerOpenCount ();
228
260
if (start > 0 ) {
229
261
input .seek (start );
230
262
}
231
- int bytesCopied = 0 ;
232
263
long remaining = end - start ;
233
264
while (remaining > 0 ) {
234
265
final int size = (remaining < copyBuffer .length ) ? Math .toIntExact (remaining ) : copyBuffer .length ;
@@ -237,6 +268,7 @@ void writeCacheFile(FileChannel fc, long start, long end) throws IOException {
237
268
bytesCopied += size ;
238
269
remaining -= size ;
239
270
}
271
+ stats .addCachedBytesWritten (bytesCopied );
240
272
}
241
273
}
242
274
@@ -247,6 +279,7 @@ protected void seekInternal(long pos) throws IOException {
247
279
} else if (pos < 0L ) {
248
280
throw new IOException ("Seeking to negative position [" + pos + "] for " + toString ());
249
281
}
282
+ stats .incrementSeeks (getFilePointer (), pos );
250
283
}
251
284
252
285
@ Override
@@ -263,7 +296,7 @@ public IndexInput slice(String sliceDescription, long offset, long length) {
263
296
throw new IllegalArgumentException ("slice() " + sliceDescription + " out of bounds: offset=" + offset
264
297
+ ",length=" + length + ",fileLength=" + this .length () + ": " + this );
265
298
}
266
- return new CacheBufferedIndexInput (cacheFileReference , ioContext ,
299
+ return new CacheBufferedIndexInput (cacheFileReference , ioContext , stats ,
267
300
getFullSliceDescription (sliceDescription ), this .offset + offset , length , true );
268
301
}
269
302
@@ -280,9 +313,12 @@ public String toString() {
280
313
281
314
private int readDirectly (long start , long end , byte [] buffer , int offset ) throws IOException {
282
315
final byte [] copyBuffer = new byte [Math .toIntExact (Math .min (COPY_BUFFER_SIZE , end - start ))];
316
+ logger .trace (() ->
317
+ new ParameterizedMessage ("direct reading of range [{}-{}] for cache file [{}]" , start , end , cacheFileReference ));
283
318
284
319
int bytesCopied = 0 ;
285
320
try (IndexInput input = in .openInput (cacheFileReference .getFileName (), ioContext )) {
321
+ stats .incrementInnerOpenCount ();
286
322
if (start > 0 ) {
287
323
input .seek (start );
288
324
}
@@ -294,6 +330,7 @@ private int readDirectly(long start, long end, byte[] buffer, int offset) throws
294
330
bytesCopied += len ;
295
331
remaining -= len ;
296
332
}
333
+ stats .addDirectBytesRead (bytesCopied );
297
334
}
298
335
return bytesCopied ;
299
336
}
0 commit comments