Skip to content

Commit 8c732d0

Browse files
authored
Uncouple CacheDirectory from SearchableSnapshotDirectory (#53860)
This commit is a first step forward merging CacheDirectory into SearchableSnapshotDirectory. It changes the cache directory so that it does not rely on the searchable snapshot directory anymore and instead read the bytes directly from the BlobContainer. It also adds two more base classes that group common class attributes for directories and index inputs.
1 parent b7c4c31 commit 8c732d0

File tree

11 files changed

+487
-285
lines changed

11 files changed

+487
-285
lines changed

test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/BlobContainerWrapper.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ public InputStream readBlob(String name) throws IOException {
4545
return delegate.readBlob(name);
4646
}
4747

48+
@Override
49+
public InputStream readBlob(String blobName, long position, long length) throws IOException {
50+
return delegate.readBlob(blobName, position, length);
51+
}
52+
53+
@Override
54+
public long readBlobPreferredLength() {
55+
return delegate.readBlobPreferredLength();
56+
}
57+
4858
@Override
4959
public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists) throws IOException {
5060
delegate.writeBlob(blobName, inputStream, blobSize, failIfAlreadyExists);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.index.store;
7+
8+
import org.apache.lucene.store.BaseDirectory;
9+
import org.apache.lucene.store.IOContext;
10+
import org.apache.lucene.store.IndexOutput;
11+
import org.apache.lucene.store.SingleInstanceLockFactory;
12+
import org.elasticsearch.common.blobstore.BlobContainer;
13+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
14+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
15+
16+
import java.io.FileNotFoundException;
17+
import java.io.IOException;
18+
import java.util.Collection;
19+
import java.util.Objects;
20+
import java.util.Set;
21+
import java.util.concurrent.atomic.AtomicBoolean;
22+
23+
public abstract class BaseSearchableSnapshotDirectory extends BaseDirectory {
24+
25+
protected final BlobStoreIndexShardSnapshot snapshot;
26+
protected final BlobContainer blobContainer;
27+
private final AtomicBoolean closed;
28+
29+
public BaseSearchableSnapshotDirectory(BlobContainer blobContainer, BlobStoreIndexShardSnapshot snapshot) {
30+
super(new SingleInstanceLockFactory());
31+
this.snapshot = Objects.requireNonNull(snapshot);
32+
this.blobContainer = Objects.requireNonNull(blobContainer);
33+
this.closed = new AtomicBoolean(false);
34+
}
35+
36+
protected final FileInfo fileInfo(final String name) throws FileNotFoundException {
37+
return snapshot.indexFiles()
38+
.stream()
39+
.filter(fileInfo -> fileInfo.physicalName().equals(name))
40+
.findFirst()
41+
.orElseThrow(() -> new FileNotFoundException(name));
42+
}
43+
44+
@Override
45+
public final String[] listAll() {
46+
ensureOpen();
47+
return snapshot.indexFiles().stream().map(FileInfo::physicalName).sorted(String::compareTo).toArray(String[]::new);
48+
}
49+
50+
@Override
51+
public final long fileLength(final String name) throws IOException {
52+
ensureOpen();
53+
return fileInfo(name).length();
54+
}
55+
56+
@Override
57+
public Set<String> getPendingDeletions() {
58+
throw unsupportedException();
59+
}
60+
61+
@Override
62+
public void sync(Collection<String> names) {
63+
throw unsupportedException();
64+
}
65+
66+
@Override
67+
public void syncMetaData() {
68+
throw unsupportedException();
69+
}
70+
71+
@Override
72+
public void deleteFile(String name) {
73+
throw unsupportedException();
74+
}
75+
76+
@Override
77+
public IndexOutput createOutput(String name, IOContext context) {
78+
throw unsupportedException();
79+
}
80+
81+
@Override
82+
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
83+
throw unsupportedException();
84+
}
85+
86+
@Override
87+
public void rename(String source, String dest) {
88+
throw unsupportedException();
89+
}
90+
91+
private static UnsupportedOperationException unsupportedException() {
92+
assert false : "this operation is not supported and should have not be called";
93+
return new UnsupportedOperationException("Searchable snapshot directory does not support this operation");
94+
}
95+
96+
@Override
97+
public final void close() {
98+
if (closed.compareAndSet(false, true)) {
99+
isOpen = false;
100+
innerClose();
101+
}
102+
}
103+
104+
protected void innerClose() {}
105+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.index.store;
7+
8+
import org.apache.lucene.store.BufferedIndexInput;
9+
import org.apache.lucene.store.IOContext;
10+
import org.apache.lucene.util.BytesRef;
11+
import org.elasticsearch.common.blobstore.BlobContainer;
12+
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
13+
import org.elasticsearch.index.snapshots.blobstore.SlicedInputStream;
14+
15+
import java.io.ByteArrayInputStream;
16+
import java.io.IOException;
17+
import java.io.InputStream;
18+
import java.util.Objects;
19+
20+
public abstract class BaseSearchableSnapshotIndexInput extends BufferedIndexInput {
21+
22+
protected final BlobContainer blobContainer;
23+
protected final FileInfo fileInfo;
24+
protected final IOContext context;
25+
26+
public BaseSearchableSnapshotIndexInput(String resourceDesc, BlobContainer blobContainer, FileInfo fileInfo, IOContext context) {
27+
super(resourceDesc, context);
28+
this.blobContainer = Objects.requireNonNull(blobContainer);
29+
this.fileInfo = Objects.requireNonNull(fileInfo);
30+
this.context = Objects.requireNonNull(context);
31+
}
32+
33+
public BaseSearchableSnapshotIndexInput(
34+
String resourceDesc,
35+
BlobContainer blobContainer,
36+
FileInfo fileInfo,
37+
IOContext context,
38+
int bufferSize
39+
) {
40+
this(resourceDesc, blobContainer, fileInfo, context);
41+
setBufferSize(bufferSize);
42+
}
43+
44+
protected InputStream openInputStream(final long position, final long length) throws IOException {
45+
// TODO move this at the Directory level
46+
if (fileInfo.metadata().hashEqualsContents()) {
47+
// extract blob content from metadata hash
48+
final BytesRef data = fileInfo.metadata().hash();
49+
if ((position < 0L) || (length < 0L) || (position + length > data.bytes.length)) {
50+
throw new IllegalArgumentException(
51+
"Invalid arguments (pos=" + position + ", length=" + length + ") for hash content (length=" + data.bytes.length + ')'
52+
);
53+
}
54+
return new ByteArrayInputStream(data.bytes, Math.toIntExact(position), Math.toIntExact(length));
55+
}
56+
57+
final long startPart = getPartNumberForPosition(position);
58+
final long endPart = getPartNumberForPosition(position + length);
59+
if ((startPart == endPart) || fileInfo.numberOfParts() == 1L) {
60+
return blobContainer.readBlob(fileInfo.partName(startPart), getRelativePositionInPart(position), length);
61+
} else {
62+
return new SlicedInputStream(endPart - startPart + 1L) {
63+
@Override
64+
protected InputStream openSlice(long slice) throws IOException {
65+
final long currentPart = startPart + slice;
66+
return blobContainer.readBlob(
67+
fileInfo.partName(currentPart),
68+
(currentPart == startPart) ? getRelativePositionInPart(position) : 0L,
69+
(currentPart == endPart) ? getRelativePositionInPart(length) : getLengthOfPart(currentPart)
70+
);
71+
}
72+
};
73+
}
74+
}
75+
76+
private long getPartNumberForPosition(long position) {
77+
ensureValidPosition(position);
78+
final long part = position / fileInfo.partSize().getBytes();
79+
assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts();
80+
assert part >= 0L : "part number [" + part + "] is negative";
81+
return part;
82+
}
83+
84+
private long getRelativePositionInPart(long position) {
85+
ensureValidPosition(position);
86+
final long pos = position % fileInfo.partSize().getBytes();
87+
assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length";
88+
assert pos >= 0L : "position in part [" + pos + "] is negative";
89+
return pos;
90+
}
91+
92+
private long getLengthOfPart(long part) {
93+
return fileInfo.partBytes(Math.toIntExact(part));
94+
}
95+
96+
private void ensureValidPosition(long position) {
97+
if (position < 0L || position > fileInfo.length()) {
98+
throw new IllegalArgumentException("Position [" + position + "] is invalid");
99+
}
100+
}
101+
}

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

Lines changed: 7 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,14 @@
55
*/
66
package org.elasticsearch.index.store;
77

8-
import org.apache.lucene.store.BaseDirectory;
98
import org.apache.lucene.store.BufferedIndexInput;
109
import org.apache.lucene.store.Directory;
1110
import org.apache.lucene.store.IOContext;
1211
import org.apache.lucene.store.IndexInput;
13-
import org.apache.lucene.store.IndexOutput;
14-
import org.apache.lucene.store.SingleInstanceLockFactory;
1512
import org.elasticsearch.common.blobstore.BlobContainer;
1613
import org.elasticsearch.index.IndexSettings;
1714
import org.elasticsearch.index.shard.ShardPath;
1815
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
19-
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
2016
import org.elasticsearch.repositories.IndexId;
2117
import org.elasticsearch.repositories.RepositoriesService;
2218
import org.elasticsearch.repositories.Repository;
@@ -25,12 +21,8 @@
2521
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheDirectory;
2622
import org.elasticsearch.xpack.searchablesnapshots.cache.CacheService;
2723

28-
import java.io.FileNotFoundException;
2924
import java.io.IOException;
3025
import java.nio.file.Path;
31-
import java.util.Collection;
32-
import java.util.Objects;
33-
import java.util.Set;
3426
import java.util.function.LongSupplier;
3527

3628
import static org.elasticsearch.xpack.searchablesnapshots.SearchableSnapshots.SNAPSHOT_CACHE_ENABLED_SETTING;
@@ -50,95 +42,24 @@
5042
* shard files and what it stored in the snapshot the {@link BlobStoreIndexShardSnapshot} is used to map a physical file name as expected by
5143
* Lucene with the one (or the ones) corresponding blob(s) in the snapshot.
5244
*/
53-
public class SearchableSnapshotDirectory extends BaseDirectory {
54-
55-
private final BlobStoreIndexShardSnapshot snapshot;
56-
private final BlobContainer blobContainer;
45+
public class SearchableSnapshotDirectory extends BaseSearchableSnapshotDirectory {
5746

5847
SearchableSnapshotDirectory(final BlobStoreIndexShardSnapshot snapshot, final BlobContainer blobContainer) {
59-
super(new SingleInstanceLockFactory());
60-
this.snapshot = Objects.requireNonNull(snapshot);
61-
this.blobContainer = Objects.requireNonNull(blobContainer);
62-
}
63-
64-
private FileInfo fileInfo(final String name) throws FileNotFoundException {
65-
return snapshot.indexFiles().stream()
66-
.filter(fileInfo -> fileInfo.physicalName().equals(name))
67-
.findFirst()
68-
.orElseThrow(() -> new FileNotFoundException(name));
69-
}
70-
71-
@Override
72-
public String[] listAll() throws IOException {
73-
ensureOpen();
74-
return snapshot.indexFiles().stream()
75-
.map(FileInfo::physicalName)
76-
.sorted(String::compareTo)
77-
.toArray(String[]::new);
78-
}
79-
80-
@Override
81-
public long fileLength(final String name) throws IOException {
82-
ensureOpen();
83-
return fileInfo(name).length();
48+
super(blobContainer, snapshot);
8449
}
8550

8651
@Override
8752
public IndexInput openInput(final String name, final IOContext context) throws IOException {
8853
ensureOpen();
89-
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), blobContainer.readBlobPreferredLength(),
54+
return new SearchableSnapshotIndexInput(blobContainer, fileInfo(name), context, blobContainer.readBlobPreferredLength(),
9055
BufferedIndexInput.BUFFER_SIZE);
9156
}
9257

93-
@Override
94-
public void close() {
95-
isOpen = false;
96-
}
97-
9858
@Override
9959
public String toString() {
10060
return this.getClass().getSimpleName() + "@" + snapshot.snapshot() + " lockFactory=" + lockFactory;
10161
}
10262

103-
@Override
104-
public Set<String> getPendingDeletions() {
105-
throw unsupportedException();
106-
}
107-
108-
@Override
109-
public void sync(Collection<String> names) {
110-
throw unsupportedException();
111-
}
112-
113-
@Override
114-
public void syncMetaData() {
115-
throw unsupportedException();
116-
}
117-
118-
@Override
119-
public void deleteFile(String name) {
120-
throw unsupportedException();
121-
}
122-
123-
@Override
124-
public IndexOutput createOutput(String name, IOContext context) {
125-
throw unsupportedException();
126-
}
127-
128-
@Override
129-
public IndexOutput createTempOutput(String prefix, String suffix, IOContext context) {
130-
throw unsupportedException();
131-
}
132-
133-
@Override
134-
public void rename(String source, String dest) {
135-
throw unsupportedException();
136-
}
137-
138-
private static UnsupportedOperationException unsupportedException() {
139-
return new UnsupportedOperationException("Searchable snapshot directory does not support this operation");
140-
}
141-
14263
public static Directory create(RepositoriesService repositories,
14364
CacheService cache,
14465
IndexSettings indexSettings,
@@ -158,11 +79,13 @@ public static Directory create(RepositoriesService repositories,
15879
SNAPSHOT_SNAPSHOT_ID_SETTING.get(indexSettings.getSettings()));
15980
final BlobStoreIndexShardSnapshot snapshot = blobStoreRepository.loadShardSnapshot(blobContainer, snapshotId);
16081

161-
Directory directory = new SearchableSnapshotDirectory(snapshot, blobContainer);
82+
final Directory directory;
16283
if (SNAPSHOT_CACHE_ENABLED_SETTING.get(indexSettings.getSettings())) {
16384
final Path cacheDir = shardPath.getDataPath().resolve("snapshots").resolve(snapshotId.getUUID());
164-
directory = new CacheDirectory(directory, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(),
85+
directory = new CacheDirectory(snapshot, blobContainer, cache, cacheDir, snapshotId, indexId, shardPath.getShardId(),
16586
currentTimeNanosSupplier);
87+
} else {
88+
directory = new SearchableSnapshotDirectory(snapshot, blobContainer);
16689
}
16790
return new InMemoryNoOpCommitDirectory(directory);
16891
}

0 commit comments

Comments
 (0)