5
5
*/
6
6
package org .elasticsearch .index .store ;
7
7
8
+ import org .apache .lucene .store .BaseDirectory ;
8
9
import org .apache .lucene .store .BufferedIndexInput ;
9
10
import org .apache .lucene .store .Directory ;
10
11
import org .apache .lucene .store .IOContext ;
11
12
import org .apache .lucene .store .IndexInput ;
13
+ import org .apache .lucene .store .IndexOutput ;
14
+ import org .apache .lucene .store .SingleInstanceLockFactory ;
15
+ import org .elasticsearch .common .Nullable ;
12
16
import org .elasticsearch .common .blobstore .BlobContainer ;
17
+ import org .elasticsearch .common .settings .Settings ;
18
+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
13
19
import org .elasticsearch .index .IndexSettings ;
20
+ import org .elasticsearch .index .shard .ShardId ;
14
21
import org .elasticsearch .index .shard .ShardPath ;
15
22
import org .elasticsearch .index .snapshots .blobstore .BlobStoreIndexShardSnapshot ;
16
23
import org .elasticsearch .repositories .IndexId ;
17
24
import org .elasticsearch .repositories .RepositoriesService ;
18
25
import org .elasticsearch .repositories .Repository ;
19
26
import org .elasticsearch .repositories .blobstore .BlobStoreRepository ;
20
27
import org .elasticsearch .snapshots .SnapshotId ;
21
- import org .elasticsearch .xpack .searchablesnapshots .cache .CacheDirectory ;
28
+ import org .elasticsearch .xpack .searchablesnapshots .cache .CacheBufferedIndexInput ;
29
+ import org .elasticsearch .xpack .searchablesnapshots .cache .CacheFile ;
30
+ import org .elasticsearch .xpack .searchablesnapshots .cache .CacheKey ;
22
31
import org .elasticsearch .xpack .searchablesnapshots .cache .CacheService ;
32
+ import org .elasticsearch .xpack .searchablesnapshots .cache .IndexInputStats ;
23
33
34
+ import java .io .FileNotFoundException ;
24
35
import java .io .IOException ;
36
+ import java .nio .file .Files ;
25
37
import java .nio .file .Path ;
38
+ import java .util .Collection ;
39
+ import java .util .Collections ;
40
+ import java .util .Map ;
41
+ import java .util .Objects ;
42
+ import java .util .Set ;
43
+ import java .util .concurrent .atomic .AtomicBoolean ;
26
44
import java .util .function .LongSupplier ;
27
45
28
46
import static org .elasticsearch .xpack .searchablesnapshots .SearchableSnapshots .SNAPSHOT_CACHE_ENABLED_SETTING ;
42
60
* shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by
43
61
* Lucene with the one (or the ones) corresponding blob(s) in the snapshot.
44
62
*/
45
- public class SearchableSnapshotDirectory extends BaseSearchableSnapshotDirectory {
63
+ public class SearchableSnapshotDirectory extends BaseDirectory {
46
64
47
- SearchableSnapshotDirectory (final BlobStoreIndexShardSnapshot snapshot , final BlobContainer blobContainer ) {
48
- super (blobContainer , snapshot );
65
+ private final BlobStoreIndexShardSnapshot snapshot ;
66
+ private final BlobContainer blobContainer ;
67
+ private final SnapshotId snapshotId ;
68
+ private final IndexId indexId ;
69
+ private final ShardId shardId ;
70
+ private final LongSupplier statsCurrentTimeNanosSupplier ;
71
+ private final Map <String , IndexInputStats > stats ;
72
+ private final CacheService cacheService ;
73
+ private final boolean useCache ;
74
+ private final Path cacheDir ;
75
+ private final AtomicBoolean closed ;
76
+
77
+ public SearchableSnapshotDirectory (
78
+ BlobContainer blobContainer ,
79
+ BlobStoreIndexShardSnapshot snapshot ,
80
+ SnapshotId snapshotId ,
81
+ IndexId indexId ,
82
+ ShardId shardId ,
83
+ Settings indexSettings ,
84
+ LongSupplier currentTimeNanosSupplier ,
85
+ CacheService cacheService ,
86
+ Path cacheDir
87
+ ) {
88
+ super (new SingleInstanceLockFactory ());
89
+ this .snapshot = Objects .requireNonNull (snapshot );
90
+ this .blobContainer = Objects .requireNonNull (blobContainer );
91
+ this .snapshotId = Objects .requireNonNull (snapshotId );
92
+ this .indexId = Objects .requireNonNull (indexId );
93
+ this .shardId = Objects .requireNonNull (shardId );
94
+ this .stats = ConcurrentCollections .newConcurrentMapWithAggressiveConcurrency ();
95
+ this .statsCurrentTimeNanosSupplier = Objects .requireNonNull (currentTimeNanosSupplier );
96
+ this .cacheService = Objects .requireNonNull (cacheService );
97
+ this .cacheDir = Objects .requireNonNull (cacheDir );
98
+ this .closed = new AtomicBoolean (false );
99
+ this .useCache = SNAPSHOT_CACHE_ENABLED_SETTING .get (indexSettings );
100
+ }
101
+
102
+ public BlobContainer blobContainer () {
103
+ return blobContainer ;
104
+ }
105
+
106
+ public SnapshotId getSnapshotId () {
107
+ return snapshotId ;
108
+ }
109
+
110
+ public IndexId getIndexId () {
111
+ return indexId ;
112
+ }
113
+
114
+ public ShardId getShardId () {
115
+ return shardId ;
116
+ }
117
+
118
+ public Map <String , IndexInputStats > getStats () {
119
+ return Collections .unmodifiableMap (stats );
120
+ }
121
+
122
+ @ Nullable
123
+ public IndexInputStats getStats (String fileName ) {
124
+ return stats .get (fileName );
125
+ }
126
+
127
+ public long statsCurrentTimeNanos () {
128
+ return statsCurrentTimeNanosSupplier .getAsLong ();
129
+ }
130
+
131
+ private BlobStoreIndexShardSnapshot .FileInfo fileInfo (final String name ) throws FileNotFoundException {
132
+ return snapshot .indexFiles ()
133
+ .stream ()
134
+ .filter (fileInfo -> fileInfo .physicalName ().equals (name ))
135
+ .findFirst ()
136
+ .orElseThrow (() -> new FileNotFoundException (name ));
137
+ }
138
+
139
+ @ Override
140
+ public final String [] listAll () {
141
+ ensureOpen ();
142
+ return snapshot .indexFiles ()
143
+ .stream ()
144
+ .map (BlobStoreIndexShardSnapshot .FileInfo ::physicalName )
145
+ .sorted (String ::compareTo )
146
+ .toArray (String []::new );
147
+ }
148
+
149
+ @ Override
150
+ public final long fileLength (final String name ) throws IOException {
151
+ ensureOpen ();
152
+ return fileInfo (name ).length ();
153
+ }
154
+
155
+ @ Override
156
+ public Set <String > getPendingDeletions () {
157
+ throw unsupportedException ();
158
+ }
159
+
160
+ @ Override
161
+ public void sync (Collection <String > names ) {
162
+ throw unsupportedException ();
163
+ }
164
+
165
+ @ Override
166
+ public void syncMetaData () {
167
+ throw unsupportedException ();
168
+ }
169
+
170
+ @ Override
171
+ public void deleteFile (String name ) {
172
+ throw unsupportedException ();
173
+ }
174
+
175
+ @ Override
176
+ public IndexOutput createOutput (String name , IOContext context ) {
177
+ throw unsupportedException ();
178
+ }
179
+
180
+ @ Override
181
+ public IndexOutput createTempOutput (String prefix , String suffix , IOContext context ) {
182
+ throw unsupportedException ();
183
+ }
184
+
185
+ @ Override
186
+ public void rename (String source , String dest ) {
187
+ throw unsupportedException ();
188
+ }
189
+
190
+ private static UnsupportedOperationException unsupportedException () {
191
+ assert false : "this operation is not supported and should have not be called" ;
192
+ return new UnsupportedOperationException ("Searchable snapshot directory does not support this operation" );
193
+ }
194
+
195
+ @ Override
196
+ public final void close () {
197
+ if (closed .compareAndSet (false , true )) {
198
+ isOpen = false ;
199
+ // Ideally we could let the cache evict/remove cached files by itself after the
200
+ // directory has been closed.
201
+ clearCache ();
202
+ }
203
+ }
204
+
205
+ public void clearCache () {
206
+ cacheService .removeFromCache (cacheKey -> cacheKey .belongsTo (snapshotId , indexId , shardId ));
207
+ }
208
+
209
+ protected IndexInputStats createIndexInputStats (final long fileLength ) {
210
+ return new IndexInputStats (fileLength );
211
+ }
212
+
213
+ public CacheKey createCacheKey (String fileName ) {
214
+ return new CacheKey (snapshotId , indexId , shardId , fileName );
215
+ }
216
+
217
+ public CacheFile getCacheFile (CacheKey cacheKey , long fileLength ) throws Exception {
218
+ return cacheService .get (cacheKey , fileLength , cacheDir );
49
219
}
50
220
51
221
@ Override
52
222
public IndexInput openInput (final String name , final IOContext context ) throws IOException {
53
223
ensureOpen ();
54
- return new SearchableSnapshotIndexInput (blobContainer , fileInfo (name ), context , blobContainer .readBlobPreferredLength (),
55
- BufferedIndexInput .BUFFER_SIZE );
224
+ final BlobStoreIndexShardSnapshot .FileInfo fileInfo = fileInfo (name );
225
+ final IndexInputStats inputStats = stats .computeIfAbsent (name , n -> createIndexInputStats (fileInfo .length ()));
226
+ if (useCache ) {
227
+ return new CacheBufferedIndexInput (this , fileInfo , context , inputStats );
228
+ } else {
229
+ long preferredLength = blobContainer .readBlobPreferredLength ();
230
+ return new SearchableSnapshotIndexInput (blobContainer , fileInfo , context , preferredLength , BufferedIndexInput .BUFFER_SIZE );
231
+ }
56
232
}
57
233
58
234
@ Override
@@ -75,18 +251,27 @@ public static Directory create(RepositoriesService repositories,
75
251
final IndexId indexId = new IndexId (indexSettings .getIndex ().getName (), SNAPSHOT_INDEX_ID_SETTING .get (indexSettings .getSettings ()));
76
252
final BlobContainer blobContainer = blobStoreRepository .shardContainer (indexId , shardPath .getShardId ().id ());
77
253
78
- final SnapshotId snapshotId = new SnapshotId (SNAPSHOT_SNAPSHOT_NAME_SETTING .get (indexSettings .getSettings ()),
79
- SNAPSHOT_SNAPSHOT_ID_SETTING .get (indexSettings .getSettings ()));
254
+ final SnapshotId snapshotId = new SnapshotId (
255
+ SNAPSHOT_SNAPSHOT_NAME_SETTING .get (indexSettings .getSettings ()),
256
+ SNAPSHOT_SNAPSHOT_ID_SETTING .get (indexSettings .getSettings ())
257
+ );
80
258
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository .loadShardSnapshot (blobContainer , snapshotId );
81
259
82
- final Directory directory ;
83
- if (SNAPSHOT_CACHE_ENABLED_SETTING .get (indexSettings .getSettings ())) {
84
- final Path cacheDir = shardPath .getDataPath ().resolve ("snapshots" ).resolve (snapshotId .getUUID ());
85
- directory = new CacheDirectory (snapshot , blobContainer , cache , cacheDir , snapshotId , indexId , shardPath .getShardId (),
86
- currentTimeNanosSupplier );
87
- } else {
88
- directory = new SearchableSnapshotDirectory (snapshot , blobContainer );
89
- }
90
- return new InMemoryNoOpCommitDirectory (directory );
260
+ final Path cacheDir = shardPath .getDataPath ().resolve ("snapshots" ).resolve (snapshotId .getUUID ());
261
+ Files .createDirectories (cacheDir );
262
+
263
+ return new InMemoryNoOpCommitDirectory (
264
+ new SearchableSnapshotDirectory (
265
+ blobContainer ,
266
+ snapshot ,
267
+ snapshotId ,
268
+ indexId ,
269
+ shardPath .getShardId (),
270
+ indexSettings .getSettings (),
271
+ currentTimeNanosSupplier ,
272
+ cache ,
273
+ cacheDir
274
+ )
275
+ );
91
276
}
92
277
}
0 commit comments