
Deserialize BlobStore Metadata Files in a Streaming Manner #73149

@@ -16,7 +16,6 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardSnapshotResult;
@@ -677,7 +676,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreReposit
String generation) {
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,
() -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(repository.shardContainer(repositoryShardId.index(),
repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY, MockBigArrays.NON_RECYCLING_INSTANCE))));
repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY))));
}

private static BlobStoreIndexShardSnapshot readShardSnapshot(BlobStoreRepository repository, RepositoryShardId repositoryShardId,
@@ -862,8 +862,8 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(Collection<SnapshotId> s
for (String indexMetaGeneration : indexMetaGenerations) {
executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
try {
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry,
bigArrays).getNumberOfShards();
return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry
).getNumberOfShards();
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage(
"[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
@@ -1220,7 +1220,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito
@Override
public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
try {
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException | NotXContentException ex) {
@@ -1231,7 +1231,7 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) {
@Override
public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
try {
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@@ -1243,7 +1243,7 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) {
public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
try {
return INDEX_METADATA_FORMAT.read(indexContainer(index),
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry, bigArrays);
repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry);
} catch (NoSuchFileException e) {
throw new SnapshotMissingException(metadata.name(), snapshotId, e);
}
@@ -2722,7 +2722,7 @@ private static List<String> unusedBlobs(Set<String> blobs, Set<String> surviving
*/
public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays);
return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry);
} catch (NoSuchFileException ex) {
throw new SnapshotMissingException(metadata.name(), snapshotId, ex);
} catch (IOException ex) {
@@ -2748,7 +2748,7 @@ private Tuple<BlobStoreIndexShardSnapshots, String> buildBlobStoreIndexShardSnap
if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) {
return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN);
}
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation);
return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation);
}
final Tuple<BlobStoreIndexShardSnapshots, Long> legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2()));
@@ -2765,7 +2765,7 @@ private Tuple<BlobStoreIndexShardSnapshots, Long> buildBlobStoreIndexShardSnapsh
long latest = latestGeneration(blobs);
if (latest >= 0) {
final BlobStoreIndexShardSnapshots shardSnapshots =
INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry, bigArrays);
INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry);
return new Tuple<>(shardSnapshots, latest);
} else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX)
|| b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) {
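All of the call sites above change in the same mechanical way: the read overload that took a BigArrays is gone, so each *_FORMAT.read(...) call simply drops its last argument. As a hedged sketch of the resulting caller shape (the format instance, codec name, blob-name pattern and reader below are illustrative; only the three-argument read(BlobContainer, String, NamedXContentRegistry) signature and the constructor shape come from this change):

    // illustrative format: blobs named "example-<name>.dat", parsed into Metadata via its XContent reader
    ChecksumBlobStoreFormat<Metadata> exampleFormat =
        new ChecksumBlobStoreFormat<>("example-codec", "example-%s.dat", Metadata::fromXContent);
    // read(...) now streams the blob through the parser instead of buffering it in memory first
    Metadata parsed = exampleFormat.read(blobContainer, blobNamePart, namedXContentRegistry);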
@@ -11,40 +11,39 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.store.ByteBuffersDataInput;
import org.apache.lucene.store.ByteBuffersIndexInput;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.InputStreamDataInput;
import org.apache.lucene.store.OutputStreamIndexOutput;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Numbers;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
import org.elasticsearch.common.lucene.store.ByteArrayIndexInput;
import org.elasticsearch.common.lucene.store.IndexOutputOutputStream;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.Streams;
import org.elasticsearch.gateway.CorruptStateException;
import org.elasticsearch.snapshots.SnapshotInfo;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.zip.CRC32;

/**
* Snapshot metadata file format used in v2.0 and above
@@ -93,40 +92,172 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct
* @param name name to be translated into the blob name
* @return parsed blob object
*/
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry,
BigArrays bigArrays) throws IOException {
public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException {
String blobName = blobName(name);
try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays);
InputStream in = blobContainer.readBlob(blobName)) {
Streams.copy(in, out, false);
return deserialize(blobName, namedXContentRegistry, out.bytes());
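// the lines above show the previous approach (copy the whole blob into a BigArrays-backed buffer, then parse);
// the replacement below hands the blob's InputStream straight to deserialize(...) so nothing is buffered up front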
try (InputStream in = blobContainer.readBlob(blobName)) {
return deserialize(namedXContentRegistry, in);
}
}

public String blobName(String name) {
return String.format(Locale.ROOT, blobNameFormat, name);
}

public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException {
final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")";
public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException {
final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input);
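// Blob layout consumed here: a Lucene codec header, then the SMILE-encoded body (optionally compressed),
// then a 16-byte Lucene footer (footer magic, algorithm id, CRC32 checksum). The wrapper stream above hides
// the footer from the parser and verifies it once parsing has finished.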

CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION);
final InputStream wrappedStream;
if (deserializeMetaBlobInputStream.nextBytesCompressed()) {
wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream);
} else {
wrappedStream = deserializeMetaBlobInputStream;
}
try {
final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput(
new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc)
: new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES);
CodecUtil.checksumEntireFile(indexInput);
CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION);
long filePointer = indexInput.getFilePointer();
long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer;
try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE,
bytes.slice((int) filePointer, (int) contentSize), XContentType.SMILE)) {
return reader.apply(parser);
final T result;
try (XContentParser parser = XContentType.SMILE.xContent().createParser(
namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) {
result = reader.apply(parser);
}
deserializeMetaBlobInputStream.verifyFooter();
return result;
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
// we trick this into a dedicated exception with the original stacktrace
throw new CorruptStateException(ex);
}
}

/**
* Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages
* a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that
* a parser backed by this stream will only see the blob's body.
*/
private static final class DeserializeMetaBlobInputStream extends FilterInputStream {

// checksum updated with all but the last 8 bytes read from the wrapped stream
private final CRC32 crc32 = new CRC32();

// Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end
// of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream.
private final byte[] buffer = new byte[1024 * 8];

// the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF
private int bufferCount;

// the current read position within the buffer, in [0, bufferCount - 16]
private int bufferPos;

DeserializeMetaBlobInputStream(InputStream in) {
super(in);
}

@Override
public int read() throws IOException {
if (buffered() <= 0) {
fill();
}
if (buffered() <= 0) {
return -1;
}
return buffer[bufferPos++] & 0xFF; // mask so the value is returned in [0, 255] per the InputStream#read() contract, not as a negative byte
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int remaining = len;
int read = 0;
while (remaining > 0) {
final int r = doRead(b, off + read, remaining);
if (r <= 0) {
break;
}
read += r;
remaining -= r;
}
if (len > 0 && remaining == len) {
// nothing to read, EOF
return -1;
}
return read;
}

@Override
public void close() throws IOException {
// not closing the wrapped stream
}

private int doRead(byte[] b, int off, int len) throws IOException {
if (buffered() <= 0) {
fill();
}
final int available = buffered();
if (available < 0) {
return -1;
}
final int read = Math.min(available, len);
System.arraycopy(buffer, bufferPos, b, off, read);
bufferPos += read;
return read;
}

/**
* Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would.
*
* @throws CorruptStateException if footer is found to be corrupted
*/
void verifyFooter() throws CorruptStateException {
if (bufferCount - bufferPos != CodecUtil.footerLength()) {
throw new CorruptStateException(
"should have consumed all but 16 bytes from the buffer but saw buffer pos [" + bufferPos + "] and count ["
+ bufferCount + "]");
}
crc32.update(buffer, 0, bufferPos + 8);
final int magicFound = Numbers.bytesToInt(buffer, bufferPos);
if (magicFound != CodecUtil.FOOTER_MAGIC) {
throw new CorruptStateException("unexpected footer magic [" + magicFound + "]");
}
final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4);
if (algorithmFound != 0) {
throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]");
}
final long checksum = crc32.getValue();
final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8);
if (checksum != checksumInFooter) {
throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]");
}
}

/**
* @return true if the next bytes in this stream are compressed
*/
boolean nextBytesCompressed() {
// we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling
// this method
assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]";
return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos));
Reviewer comment (Member): Maybe assert that bufferPos > 0?
}

private int buffered() {
// bytes in the buffer minus 16 bytes that could be the footer
return bufferCount - bufferPos - CodecUtil.footerLength();
}

private void fill() throws IOException {
if (bufferCount == 0) {
bufferCount = Streams.readFully(in, buffer, 0, buffer.length);
} else {
// crc and discard all but the last 16 bytes in the buffer that might be the footer bytes
final int footerLen = CodecUtil.footerLength();
assert bufferCount >= footerLen;
assert bufferPos == bufferCount - footerLen;
crc32.update(buffer, 0, bufferPos);
System.arraycopy(buffer, bufferPos, buffer, 0, footerLen);
bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen);
bufferPos = 0;
}
}
}
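// Illustration only, not part of this change: a self-contained sketch of the check that verifyFooter() performs,
// written against a fully buffered byte[] for clarity. The 16-byte Lucene footer is FOOTER_MAGIC (4 bytes),
// an algorithm id that is always 0 (4 bytes) and a CRC32 checksum (8 bytes); the checksum covers every byte
// of the blob except the trailing 8 checksum bytes themselves.
private static void verifyFooterSketch(byte[] blob) throws CorruptStateException {
    final int footerStart = blob.length - CodecUtil.footerLength(); // footerLength() == 16
    final CRC32 crc = new CRC32();
    crc.update(blob, 0, blob.length - 8); // everything except the stored checksum itself
    if (Numbers.bytesToInt(blob, footerStart) != CodecUtil.FOOTER_MAGIC) {
        throw new CorruptStateException("unexpected footer magic [" + Numbers.bytesToInt(blob, footerStart) + "]");
    }
    if (Numbers.bytesToInt(blob, footerStart + 4) != 0) {
        throw new CorruptStateException("unexpected checksum algorithm [" + Numbers.bytesToInt(blob, footerStart + 4) + "]");
    }
    if (crc.getValue() != Numbers.bytesToLong(blob, footerStart + 8)) {
        throw new CorruptStateException("footer checksum mismatch");
    }
}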

/**
* Writes blob with resolving the blob name using {@link #blobName} method.
* <p>