diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java index c23267f8d1894..e2d9617d388eb 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CloneSnapshotIT.java @@ -16,7 +16,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.MockBigArrays; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardSnapshotResult; @@ -677,7 +676,7 @@ private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreReposit String generation) { return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f, () -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(repository.shardContainer(repositoryShardId.index(), - repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY, MockBigArrays.NON_RECYCLING_INSTANCE)))); + repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY)))); } private static BlobStoreIndexShardSnapshot readShardSnapshot(BlobStoreRepository repository, RepositoryShardId repositoryShardId, diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 38f01969619e6..d4c078c570af6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -862,8 +862,8 @@ private void writeUpdatedShardMetaDataAndComputeDeletes(Collection s for (String indexMetaGeneration : indexMetaGenerations) { executor.execute(ActionRunnable.supply(allShardCountsListener, () -> { try { - return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry, - bigArrays).getNumberOfShards(); + return INDEX_METADATA_FORMAT.read(indexContainer, indexMetaGeneration, namedXContentRegistry + ).getNumberOfShards(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage( "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex); @@ -1220,7 +1220,7 @@ private void cleanupOldShardGens(RepositoryData existingRepositoryData, Reposito @Override public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { try { - return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return SNAPSHOT_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException | NotXContentException ex) { @@ -1231,7 +1231,7 @@ public SnapshotInfo getSnapshotInfo(final SnapshotId snapshotId) { @Override public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { try { - return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return GLOBAL_METADATA_FORMAT.read(blobContainer(), snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { @@ -1243,7 +1243,7 @@ public Metadata getSnapshotGlobalMetadata(final SnapshotId snapshotId) { public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { try { return INDEX_METADATA_FORMAT.read(indexContainer(index), - repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry, bigArrays); + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index), namedXContentRegistry); } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); } @@ -2722,7 +2722,7 @@ private static List unusedBlobs(Set blobs, Set surviving */ public BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) { try { - return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry, bigArrays); + return INDEX_SHARD_SNAPSHOT_FORMAT.read(shardContainer, snapshotId.getUUID(), namedXContentRegistry); } catch (NoSuchFileException ex) { throw new SnapshotMissingException(metadata.name(), snapshotId, ex); } catch (IOException ex) { @@ -2748,7 +2748,7 @@ private Tuple buildBlobStoreIndexShardSnap if (generation.equals(ShardGenerations.NEW_SHARD_GEN)) { return new Tuple<>(BlobStoreIndexShardSnapshots.EMPTY, ShardGenerations.NEW_SHARD_GEN); } - return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry, bigArrays), generation); + return new Tuple<>(INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, generation, namedXContentRegistry), generation); } final Tuple legacyIndex = buildBlobStoreIndexShardSnapshots(blobs, shardContainer); return new Tuple<>(legacyIndex.v1(), String.valueOf(legacyIndex.v2())); @@ -2765,7 +2765,7 @@ private Tuple buildBlobStoreIndexShardSnapsh long latest = latestGeneration(blobs); if (latest >= 0) { final BlobStoreIndexShardSnapshots shardSnapshots = - INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry, bigArrays); + INDEX_SHARD_SNAPSHOTS_FORMAT.read(shardContainer, Long.toString(latest), namedXContentRegistry); return new Tuple<>(shardSnapshots, latest); } else if (blobs.stream().anyMatch(b -> b.startsWith(SNAPSHOT_PREFIX) || b.startsWith(INDEX_FILE_PREFIX) || b.startsWith(UPLOADED_DATA_BLOB_PREFIX))) { diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 8b41c8d0a2176..1ceaf6a74a2d1 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -11,19 +11,19 @@ import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.IndexFormatTooNewException; import org.apache.lucene.index.IndexFormatTooOldException; -import org.apache.lucene.store.ByteBuffersDataInput; -import org.apache.lucene.store.ByteBuffersIndexInput; -import org.apache.lucene.store.IndexInput; +import org.apache.lucene.store.ChecksumIndexInput; +import org.apache.lucene.store.InputStreamDataInput; import org.apache.lucene.store.OutputStreamIndexOutput; -import org.apache.lucene.util.BytesRef; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.CheckedFunction; +import org.elasticsearch.common.Numbers; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.compress.CompressorFactory; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; -import org.elasticsearch.common.lucene.store.ByteArrayIndexInput; import org.elasticsearch.common.lucene.store.IndexOutputOutputStream; import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; @@ -31,20 +31,19 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; -import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; -import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.gateway.CorruptStateException; import org.elasticsearch.snapshots.SnapshotInfo; +import java.io.FilterInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Arrays; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.zip.CRC32; /** * Snapshot metadata file format used in v2.0 and above @@ -93,13 +92,10 @@ public ChecksumBlobStoreFormat(String codec, String blobNameFormat, CheckedFunct * @param name name to be translated into * @return parsed blob object */ - public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry, - BigArrays bigArrays) throws IOException { + public T read(BlobContainer blobContainer, String name, NamedXContentRegistry namedXContentRegistry) throws IOException { String blobName = blobName(name); - try (ReleasableBytesStreamOutput out = new ReleasableBytesStreamOutput(bigArrays); - InputStream in = blobContainer.readBlob(blobName)) { - Streams.copy(in, out, false); - return deserialize(blobName, namedXContentRegistry, out.bytes()); + try (InputStream in = blobContainer.readBlob(blobName)) { + return deserialize(namedXContentRegistry, in); } } @@ -107,23 +103,163 @@ public String blobName(String name) { return String.format(Locale.ROOT, blobNameFormat, name); } - public T deserialize(String blobName, NamedXContentRegistry namedXContentRegistry, BytesReference bytes) throws IOException { - final String resourceDesc = "ChecksumBlobStoreFormat.readBlob(blob=\"" + blobName + "\")"; + public T deserialize(NamedXContentRegistry namedXContentRegistry, InputStream input) throws IOException { + final DeserializeMetaBlobInputStream deserializeMetaBlobInputStream = new DeserializeMetaBlobInputStream(input); try { - final IndexInput indexInput = bytes.length() > 0 ? new ByteBuffersIndexInput( - new ByteBuffersDataInput(Arrays.asList(BytesReference.toByteBuffers(bytes))), resourceDesc) - : new ByteArrayIndexInput(resourceDesc, BytesRef.EMPTY_BYTES); - CodecUtil.checksumEntireFile(indexInput); - CodecUtil.checkHeader(indexInput, codec, VERSION, VERSION); - long filePointer = indexInput.getFilePointer(); - long contentSize = indexInput.length() - CodecUtil.footerLength() - filePointer; - try (XContentParser parser = XContentHelper.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, - bytes.slice((int) filePointer, (int) contentSize), XContentType.SMILE)) { - return reader.apply(parser); + CodecUtil.checkHeader(new InputStreamDataInput(deserializeMetaBlobInputStream), codec, VERSION, VERSION); + final InputStream wrappedStream; + if (deserializeMetaBlobInputStream.nextBytesCompressed()) { + wrappedStream = CompressorFactory.COMPRESSOR.threadLocalInputStream(deserializeMetaBlobInputStream); + } else { + wrappedStream = deserializeMetaBlobInputStream; } + final T result; + try (XContentParser parser = XContentType.SMILE.xContent().createParser( + namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, wrappedStream)) { + result = reader.apply(parser); + } + deserializeMetaBlobInputStream.verifyFooter(); + return result; } catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) { // we trick this into a dedicated exception with the original stacktrace throw new CorruptStateException(ex); + } catch (Exception e) { + try { + // drain stream fully and check whether the footer is corrupted + Streams.consumeFully(deserializeMetaBlobInputStream); + deserializeMetaBlobInputStream.verifyFooter(); + } catch (CorruptStateException cse) { + cse.addSuppressed(e); + throw cse; + } catch (Exception ex) { + e.addSuppressed(ex); + } + throw e; + } + } + + /** + * Wrapper input stream for deserializing blobs that come with a Lucene header and footer in a streaming manner. It manually manages + * a read buffer to enable not reading into the last 16 bytes (the footer length) of the buffer via the standard read methods so that + * a parser backed by this stream will only see the blob's body. + */ + private static final class DeserializeMetaBlobInputStream extends FilterInputStream { + + // checksum updated with all but the last 8 bytes read from the wrapped stream + private final CRC32 crc32 = new CRC32(); + + // Only the first buffer.length - 16 bytes are exposed by the read() methods; once the read position reaches 16 bytes from the end + // of the buffer the remaining 16 bytes are moved to the start of the buffer and the rest of the buffer is filled from the stream. + private final byte[] buffer = new byte[1024 * 8]; + + // the number of bytes in the buffer, in [0, buffer.length], equal to buffer.length unless the last fill hit EOF + private int bufferCount; + + // the current read position within the buffer, in [0, bufferCount - 16] + private int bufferPos; + + DeserializeMetaBlobInputStream(InputStream in) { + super(in); + } + + @Override + public int read() throws IOException { + if (getAvailable() <= 0) { + return -1; + } + return buffer[bufferPos++]; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int remaining = len; + int read = 0; + while (remaining > 0) { + final int r = doRead(b, off + read, remaining); + if (r <= 0) { + break; + } + read += r; + remaining -= r; + } + if (len > 0 && remaining == len) { + // nothing to read, EOF + return -1; + } + return read; + } + + @Override + public void close() throws IOException { + // not closing the wrapped stream + } + + private int doRead(byte[] b, int off, int len) throws IOException { + final int available = getAvailable(); + if (available < 0) { + return -1; + } + final int read = Math.min(available, len); + System.arraycopy(buffer, bufferPos, b, off, read); + bufferPos += read; + return read; + } + + /** + * Verify footer of the bytes read by this stream the same way {@link CodecUtil#checkFooter(ChecksumIndexInput)} would. + * + * @throws CorruptStateException if footer is found to be corrupted + */ + void verifyFooter() throws CorruptStateException { + if (bufferCount - bufferPos != CodecUtil.footerLength()) { + throw new CorruptStateException( + "should have consumed all but 16 bytes from the buffer but saw buffer pos [" + bufferPos + "] and count [" + + bufferCount + "]"); + } + crc32.update(buffer, 0, bufferPos + 8); + final int magicFound = Numbers.bytesToInt(buffer, bufferPos); + if (magicFound != CodecUtil.FOOTER_MAGIC) { + throw new CorruptStateException("unexpected footer magic [" + magicFound + "]"); + } + final int algorithmFound = Numbers.bytesToInt(buffer, bufferPos + 4); + if (algorithmFound != 0) { + throw new CorruptStateException("unexpected algorithm [" + algorithmFound + "]"); + } + final long checksum = crc32.getValue(); + final long checksumInFooter = Numbers.bytesToLong(buffer, bufferPos + 8); + if (checksum != checksumInFooter) { + throw new CorruptStateException("checksums do not match read [" + checksum + "] but expected [" + checksumInFooter + "]"); + } + } + + /** + * @return true if the next bytes in this stream are compressed + */ + boolean nextBytesCompressed() { + // we already have bytes buffered here because we verify the blob's header (far less than the 8k buffer size) before calling + // this method + assert bufferPos > 0 : "buffer position must be greater than 0 but was [" + bufferPos + "]"; + return CompressorFactory.COMPRESSOR.isCompressed(new BytesArray(buffer, bufferPos, bufferCount - bufferPos)); + } + + /** + * @return the number of bytes available in the buffer, possibly refilling the buffer if needed + */ + private int getAvailable() throws IOException { + final int footerLen = CodecUtil.footerLength(); + if (bufferCount == 0) { + // first read, fill the buffer + bufferCount = Streams.readFully(in, buffer, 0, buffer.length); + } else if (bufferPos == bufferCount - footerLen) { + // crc and discard all but the last 16 bytes in the buffer that might be the footer bytes + assert bufferCount >= footerLen; + crc32.update(buffer, 0, bufferPos); + System.arraycopy(buffer, bufferPos, buffer, 0, footerLen); + bufferCount = footerLen + Streams.readFully(in, buffer, footerLen, buffer.length - footerLen); + bufferPos = 0; + } + // bytes in the buffer minus 16 bytes that could be the footer + return bufferCount - bufferPos - footerLen; } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java index 37f646900868c..9e7f7d1e102ec 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentParserUtils; import org.elasticsearch.index.translog.BufferedChecksumStreamOutput; import org.elasticsearch.repositories.blobstore.ChecksumBlobStoreFormat; import org.elasticsearch.test.ESTestCase; @@ -31,7 +32,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Map; -import static org.hamcrest.Matchers.containsString; + import static org.hamcrest.Matchers.greaterThan; public class BlobStoreFormatTests extends ESTestCase { @@ -58,20 +59,9 @@ public static BlobObj fromXContent(XContentParser parser) throws IOException { } if (token == XContentParser.Token.START_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { - if (token != XContentParser.Token.FIELD_NAME) { - throw new ElasticsearchParseException("unexpected token [{}]", token); - } - String currentFieldName = parser.currentName(); - token = parser.nextToken(); - if (token.isValue()) { - if ("text" .equals(currentFieldName)) { - text = parser.text(); - } else { - throw new ElasticsearchParseException("unexpected field [{}]", currentFieldName); - } - } else { - throw new ElasticsearchParseException("unexpected token [{}]", token); - } + XContentParserUtils.ensureFieldName(parser, token, "text"); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, parser.nextToken(), parser); + text = parser.text(); } } if (text == null) { @@ -93,15 +83,16 @@ public void testBlobStoreOperations() throws IOException { ChecksumBlobStoreFormat checksumSMILE = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); - checksumSMILE.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true, + final String randomText = randomAlphaOfLengthBetween(0, 1024 * 8 * 3); + final String normalText = "checksum smile: " + randomText; + checksumSMILE.write(new BlobObj(normalText), blobContainer, "check-smile", false, MockBigArrays.NON_RECYCLING_INSTANCE); + final String compressedText = "checksum smile compressed: " + randomText; + checksumSMILE.write(new BlobObj(compressedText), blobContainer, "check-smile-comp", true, MockBigArrays.NON_RECYCLING_INSTANCE); // Assert that all checksum blobs can be read - assertEquals(checksumSMILE.read(blobContainer, "check-smile", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(), - "checksum smile"); - assertEquals(checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry(), - MockBigArrays.NON_RECYCLING_INSTANCE).getText(), "checksum smile compressed"); + assertEquals(normalText, checksumSMILE.read(blobContainer, "check-smile", xContentRegistry()).getText()); + assertEquals(compressedText, checksumSMILE.read(blobContainer, "check-smile-comp", xContentRegistry()).getText()); } public void testCompressionIsApplied() throws IOException { @@ -127,16 +118,14 @@ public void testBlobCorruption() throws IOException { BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent); checksumFormat.write(blobObj, blobContainer, "test-path", randomBoolean(), MockBigArrays.NON_RECYCLING_INSTANCE); - assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE).getText(), + assertEquals(checksumFormat.read(blobContainer, "test-path", xContentRegistry()).getText(), testString); randomCorruption(blobContainer, "test-path"); try { - checksumFormat.read(blobContainer, "test-path", xContentRegistry(), MockBigArrays.NON_RECYCLING_INSTANCE); + checksumFormat.read(blobContainer, "test-path", xContentRegistry()); fail("Should have failed due to corruption"); - } catch (ElasticsearchCorruptionException ex) { - assertThat(ex.getMessage(), containsString("test-path")); - } catch (EOFException ex) { - // This can happen if corrupt the byte length + } catch (ElasticsearchCorruptionException | EOFException ex) { + // expected exceptions from random byte corruption } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java index 7d99faac00073..c785e1803f45a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotInfoBlobSerializationTests.java @@ -35,7 +35,7 @@ protected SnapshotInfo copyInstance(SnapshotInfo instance, Version version) thro final PlainActionFuture future = new PlainActionFuture<>(); BlobStoreRepository.SNAPSHOT_FORMAT.serialize(instance, "test", randomBoolean(), BigArrays.NON_RECYCLING_INSTANCE, bytes -> ActionListener.completeWith(future, - () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize("test", NamedXContentRegistry.EMPTY, bytes))); + () -> BlobStoreRepository.SNAPSHOT_FORMAT.deserialize(NamedXContentRegistry.EMPTY, bytes.streamInput()))); return future.actionGet(); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 8e4447bde94b9..36d20526b9904 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -17,7 +17,6 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.blobstore.DeleteResult; import org.elasticsearch.common.blobstore.support.PlainBlobMetadata; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.util.Maps; @@ -329,11 +328,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize, b if (basePath().buildAsString().equals(path().buildAsString())) { try { final SnapshotInfo updatedInfo = BlobStoreRepository.SNAPSHOT_FORMAT.deserialize( - blobName, namedXContentRegistry, new BytesArray(data)); + namedXContentRegistry, new ByteArrayInputStream(data)); // If the existing snapshotInfo differs only in the timestamps it stores, then the overwrite is not // a problem and could be the result of a correctly handled master failover. - final SnapshotInfo existingInfo = SNAPSHOT_FORMAT.deserialize( - blobName, namedXContentRegistry, Streams.readFully(readBlob(blobName))); + final SnapshotInfo existingInfo = + SNAPSHOT_FORMAT.deserialize(namedXContentRegistry, readBlob(blobName)); assertThat(existingInfo.snapshotId(), equalTo(updatedInfo.snapshotId())); assertThat(existingInfo.reason(), equalTo(updatedInfo.reason())); assertThat(existingInfo.state(), equalTo(updatedInfo.state()));