5
5
*/
6
6
package org .elasticsearch .index .store ;
7
7
8
+ import org .apache .logging .log4j .LogManager ;
9
+ import org .apache .logging .log4j .Logger ;
10
+ import org .apache .logging .log4j .message .ParameterizedMessage ;
8
11
import org .apache .lucene .index .IndexFileNames ;
9
12
import org .apache .lucene .store .BaseDirectory ;
10
13
import org .apache .lucene .store .Directory ;
18
21
import org .elasticsearch .common .blobstore .BlobContainer ;
19
22
import org .elasticsearch .common .lucene .store .ByteArrayIndexInput ;
20
23
import org .elasticsearch .common .settings .Settings ;
24
+ import org .elasticsearch .common .unit .TimeValue ;
21
25
import org .elasticsearch .common .util .LazyInitializable ;
26
+ import org .elasticsearch .common .util .concurrent .AbstractRunnable ;
22
27
import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
28
+ import org .elasticsearch .common .util .concurrent .CountDown ;
29
+ import org .elasticsearch .core .internal .io .IOUtils ;
23
30
import org .elasticsearch .index .IndexSettings ;
24
31
import org .elasticsearch .index .shard .ShardId ;
25
32
import org .elasticsearch .index .shard .ShardPath ;
47
54
import java .util .Map ;
48
55
import java .util .Objects ;
49
56
import java .util .Set ;
57
+ import java .util .concurrent .Executor ;
50
58
import java .util .concurrent .atomic .AtomicBoolean ;
51
59
import java .util .function .LongSupplier ;
52
60
import java .util .function .Supplier ;
61
+ import java .util .stream .Collectors ;
53
62
54
63
import static org .apache .lucene .store .BufferedIndexInput .bufferSize ;
55
64
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_CACHE_ENABLED_SETTING ;
56
65
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING ;
66
+ import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING ;
57
67
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_INDEX_ID_SETTING ;
58
68
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_REPOSITORY_SETTING ;
59
69
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_SNAPSHOT_ID_SETTING ;
60
70
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_SNAPSHOT_NAME_SETTING ;
61
71
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING ;
72
+ import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshotsConstants .SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME ;
62
73
63
74
/**
64
75
* Implementation of {@link Directory} that exposes files from a snapshot as a Lucene directory. Because snapshot are immutable this
73
84
*/
74
85
public class SearchableSnapshotDirectory extends BaseDirectory {
75
86
87
+ private static final Logger logger = LogManager .getLogger (SearchableSnapshotDirectory .class );
88
+
76
89
private final Supplier <BlobContainer > blobContainerSupplier ;
77
90
private final Supplier <BlobStoreIndexShardSnapshot > snapshotSupplier ;
78
91
private final SnapshotId snapshotId ;
79
92
private final IndexId indexId ;
80
93
private final ShardId shardId ;
81
94
private final LongSupplier statsCurrentTimeNanosSupplier ;
82
95
private final Map <String , IndexInputStats > stats ;
96
+ private final ThreadPool threadPool ;
83
97
private final CacheService cacheService ;
84
98
private final boolean useCache ;
99
+ private final boolean prewarmCache ;
85
100
private final Set <String > excludedFileTypes ;
86
101
private final long uncachedChunkSize ; // if negative use BlobContainer#readBlobPreferredLength, see #getUncachedChunkSize()
87
102
private final Path cacheDir ;
@@ -101,7 +116,8 @@ public SearchableSnapshotDirectory(
101
116
Settings indexSettings ,
102
117
LongSupplier currentTimeNanosSupplier ,
103
118
CacheService cacheService ,
104
- Path cacheDir
119
+ Path cacheDir ,
120
+ ThreadPool threadPool
105
121
) {
106
122
super (new SingleInstanceLockFactory ());
107
123
this .snapshotSupplier = Objects .requireNonNull (snapshot );
@@ -115,8 +131,10 @@ public SearchableSnapshotDirectory(
115
131
this .cacheDir = Objects .requireNonNull (cacheDir );
116
132
this .closed = new AtomicBoolean (false );
117
133
this .useCache = SNAPSHOT_CACHE_ENABLED_SETTING .get (indexSettings );
134
+ this .prewarmCache = useCache ? SNAPSHOT_CACHE_PREWARM_ENABLED_SETTING .get (indexSettings ) : false ;
118
135
this .excludedFileTypes = new HashSet <>(SNAPSHOT_CACHE_EXCLUDED_FILE_TYPES_SETTING .get (indexSettings ));
119
136
this .uncachedChunkSize = SNAPSHOT_UNCACHED_CHUNK_SIZE_SETTING .get (indexSettings ).getBytes ();
137
+ this .threadPool = threadPool ;
120
138
this .loaded = false ;
121
139
assert invariant ();
122
140
}
@@ -142,6 +160,7 @@ protected final boolean assertCurrentThreadMayLoadSnapshot() {
142
160
* @return true if the snapshot was loaded by executing this method, false otherwise
143
161
*/
144
162
public boolean loadSnapshot () {
163
+ assert assertCurrentThreadMayLoadSnapshot ();
145
164
boolean alreadyLoaded = this .loaded ;
146
165
if (alreadyLoaded == false ) {
147
166
synchronized (this ) {
@@ -150,10 +169,10 @@ public boolean loadSnapshot() {
150
169
this .blobContainer = blobContainerSupplier .get ();
151
170
this .snapshot = snapshotSupplier .get ();
152
171
this .loaded = true ;
172
+ prewarmCache ();
153
173
}
154
174
}
155
175
}
156
- assert assertCurrentThreadMayLoadSnapshot ();
157
176
assert invariant ();
158
177
return alreadyLoaded == false ;
159
178
}
@@ -300,7 +319,7 @@ public IndexInput openInput(final String name, final IOContext context) throws I
300
319
301
320
final IndexInputStats inputStats = stats .computeIfAbsent (name , n -> createIndexInputStats (fileInfo .length ()));
302
321
if (useCache && isExcludedFromCache (name ) == false ) {
303
- return new CachedBlobContainerIndexInput (this , fileInfo , context , inputStats );
322
+ return new CachedBlobContainerIndexInput (this , fileInfo , context , inputStats , cacheService . getRangeSize () );
304
323
} else {
305
324
return new DirectBlobContainerIndexInput (
306
325
blobContainer (),
@@ -331,12 +350,86 @@ public String toString() {
331
350
return this .getClass ().getSimpleName () + "@snapshotId=" + snapshotId + " lockFactory=" + lockFactory ;
332
351
}
333
352
353
+ private void prewarmCache () {
354
+ if (prewarmCache ) {
355
+ final List <BlobStoreIndexShardSnapshot .FileInfo > cacheFiles = snapshot ().indexFiles ()
356
+ .stream ()
357
+ .filter (file -> file .metadata ().hashEqualsContents () == false )
358
+ .filter (file -> isExcludedFromCache (file .physicalName ()) == false )
359
+ .collect (Collectors .toList ());
360
+
361
+ final Executor executor = threadPool .executor (SEARCHABLE_SNAPSHOTS_THREAD_POOL_NAME );
362
+ logger .debug ("{} warming shard cache for [{}] files" , shardId , cacheFiles .size ());
363
+
364
+ for (BlobStoreIndexShardSnapshot .FileInfo cacheFile : cacheFiles ) {
365
+ final String fileName = cacheFile .physicalName ();
366
+ try {
367
+ final IndexInput input = openInput (fileName , CachedBlobContainerIndexInput .CACHE_WARMING_CONTEXT );
368
+ assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input .getClass ();
369
+
370
+ final long numberOfParts = cacheFile .numberOfParts ();
371
+ final CountDown countDown = new CountDown (Math .toIntExact (numberOfParts ));
372
+ for (long p = 0 ; p < numberOfParts ; p ++) {
373
+ final int part = Math .toIntExact (p );
374
+ // TODO use multiple workers to warm each part instead of filling the thread pool
375
+ executor .execute (new AbstractRunnable () {
376
+ @ Override
377
+ protected void doRun () throws Exception {
378
+ ensureOpen ();
379
+
380
+ logger .trace ("warming cache for [{}] part [{}/{}]" , fileName , part , numberOfParts );
381
+ final long startTimeInNanos = statsCurrentTimeNanosSupplier .getAsLong ();
382
+
383
+ final CachedBlobContainerIndexInput cachedIndexInput = (CachedBlobContainerIndexInput ) input .clone ();
384
+ final int bytesRead = cachedIndexInput .prefetchPart (part ); // TODO does not include any rate limitation
385
+ assert bytesRead == cacheFile .partBytes (part );
386
+
387
+ logger .trace (
388
+ () -> new ParameterizedMessage (
389
+ "part [{}/{}] of [{}] warmed in [{}] ms" ,
390
+ part ,
391
+ numberOfParts ,
392
+ fileName ,
393
+ TimeValue .timeValueNanos (statsCurrentTimeNanosSupplier .getAsLong () - startTimeInNanos ).millis ()
394
+ )
395
+ );
396
+ }
397
+
398
+ @ Override
399
+ public void onFailure (Exception e ) {
400
+ logger .trace (
401
+ () -> new ParameterizedMessage (
402
+ "failed to warm cache for [{}] part [{}/{}]" ,
403
+ fileName ,
404
+ part ,
405
+ numberOfParts
406
+ ),
407
+ e
408
+ );
409
+ }
410
+
411
+ @ Override
412
+ public void onAfter () {
413
+ if (countDown .countDown ()) {
414
+ IOUtils .closeWhileHandlingException (input );
415
+ }
416
+ }
417
+ });
418
+ }
419
+ } catch (IOException e ) {
420
+ logger .trace (() -> new ParameterizedMessage ("failed to warm cache for [{}]" , fileName ), e );
421
+ }
422
+ }
423
+ }
424
+ }
425
+
334
426
public static Directory create (
335
427
RepositoriesService repositories ,
336
428
CacheService cache ,
337
429
IndexSettings indexSettings ,
338
430
ShardPath shardPath ,
339
- LongSupplier currentTimeNanosSupplier
431
+ LongSupplier currentTimeNanosSupplier ,
432
+ ThreadPool threadPool
340
433
) throws IOException {
341
434
342
435
final Repository repository = repositories .repository (SNAPSHOT_REPOSITORY_SETTING .get (indexSettings .getSettings ()));
@@ -371,7 +464,8 @@ public static Directory create(
371
464
indexSettings .getSettings (),
372
465
currentTimeNanosSupplier ,
373
466
cache ,
374
- cacheDir
467
+ cacheDir ,
468
+ threadPool
375
469
)
376
470
);
377
471
}
0 commit comments