Skip to content

Commit 6dd2a2a

Browse files
Deserialize BlobStore Metadata Files in a Streaming Manner (#73149)
We were reading the full file contents up front here because verifying the footer is complicated otherwise. This commit changes the logic for reading metadata blobs (which can become quite sizable in some cases) to work in a streaming manner. The footer is now verified manually, because Lucene's utility methods don't allow for verification on top of a stream.
1 parent d5bf72a commit 6dd2a2a

File tree

6 files changed

+191
-68
lines changed

6 files changed

+191
-68
lines changed

server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.elasticsearch.client.Client;
1717
import org.elasticsearch.cluster.SnapshotsInProgress;
1818
import org.elasticsearch.common.unit.TimeValue;
19-
import org.elasticsearch.common.util.MockBigArrays;
2019
import org.elasticsearch.index.IndexNotFoundException;
2120
import org.elasticsearch.repositories.RepositoryData;
2221
import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -677,7 +676,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreReposit
677676
String generation) {
678677
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,
679678
() -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(repository.shardContainer(repositoryShardId.index(),
680-
repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY, MockBigArrays.NON_RECYCLING_INSTANCE))));
679+
repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY))));
681680
}
682681

683682
private static BlobStoreIndexShardSnapshot readShardSnapshot(BlobStoreRepository repository, RepositoryShardId repositoryShardId,

server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java

+8-8
Original file line numberDiff line numberDiff line change
@@ -862,8 +862,8 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(Collection<SnapshotId> s
862862
for (String indexMetaGeneration : indexMetaGenerations) {
863863
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
864864
try {
865-
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry,
866-
bigArrays).getNumberOfShards();
865+
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry
866+
).getNumberOfShards();
867867
} catch (Exception ex) {
868868
logger.warn(() -> new ParameterizedMessage(
869869
"[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
@@ -1220,7 +1220,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
12201220
@Override
12211221
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
12221222
try {
1223-
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
1223+
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
12241224
} catch (NoSuchFileException ex) {
12251225
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
12261226
} catch (IOException | NotXContentException ex) {
@@ -1231,7 +1231,7 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
12311231
@Override
12321232
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
12331233
try {
1234-
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
1234+
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
12351235
} catch (NoSuchFileException ex) {
12361236
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
12371237
} catch (IOException ex) {
@@ -1243,7 +1243,7 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
12431243
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
12441244
try {
12451245
return INDEX_METADATA_FORMAT.read(indexContainer(index),
1246-
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry, bigArrays);
1246+
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry);
12471247
} catch (NoSuchFileException e) {
12481248
throw new SnapshotMissingException(metadata.name(), snapshotId, e);
12491249
}
@@ -2722,7 +2722,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
27222722
*/
27232723
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
27242724
try {
2725-
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays);
2725+
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
27262726
} catch (NoSuchFileException ex) {
27272727
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
27282728
} catch (IOException ex) {
@@ -2748,7 +2748,7 @@ private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnap
27482748
if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
27492749
return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
27502750
}
2751-
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation);
2751+
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
27522752
}
27532753
final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
27542754
return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
@@ -2765,7 +2765,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
27652765
long latest = latestGeneration(blobs);
27662766
if (latest >= 0) {
27672767
final BlobStoreIndexShardSnapshots shardSnapshots =
2768-
INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry, bigArrays);
2768+
INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry);
27692769
return new Tuple<>(shardSnapshots, latest);
27702770
} else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
27712771
|| b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {

server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java

+162-26
Original file line numberDiff line numberDiff line change
@@ -11,40 +11,39 @@
1111
import org.apache.lucene.index.CorruptIndexException;
1212
import org.apache.lucene.index.IndexFormatTooNewException;
1313
import org.apache.lucene.index.IndexFormatTooOldException;
14-
import org.apache.lucene.store.ByteBuffersDataInput;
15-
import org.apache.lucene.store.ByteBuffersIndexInput;
16-
import org.apache.lucene.store.IndexInput;
14+
import org.apache.lucene.store.ChecksumIndexInput;
15+
import org.apache.lucene.store.InputStreamDataInput;
1716
import org.apache.lucene.store.OutputStreamIndexOutput;
18-
import org.apache.lucene.util.BytesRef;
1917
import org.elasticsearch.cluster.metadata.Metadata;
2018
import org.elasticsearch.common.CheckedConsumer;
2119
import org.elasticsearch.common.CheckedFunction;
20+
import org.elasticsearch.common.Numbers;
2221
import org.elasticsearch.common.blobstore.BlobContainer;
22+
import org.elasticsearch.common.bytes.BytesArray;
2323
import org.elasticsearch.common.bytes.BytesReference;
2424
import org.elasticsearch.common.compress.CompressorFactory;
25+
import org.elasticsearch.common.io.Streams;
2526
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
26-
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
2727
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
2828
import org.elasticsearch.common.util.BigArrays;
2929
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
3030
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
3131
import org.elasticsearch.common.xcontent.ToXContent;
3232
import org.elasticsearch.common.xcontent.XContentBuilder;
3333
import org.elasticsearch.common.xcontent.XContentFactory;
34-
import org.elasticsearch.common.xcontent.XContentHelper;
3534
import org.elasticsearch.common.xcontent.XContentParser;
3635
import org.elasticsearch.common.xcontent.XContentType;
37-
import org.elasticsearch.core.internal.io.Streams;
3836
import org.elasticsearch.gateway.CorruptStateException;
3937
import org.elasticsearch.snapshots.SnapshotInfo;
4038

39+
import java.io.FilterInputStream;
4140
import java.io.IOException;
4241
import java.io.InputStream;
4342
import java.io.OutputStream;
44-
import java.util.Arrays;
4543
import java.util.HashMap;
4644
import java.util.Locale;
4745
import java.util.Map;
46+
import java.util.zip.CRC32;
4847

4948
/**
5049
* Snapshot metadata file format used in v2.0 and above
@@ -93,37 +92,174 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
9392
* @param name name to be translated into
9493
* @return parsed blob object
9594
*/
96-
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry,
97-
BigArrays bigArrays) throws IOException {
95+
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException {
9896
String blobName = blobName(name);
99-
try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
100-
InputStream in = blobContainer.readBlob(blobName)) {
101-
Streams.copy(in, out, false);
102-
return deserialize(blobName, namedXContentRegistry, out.bytes());
97+
try (InputStream in = blobContainer.readBlob(blobName)) {
98+
return deserialize(namedXContentRegistry, in);
10399
}
104100
}
105101

106102
public String blobName(String name) {
107103
return String.format(Locale.ROOT, blobNameFormat, name);
108104
}
109105

110-
public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException {
111-
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
106+
public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
107+
final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
112108
try {
113-
final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput(
114-
new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
115-
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
116-
CodecUtil.checksumEntireFile(indexInput);
117-
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
118-
long filePointer = indexInput.getFilePointer();
119-
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
120-
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
121-
bytes.slice((int) filePointer, (int) contentSize), XContentType.SMILE)) {
122-
return reader.apply(parser);
109+
CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
110+
final InputStream wrappedStream;
111+
if (deserializeMetaBlobInputStream.nextBytesCompressed()) {
112+
wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream);
113+
} else {
114+
wrappedStream = deserializeMetaBlobInputStream;
123115
}
116+
final T result;
117+
try (XContentParser parser = XContentType.SMILE.xContent().createParser(
118+
namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) {
119+
result = reader.apply(parser);
120+
}
121+
deserializeMetaBlobInputStream.verifyFooter();
122+
return result;
124123
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
125124
// we trick this into a dedicated exception with the original stacktrace
126125
throw new CorruptStateException(ex);
126+
} catch (Exception e) {
127+
try {
128+
// drain stream fully and check whether the footer is corrupted
129+
Streams.consumeFully(deserializeMetaBlobInputStream);
130+
deserializeMetaBlobInputStream.verifyFooter();
131+
} catch (CorruptStateException cse) {
132+
cse.addSuppressed(e);
133+
throw cse;
134+
} catch (Exception ex) {
135+
e.addSuppressed(ex);
136+
}
137+
throw e;
138+
}
139+
}
140+
141+
/**
142+
* Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages
143+
* a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that
144+
* a parser backed by this stream will only see the blob's body.
145+
*/
146+
private static final class DeserializeMetaBlobInputStream extends FilterInputStream {
147+
148+
// checksum updated with all but the last 8 bytes read from the wrapped stream
149+
private final CRC32 crc32 = new CRC32();
150+
151+
// Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end
152+
// of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream.
153+
private final byte[] buffer = new byte[1024 * 8];
154+
155+
// the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF
156+
private int bufferCount;
157+
158+
// the current read position within the buffer, in [0, bufferCount - 16]
159+
private int bufferPos;
160+
161+
DeserializeMetaBlobInputStream(InputStream in) {
162+
super(in);
163+
}
164+
165+
@Override
166+
public int read() throws IOException {
167+
if (getAvailable() <= 0) {
168+
return -1;
169+
}
170+
return buffer[bufferPos++];
171+
}
172+
173+
@Override
174+
public int read(byte[] b, int off, int len) throws IOException {
175+
int remaining = len;
176+
int read = 0;
177+
while (remaining > 0) {
178+
final int r = doRead(b, off + read, remaining);
179+
if (r <= 0) {
180+
break;
181+
}
182+
read += r;
183+
remaining -= r;
184+
}
185+
if (len > 0 && remaining == len) {
186+
// nothing to read, EOF
187+
return -1;
188+
}
189+
return read;
190+
}
191+
192+
@Override
193+
public void close() throws IOException {
194+
// not closing the wrapped stream
195+
}
196+
197+
private int doRead(byte[] b, int off, int len) throws IOException {
198+
final int available = getAvailable();
199+
if (available < 0) {
200+
return -1;
201+
}
202+
final int read = Math.min(available, len);
203+
System.arraycopy(buffer, bufferPos, b, off, read);
204+
bufferPos += read;
205+
return read;
206+
}
207+
208+
/**
209+
* Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would.
210+
*
211+
* @throws CorruptStateException if footer is found to be corrupted
212+
*/
213+
void verifyFooter() throws CorruptStateException {
214+
if (bufferCount - bufferPos != CodecUtil.footerLength()) {
215+
throw new CorruptStateException(
216+
"should have consumed all but 16 bytes from the buffer but saw buffer pos [" + bufferPos + "] and count ["
217+
+ bufferCount + "]");
218+
}
219+
crc32.update(buffer, 0, bufferPos + 8);
220+
final int magicFound = Numbers.bytesToInt(buffer, bufferPos);
221+
if (magicFound != CodecUtil.FOOTER_MAGIC) {
222+
throw new CorruptStateException("unexpected footer magic [" + magicFound + "]");
223+
}
224+
final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4);
225+
if (algorithmFound != 0) {
226+
throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]");
227+
}
228+
final long checksum = crc32.getValue();
229+
final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8);
230+
if (checksum != checksumInFooter) {
231+
throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]");
232+
}
233+
}
234+
235+
/**
236+
* @return true if the next bytes in this stream are compressed
237+
*/
238+
boolean nextBytesCompressed() {
239+
// we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling
240+
// this method
241+
assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]";
242+
return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos));
243+
}
244+
245+
/**
246+
* @return the number of bytes available in the buffer, possibly refilling the buffer if needed
247+
*/
248+
private int getAvailable() throws IOException {
249+
final int footerLen = CodecUtil.footerLength();
250+
if (bufferCount == 0) {
251+
// first read, fill the buffer
252+
bufferCount = Streams.readFully(in, buffer, 0, buffer.length);
253+
} else if (bufferPos == bufferCount - footerLen) {
254+
// crc and discard all but the last 16 bytes in the buffer that might be the footer bytes
255+
assert bufferCount >= footerLen;
256+
crc32.update(buffer, 0, bufferPos);
257+
System.arraycopy(buffer, bufferPos, buffer, 0, footerLen);
258+
bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
259+
bufferPos = 0;
260+
}
261+
// bytes in the buffer minus 16 bytes that could be the footer
262+
return bufferCount - bufferPos - footerLen;
127263
}
128264
}
129265

0 commit comments

Comments
 (0)