Skip to content

Commit f320685

Browse files
Deserialize BlobStore Metadata Files in a Streaming Manner (elastic#73149)
We were reading the full file contents up-front here because of the complexity of verifying the footer otherwise. This commit moves the logic for reading metadata blobs (that can become quite sizable in some cases) in a streaming manner by manually doing the footer verification as Lucene's utility methods don't allow for verification on top of a stream.
1 parent a9b782c commit f320685

File tree

5 files changed

+190
-86
lines changed

5 files changed

+190
-86
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.client.Client;
1717
import org.elasticsearch.cluster.SnapshotsInProgress;
1818
import org.elasticsearch.core.TimeValue;
19-
import org.elasticsearch.common.util.MockBigArrays;
2019
import org.elasticsearch.index.IndexNotFoundException;
2120
import org.elasticsearch.repositories.RepositoryData;
2221
import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -731,8 +730,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(
731730
() -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(
732731
repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
733732
generation,
734-
NamedXContentRegistry.EMPTY,
735-
MockBigArrays.NON_RECYCLING_INSTANCE
733+
NamedXContentRegistry.EMPTY
736734
)
737735
)
738736
)

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+7-10
Original file line numberDiff line numberDiff line change
@@ -977,8 +977,7 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(
977977
for (String indexMetaGeneration : indexMetaGenerations) {
978978
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
979979
try {
980-
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry, bigArrays)
981-
.getNumberOfShards();
980+
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry).getNumberOfShards();
982981
} catch (Exception ex) {
983982
logger.warn(
984983
() -> new ParameterizedMessage(
@@ -1435,7 +1434,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
14351434
@Override
14361435
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
14371436
try {
1438-
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
1437+
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
14391438
} catch (NoSuchFileException ex) {
14401439
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
14411440
} catch (IOException | NotXContentException ex) {
@@ -1446,7 +1445,7 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
14461445
@Override
14471446
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
14481447
try {
1449-
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
1448+
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
14501449
} catch (NoSuchFileException ex) {
14511450
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
14521451
} catch (IOException ex) {
@@ -1460,8 +1459,7 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna
14601459
return INDEX_METADATA_FORMAT.read(
14611460
indexContainer(index),
14621461
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index),
1463-
namedXContentRegistry,
1464-
bigArrays
1462+
namedXContentRegistry
14651463
);
14661464
} catch (NoSuchFileException e) {
14671465
throw new SnapshotMissingException(metadata.name(), snapshotId, e);
@@ -3156,7 +3154,7 @@ private static List<String> unusedBlobs(
31563154
*/
31573155
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
31583156
try {
3159-
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays);
3157+
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
31603158
} catch (NoSuchFileException ex) {
31613159
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
31623160
} catch (IOException ex) {
@@ -3188,7 +3186,7 @@ private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnap
31883186
if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
31893187
return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
31903188
}
3191-
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation);
3189+
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
31923190
}
31933191
final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
31943192
return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
@@ -3207,8 +3205,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
32073205
final BlobStoreIndexShardSnapshots shardSnapshots = INDEX_SHARD_SNAPSHOTS_FORMAT.read(
32083206
shardContainer,
32093207
Long.toString(latest),
3210-
namedXContentRegistry,
3211-
bigArrays
3208+
namedXContentRegistry
32123209
);
32133210
return new Tuple<>(shardSnapshots, latest);
32143211
} else if (blobs.stream()

server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

+166-32
Original file line numberDiff line numberDiff line change
@@ -11,40 +11,39 @@
1111
import org.apache.lucene.index.CorruptIndexException;
1212
import org.apache.lucene.index.IndexFormatTooNewException;
1313
import org.apache.lucene.index.IndexFormatTooOldException;
14-
import org.apache.lucene.store.ByteBuffersDataInput;
15-
import org.apache.lucene.store.ByteBuffersIndexInput;
16-
import org.apache.lucene.store.IndexInput;
14+
import org.apache.lucene.store.ChecksumIndexInput;
15+
import org.apache.lucene.store.InputStreamDataInput;
1716
import org.apache.lucene.store.OutputStreamIndexOutput;
18-
import org.apache.lucene.util.BytesRef;
1917
import org.elasticsearch.cluster.metadata.Metadata;
2018
import org.elasticsearch.core.CheckedConsumer;
2119
import org.elasticsearch.core.CheckedFunction;
20+
import org.elasticsearch.common.Numbers;
2221
import org.elasticsearch.common.blobstore.BlobContainer;
22+
import org.elasticsearch.common.bytes.BytesArray;
2323
import org.elasticsearch.common.bytes.BytesReference;
2424
import org.elasticsearch.common.compress.CompressorFactory;
25+
import org.elasticsearch.common.io.Streams;
2526
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
26-
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
2727
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
2828
import org.elasticsearch.common.util.BigArrays;
2929
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
3030
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3131
import org.elasticsearch.common.xcontent.ToXContent;
3232
import org.elasticsearch.common.xcontent.XContentBuilder;
3333
import org.elasticsearch.common.xcontent.XContentFactory;
34-
import org.elasticsearch.common.xcontent.XContentHelper;
3534
import org.elasticsearch.common.xcontent.XContentParser;
3635
import org.elasticsearch.common.xcontent.XContentType;
37-
import org.elasticsearch.core.internal.io.Streams;
3836
import org.elasticsearch.gateway.CorruptStateException;
3937
import org.elasticsearch.snapshots.SnapshotInfo;
4038

39+
import java.io.FilterInputStream;
4140
import java.io.IOException;
4241
import java.io.InputStream;
4342
import java.io.OutputStream;
44-
import java.util.Arrays;
4543
import java.util.HashMap;
4644
import java.util.Locale;
4745
import java.util.Map;
46+
import java.util.zip.CRC32;
4847

4948
/**
5049
* Snapshot metadata file format used in v2.0 and above
@@ -93,45 +92,180 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
9392
* @param name name to be translated into
9493
* @return parsed blob object
9594
*/
96-
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry, BigArrays bigArrays)
97-
throws IOException {
95+
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException {
9896
String blobName = blobName(name);
99-
try (
100-
ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
101-
InputStream in = blobContainer.readBlob(blobName)
102-
) {
103-
Streams.copy(in, out, false);
104-
return deserialize(blobName, namedXContentRegistry, out.bytes());
97+
try (InputStream in = blobContainer.readBlob(blobName)) {
98+
return deserialize(namedXContentRegistry, in);
10599
}
106100
}
107101

108102
public String blobName(String name) {
109103
return String.format(Locale.ROOT, blobNameFormat, name);
110104
}
111105

112-
public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException {
113-
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
106+
public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
107+
final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
114108
try {
115-
final IndexInput indexInput = bytes.length() > 0
116-
? new ByteBuffersIndexInput(new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
117-
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
118-
CodecUtil.checksumEntireFile(indexInput);
119-
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
120-
long filePointer = indexInput.getFilePointer();
121-
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
109+
CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
110+
final InputStream wrappedStream;
111+
if (deserializeMetaBlobInputStream.nextBytesCompressed()) {
112+
wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream);
113+
} else {
114+
wrappedStream = deserializeMetaBlobInputStream;
115+
}
116+
final T result;
122117
try (
123-
XContentParser parser = XContentHelper.createParser(
124-
namedXContentRegistry,
125-
LoggingDeprecationHandler.INSTANCE,
126-
bytes.slice((int) filePointer, (int) contentSize),
127-
XContentType.SMILE
128-
)
118+
XContentParser parser = XContentType.SMILE.xContent()
119+
.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)
129120
) {
130-
return reader.apply(parser);
121+
result = reader.apply(parser);
131122
}
123+
deserializeMetaBlobInputStream.verifyFooter();
124+
return result;
132125
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
133126
// we trick this into a dedicated exception with the original stacktrace
134127
throw new CorruptStateException(ex);
128+
} catch (Exception e) {
129+
try {
130+
// drain stream fully and check whether the footer is corrupted
131+
Streams.consumeFully(deserializeMetaBlobInputStream);
132+
deserializeMetaBlobInputStream.verifyFooter();
133+
} catch (CorruptStateException cse) {
134+
cse.addSuppressed(e);
135+
throw cse;
136+
} catch (Exception ex) {
137+
e.addSuppressed(ex);
138+
}
139+
throw e;
140+
}
141+
}
142+
143+
/**
144+
* Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages
145+
* a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that
146+
* a parser backed by this stream will only see the blob's body.
147+
*/
148+
private static final class DeserializeMetaBlobInputStream extends FilterInputStream {
149+
150+
// checksum updated with all but the last 8 bytes read from the wrapped stream
151+
private final CRC32 crc32 = new CRC32();
152+
153+
// Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end
154+
// of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream.
155+
private final byte[] buffer = new byte[1024 * 8];
156+
157+
// the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF
158+
private int bufferCount;
159+
160+
// the current read position within the buffer, in [0, bufferCount - 16]
161+
private int bufferPos;
162+
163+
DeserializeMetaBlobInputStream(InputStream in) {
164+
super(in);
165+
}
166+
167+
@Override
168+
public int read() throws IOException {
169+
if (getAvailable() <= 0) {
170+
return -1;
171+
}
172+
return buffer[bufferPos++];
173+
}
174+
175+
@Override
176+
public int read(byte[] b, int off, int len) throws IOException {
177+
int remaining = len;
178+
int read = 0;
179+
while (remaining > 0) {
180+
final int r = doRead(b, off + read, remaining);
181+
if (r <= 0) {
182+
break;
183+
}
184+
read += r;
185+
remaining -= r;
186+
}
187+
if (len > 0 && remaining == len) {
188+
// nothing to read, EOF
189+
return -1;
190+
}
191+
return read;
192+
}
193+
194+
@Override
195+
public void close() throws IOException {
196+
// not closing the wrapped stream
197+
}
198+
199+
private int doRead(byte[] b, int off, int len) throws IOException {
200+
final int available = getAvailable();
201+
if (available < 0) {
202+
return -1;
203+
}
204+
final int read = Math.min(available, len);
205+
System.arraycopy(buffer, bufferPos, b, off, read);
206+
bufferPos += read;
207+
return read;
208+
}
209+
210+
/**
211+
* Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would.
212+
*
213+
* @throws CorruptStateException if footer is found to be corrupted
214+
*/
215+
void verifyFooter() throws CorruptStateException {
216+
if (bufferCount - bufferPos != CodecUtil.footerLength()) {
217+
throw new CorruptStateException(
218+
"should have consumed all but 16 bytes from the buffer but saw buffer pos ["
219+
+ bufferPos
220+
+ "] and count ["
221+
+ bufferCount
222+
+ "]"
223+
);
224+
}
225+
crc32.update(buffer, 0, bufferPos + 8);
226+
final int magicFound = Numbers.bytesToInt(buffer, bufferPos);
227+
if (magicFound != CodecUtil.FOOTER_MAGIC) {
228+
throw new CorruptStateException("unexpected footer magic [" + magicFound + "]");
229+
}
230+
final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4);
231+
if (algorithmFound != 0) {
232+
throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]");
233+
}
234+
final long checksum = crc32.getValue();
235+
final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8);
236+
if (checksum != checksumInFooter) {
237+
throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]");
238+
}
239+
}
240+
241+
/**
242+
* @return true if the next bytes in this stream are compressed
243+
*/
244+
boolean nextBytesCompressed() {
245+
// we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling
246+
// this method
247+
assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]";
248+
return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos));
249+
}
250+
251+
/**
252+
* @return the number of bytes available in the buffer, possibly refilling the buffer if needed
253+
*/
254+
private int getAvailable() throws IOException {
255+
final int footerLen = CodecUtil.footerLength();
256+
if (bufferCount == 0) {
257+
// first read, fill the buffer
258+
bufferCount = Streams.readFully(in, buffer, 0, buffer.length);
259+
} else if (bufferPos == bufferCount - footerLen) {
260+
// crc and discard all but the last 16 bytes in the buffer that might be the footer bytes
261+
assert bufferCount >= footerLen;
262+
crc32.update(buffer, 0, bufferPos);
263+
System.arraycopy(buffer, bufferPos, buffer, 0, footerLen);
264+
bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
265+
bufferPos = 0;
266+
}
267+
// bytes in the buffer minus 16 bytes that could be the footer
268+
return bufferCount - bufferPos - footerLen;
135269
}
136270
}
137271

0 commit comments

Comments
 (0)