Skip to content

Commit b866aaf

Browse files
committed
Use int for number of parts in blob store (#61618)
Today we use `long` to represent the number of parts of a blob. There's no need for this extra range: it forces us to do some casting elsewhere, and indeed when snapshotting we iterate over the parts using an `int`, which would be an infinite loop in case of overflow anyway: `for (int i = 0; i < fileInfo.numberOfParts(); i++) {`. This commit changes the representation of the number of parts of a blob to an `int`.
1 parent aac9eb6 commit b866aaf

File tree

9 files changed

+49
-45
lines changed

9 files changed

+49
-45
lines changed

server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshot.java

+18-12
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.ArrayList;
3737
import java.util.Collections;
3838
import java.util.List;
39+
import java.util.stream.IntStream;
3940

4041
/**
4142
* Shard snapshot metadata
@@ -50,7 +51,7 @@ public static class FileInfo {
5051
private final String name;
5152
private final ByteSizeValue partSize;
5253
private final long partBytes;
53-
private final long numberOfParts;
54+
private final int numberOfParts;
5455
private final StoreFileMetadata metadata;
5556

5657
/**
@@ -69,17 +70,19 @@ public FileInfo(String name, StoreFileMetadata metadata, ByteSizeValue partSize)
6970
partBytes = partSize.getBytes();
7071
}
7172

72-
long totalLength = metadata.length();
73-
long numberOfParts = totalLength / partBytes;
74-
if (totalLength % partBytes > 0) {
75-
numberOfParts++;
76-
}
77-
if (numberOfParts == 0) {
78-
numberOfParts++;
73+
if (metadata.length() == 0) {
74+
numberOfParts = 1;
75+
} else {
76+
long longNumberOfParts = 1L + (metadata.length() - 1L) / partBytes; // ceil(len/partBytes), but beware of long overflow
77+
numberOfParts = (int)longNumberOfParts;
78+
if (numberOfParts != longNumberOfParts) { // also beware of int overflow, although 2^32 parts is already ludicrous
79+
throw new IllegalArgumentException("part size [" + partSize + "] too small for file [" + metadata + "]");
80+
}
7981
}
80-
this.numberOfParts = numberOfParts;
82+
8183
this.partSize = partSize;
8284
this.partBytes = partBytes;
85+
assert IntStream.range(0, numberOfParts).mapToLong(this::partBytes).sum() == metadata.length();
8386
}
8487

8588
/**
@@ -97,7 +100,7 @@ public String name() {
97100
* @param part part number
98101
* @return part name
99102
*/
100-
public String partName(long part) {
103+
public String partName(int part) {
101104
if (numberOfParts > 1) {
102105
return name + ".part" + part;
103106
} else {
@@ -151,6 +154,7 @@ public ByteSizeValue partSize() {
151154
* @return the size (in bytes) of a given part
152155
*/
153156
public long partBytes(int part) {
157+
assert 0 <= part && part < numberOfParts : part + " vs " + numberOfParts;
154158
if (numberOfParts == 1) {
155159
return length();
156160
}
@@ -159,15 +163,17 @@ public long partBytes(int part) {
159163
return partBytes;
160164
}
161165
// Last part size is deduced from the length and the number of parts
162-
return length() - (partBytes * (numberOfParts-1));
166+
final long lastPartBytes = length() - (this.partBytes * (numberOfParts - 1));
167+
assert 0 < lastPartBytes && lastPartBytes <= partBytes : lastPartBytes + " vs " + partBytes;
168+
return lastPartBytes;
163169
}
164170

165171
/**
166172
* Returns number of parts
167173
*
168174
* @return number of parts
169175
*/
170-
public long numberOfParts() {
176+
public int numberOfParts() {
171177
return numberOfParts;
172178
}
173179

server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStream.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -27,21 +27,21 @@
2727
* A {@link SlicedInputStream} is a logical
2828
* concatenation of one or more input streams. In contrast to the JDK's
2929
* {@link java.io.SequenceInputStream} this stream doesn't require the instantiation
30-
* of all logical sub-streams ahead of time. Instead, {@link #openSlice(long)} is called
30+
* of all logical sub-streams ahead of time. Instead, {@link #openSlice(int)} is called
3131
* if a new slice is required. Each slice is closed once it's been fully consumed or if
3232
* close is called before.
3333
*/
3434
public abstract class SlicedInputStream extends InputStream {
35-
private long slice = 0;
35+
private int slice = 0;
3636
private InputStream currentStream;
37-
private final long numSlices;
37+
private final int numSlices;
3838
private boolean initialized = false;
3939

4040
/**
4141
* Creates a new SlicedInputStream
4242
* @param numSlices the number of slices to consume
4343
*/
44-
protected SlicedInputStream(final long numSlices) {
44+
protected SlicedInputStream(final int numSlices) {
4545
this.numSlices = numSlices;
4646
}
4747

@@ -60,7 +60,7 @@ private InputStream nextStream() throws IOException {
6060
/**
6161
* Called for each logical slice given a zero based slice ordinal.
6262
*/
63-
protected abstract InputStream openSlice(long slice) throws IOException;
63+
protected abstract InputStream openSlice(int slice) throws IOException;
6464

6565
private InputStream currentStream() throws IOException {
6666
if (currentStream == null) {

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -2098,7 +2098,7 @@ private void restoreFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Store st
20982098
} else {
20992099
try (InputStream stream = maybeRateLimitRestores(new SlicedInputStream(fileInfo.numberOfParts()) {
21002100
@Override
2101-
protected InputStream openSlice(long slice) throws IOException {
2101+
protected InputStream openSlice(int slice) throws IOException {
21022102
return container.readBlob(fileInfo.partName(slice));
21032103
}
21042104
})) {

server/src/test/java/org/elasticsearch/index/snapshots/blobstore/FileInfoTests.java

-1
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,5 @@ public void testGetPartSize() {
170170
}
171171
assertEquals(numBytes, metadata.length());
172172
}
173-
174173
}
175174
}

server/src/test/java/org/elasticsearch/index/snapshots/blobstore/SlicedInputStreamTests.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,9 @@ public void testReadRandom() throws IOException {
6060
}
6161

6262
SlicedInputStream input = new SlicedInputStream(parts) {
63-
6463
@Override
65-
protected InputStream openSlice(long slice) throws IOException {
66-
return streams[(int)slice];
64+
protected InputStream openSlice(int slice) throws IOException {
65+
return streams[slice];
6766
}
6867
};
6968
random = new Random(seed);

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/SearchableSnapshotDirectory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ private void prewarmCache() {
439439
final IndexInput input = openInput(file.physicalName(), CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
440440
assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass();
441441

442-
final int numberOfParts = Math.toIntExact(file.numberOfParts());
442+
final int numberOfParts = file.numberOfParts();
443443
final StepListener<Collection<Void>> fileCompletionListener = new StepListener<>();
444444
fileCompletionListener.whenComplete(voids -> input.close(), e -> IOUtils.closeWhileHandlingException(input));
445445
fileCompletionListener.whenComplete(voids -> completionListener.onResponse(null), completionListener::onFailure);

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/cache/CachedBlobContainerIndexInput.java

+13-13
Original file line numberDiff line numberDiff line change
@@ -655,23 +655,23 @@ private InputStream openInputStreamFromBlobStore(final long position, final long
655655
+ fileInfo
656656
+ "]";
657657
stats.addBlobStoreBytesRequested(length);
658-
return blobContainer.readBlob(fileInfo.partName(0L), position, length);
658+
return blobContainer.readBlob(fileInfo.partName(0), position, length);
659659
} else {
660-
final long startPart = getPartNumberForPosition(position);
661-
final long endPart = getPartNumberForPosition(position + length - 1);
660+
final int startPart = getPartNumberForPosition(position);
661+
final int endPart = getPartNumberForPosition(position + length - 1);
662662

663-
for (long currentPart = startPart; currentPart <= endPart; currentPart++) {
663+
for (int currentPart = startPart; currentPart <= endPart; currentPart++) {
664664
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
665665
final long endInPart = (currentPart == endPart)
666666
? getRelativePositionInPart(position + length - 1) + 1
667667
: getLengthOfPart(currentPart);
668668
stats.addBlobStoreBytesRequested(endInPart - startInPart);
669669
}
670670

671-
return new SlicedInputStream(endPart - startPart + 1L) {
671+
return new SlicedInputStream(endPart - startPart + 1) {
672672
@Override
673-
protected InputStream openSlice(long slice) throws IOException {
674-
final long currentPart = startPart + slice;
673+
protected InputStream openSlice(int slice) throws IOException {
674+
final int currentPart = startPart + slice;
675675
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
676676
final long endInPart = (currentPart == endPart)
677677
? getRelativePositionInPart(position + length - 1) + 1
@@ -685,11 +685,11 @@ protected InputStream openSlice(long slice) throws IOException {
685685
/**
686686
* Compute the part number that contains the byte at the given position in the corresponding Lucene file.
687687
*/
688-
private long getPartNumberForPosition(long position) {
688+
private int getPartNumberForPosition(long position) {
689689
ensureValidPosition(position);
690-
final long part = position / fileInfo.partSize().getBytes();
690+
final int part = Math.toIntExact(position / fileInfo.partSize().getBytes());
691691
assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts();
692-
assert part >= 0L : "part number [" + part + "] is negative";
692+
assert part >= 0 : "part number [" + part + "] is negative";
693693
return part;
694694
}
695695

@@ -700,13 +700,13 @@ private long getPartNumberForPosition(long position) {
700700
private long getRelativePositionInPart(long position) {
701701
ensureValidPosition(position);
702702
final long pos = position % fileInfo.partSize().getBytes();
703-
assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length";
703+
assert pos < fileInfo.partBytes(getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length";
704704
assert pos >= 0L : "position in part [" + pos + "] is negative";
705705
return pos;
706706
}
707707

708-
private long getLengthOfPart(long part) {
709-
return fileInfo.partBytes(toIntBytes(part));
708+
private long getLengthOfPart(int part) {
709+
return fileInfo.partBytes(part);
710710
}
711711

712712
private void ensureValidPosition(long position) {

x-pack/plugin/searchable-snapshots/src/main/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInput.java

+6-6
Original file line numberDiff line numberDiff line change
@@ -105,18 +105,18 @@ private DirectBlobContainerIndexInput(
105105
@Override
106106
protected void readInternal(ByteBuffer b) throws IOException {
107107
ensureOpen();
108-
if (fileInfo.numberOfParts() == 1L) {
108+
if (fileInfo.numberOfParts() == 1) {
109109
readInternalBytes(0, position, b, b.remaining());
110110
} else {
111111
while (b.hasRemaining()) {
112112
int currentPart = Math.toIntExact(position / fileInfo.partSize().getBytes());
113-
int remainingBytesInPart;
113+
long remainingBytesInPart;
114114
if (currentPart < (fileInfo.numberOfParts() - 1)) {
115-
remainingBytesInPart = toIntBytes(((currentPart + 1L) * fileInfo.partSize().getBytes()) - position);
115+
remainingBytesInPart = ((currentPart + 1) * fileInfo.partSize().getBytes()) - position;
116116
} else {
117117
remainingBytesInPart = toIntBytes(fileInfo.length() - position);
118118
}
119-
final int read = Math.min(b.remaining(), remainingBytesInPart);
119+
final int read = toIntBytes(Math.min(b.remaining(), remainingBytesInPart));
120120
readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, read);
121121
}
122122
}
@@ -211,8 +211,8 @@ private int readFromNewSequentialStream(int part, long pos, ByteBuffer b, int le
211211
// it and keep it open for future reads
212212
final InputStream inputStream = openBlobStream(part, pos, streamLength);
213213
streamForSequentialReads = new StreamForSequentialReads(new FilterInputStream(inputStream) {
214-
private LongAdder bytesRead = new LongAdder();
215-
private LongAdder timeNanos = new LongAdder();
214+
private final LongAdder bytesRead = new LongAdder();
215+
private final LongAdder timeNanos = new LongAdder();
216216

217217
private int onOptimizedRead(CheckedSupplier<Integer, IOException> read) throws IOException {
218218
final long startTimeNanos = stats.currentTimeNanos();

x-pack/plugin/searchable-snapshots/src/test/java/org/elasticsearch/index/store/direct/DirectBlobContainerIndexInputTests.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -79,18 +79,18 @@ private DirectBlobContainerIndexInput createIndexInput(final byte[] input, long
7979
onReadBlob.run();
8080

8181
final InputStream stream;
82-
if (fileInfo.numberOfParts() == 1L) {
82+
if (fileInfo.numberOfParts() == 1) {
8383
assertThat("Unexpected blob name [" + name + "]", name, equalTo(fileInfo.name()));
8484
stream = new ByteArrayInputStream(input, toIntBytes(position), toIntBytes(length));
8585

8686
} else {
8787
assertThat("Unexpected blob name [" + name + "]", name, allOf(startsWith(fileInfo.name()), containsString(".part")));
8888

89-
long partNumber = Long.parseLong(name.substring(name.indexOf(".part") + ".part".length()));
89+
int partNumber = Integer.parseInt(name.substring(name.indexOf(".part") + ".part".length()));
9090
assertThat(
9191
"Unexpected part number [" + partNumber + "] for [" + name + "]",
9292
partNumber,
93-
allOf(greaterThanOrEqualTo(0L), lessThan(fileInfo.numberOfParts()))
93+
allOf(greaterThanOrEqualTo(0), lessThan(fileInfo.numberOfParts()))
9494
);
9595

9696
stream = new ByteArrayInputStream(input, toIntBytes(partNumber * partSize + position), toIntBytes(length));

0 commit comments

Comments
 (0)