16
16
17
17
package org .wikimedia .elasticsearch .swift .repositories .blobstore ;
18
18
19
+ import org .apache .commons .codec .digest .DigestUtils ;
19
20
import org .apache .logging .log4j .LogManager ;
20
21
import org .apache .logging .log4j .Logger ;
21
22
import org .elasticsearch .common .Nullable ;
37
38
import org .wikimedia .elasticsearch .swift .util .retry .WithTimeout ;
38
39
import org .wikimedia .elasticsearch .swift .repositories .SwiftRepository ;
39
40
40
- import java .io .BufferedInputStream ;
41
+ import java .io .ByteArrayInputStream ;
41
42
import java .io .ByteArrayOutputStream ;
42
- import java .io .IOException ;
43
43
import java .io .InputStream ;
44
+ import java .io .IOException ;
44
45
import java .nio .file .FileAlreadyExistsException ;
45
46
46
47
import java .nio .file .NoSuchFileException ;
@@ -299,22 +300,37 @@ private String buildKey(String blobName) {
299
300
}
300
301
301
302
/**
302
- * Fetch a given blob into a BufferedInputStream
303
+ * Fetch a given blob into memory, verify etag, and return InputStream.
303
304
* @param blobName The blob name to read
304
305
* @return a stream
305
306
*/
306
307
@ Override
307
308
public InputStream readBlob (final String blobName ) throws IOException {
309
+ String objectName = buildKey (blobName );
310
+
308
311
try {
309
312
return withTimeout ().retry (retryIntervalS , shortOperationTimeoutS , TimeUnit .SECONDS , () -> {
310
313
try {
311
- InputStream downloadStream = SwiftPerms .execThrows (() ->
312
- blobStore .getContainer ().getObject (buildKey (blobName )).downloadObjectAsInputStream ()
313
- );
314
- return new BufferedInputStream (downloadStream , (int ) blobStore .getBufferSizeInBytes ());
314
+ return SwiftPerms .execThrows (() -> {
315
+ StoredObject storedObject = blobStore .getContainer ().getObject (objectName );
316
+ InputStream rawInputStream = storedObject .downloadObjectAsInputStream ();
317
+ int contentLength = (int ) storedObject .getContentLength ();
318
+ String objectEtag = storedObject .getEtag ();
319
+ byte [] objectData = readAllBytes (rawInputStream , contentLength );
320
+ String dataEtag = DigestUtils .md5Hex (objectData );
321
+
322
+ if (!dataEtag .equals (objectEtag )) {
323
+ String message = "cannot read blob [" + objectName + "]: server etag [" + objectEtag +
324
+ "] does not match calculated etag [" + dataEtag + "]" ;
325
+ logger .warn (message );
326
+ throw new IOException (message );
327
+ }
328
+
329
+ return new ByteArrayInputStream (objectData );
330
+ });
315
331
}
316
332
catch (NotFoundException e ) {
317
- String message = "cannot read blob [" + buildKey ( blobName ) + "]" ;
333
+ String message = "cannot read blob [" + objectName + "]" ;
318
334
logger .warn (message );
319
335
NoSuchFileException e2 = new NoSuchFileException (message );
320
336
e2 .initCause (e );
@@ -326,7 +342,7 @@ public InputStream readBlob(final String blobName) throws IOException {
326
342
throw e ;
327
343
}
328
344
catch (Exception e ) {
329
- throw new BlobStoreException ("cannot read blob [" + buildKey ( blobName ) + "]" , e );
345
+ throw new BlobStoreException ("cannot read blob [" + objectName + "]" , e );
330
346
}
331
347
}
332
348
@@ -335,7 +351,8 @@ public void writeBlob(final String blobName,
335
351
final InputStream in ,
336
352
final long blobSize ,
337
353
boolean failIfAlreadyExists ) throws IOException {
338
- byte [] bytes = readAllBytes (in );
354
+ // async execution races against the InputStream closed in the caller. Read all data locally.
355
+ byte [] bytes = readAllBytes (in , -1 );
339
356
340
357
if (executor != null && allowConcurrentIO ) {
341
358
Future <Void > task = executor .submit (() -> {
@@ -350,16 +367,19 @@ public void writeBlob(final String blobName,
350
367
internalWriteBlob (blobName , bytes , failIfAlreadyExists );
351
368
}
352
369
353
- private byte [] readAllBytes (InputStream in ) throws IOException {
354
- final byte [] buffer = new byte [1024 ];
355
- ByteArrayOutputStream baos = new ByteArrayOutputStream (buffer .length );
356
- int read ;
370
+ private byte [] readAllBytes (InputStream in , int sizeHint ) throws IOException {
371
+ int bufferSize = (int ) blobStore .getBufferSizeInBytes ();
372
+ final byte [] buffer = new byte [bufferSize ];
357
373
358
- while ((read = in .read (buffer )) != -1 ) {
359
- baos .write (buffer , 0 , read );
360
- }
374
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream (sizeHint > 0 ? sizeHint : bufferSize )) {
375
+ int read ;
361
376
362
- return baos .toByteArray ();
377
+ while ((read = in .read (buffer )) != -1 ) {
378
+ baos .write (buffer , 0 , read );
379
+ }
380
+
381
+ return baos .toByteArray ();
382
+ }
363
383
}
364
384
365
385
private void internalWriteBlob (String blobName , byte [] bytes , boolean failIfAlreadyExists ) throws IOException {
0 commit comments