33
33
import org .elasticsearch .common .UUIDs ;
34
34
import org .elasticsearch .common .blobstore .BlobContainer ;
35
35
import org .elasticsearch .common .blobstore .BlobPath ;
36
+ import org .elasticsearch .common .bytes .BytesArray ;
36
37
import org .elasticsearch .common .bytes .BytesReference ;
37
38
import org .elasticsearch .common .collect .Tuple ;
38
39
import org .elasticsearch .common .io .Streams ;
52
53
import org .junit .Before ;
53
54
import org .threeten .bp .Duration ;
54
55
55
- import java .io .ByteArrayOutputStream ;
56
56
import java .io .IOException ;
57
57
import java .io .InputStream ;
58
58
import java .net .InetAddress ;
@@ -323,14 +323,16 @@ public void testWriteLargeBlob() throws IOException {
323
323
logger .debug ("starting with resumable upload id [{}]" , sessionUploadId .get ());
324
324
325
325
httpServer .createContext ("/upload/storage/v1/b/bucket/o" , safeHandler (exchange -> {
326
+ final BytesReference requestBody = Streams .readFully (exchange .getRequestBody ());
327
+
326
328
final Map <String , String > params = new HashMap <>();
327
329
RestUtils .decodeQueryString (exchange .getRequestURI ().getQuery (), 0 , params );
328
330
assertThat (params .get ("uploadType" ), equalTo ("resumable" ));
329
331
330
332
if ("POST" .equals (exchange .getRequestMethod ())) {
331
333
assertThat (params .get ("name" ), equalTo ("write_large_blob" ));
332
334
if (countInits .decrementAndGet () <= 0 ) {
333
- byte [] response = Streams . readFully ( exchange . getRequestBody ()) .utf8ToString ().getBytes (UTF_8 );
335
+ byte [] response = requestBody .utf8ToString ().getBytes (UTF_8 );
334
336
exchange .getResponseHeaders ().add ("Content-Type" , "application/json" );
335
337
exchange .getResponseHeaders ().add ("Location" , httpServerUrl () +
336
338
"/upload/storage/v1/b/bucket/o?uploadType=resumable&upload_id=" + sessionUploadId .get ());
@@ -348,7 +350,6 @@ public void testWriteLargeBlob() throws IOException {
348
350
if (uploadId .equals (sessionUploadId .get ()) == false ) {
349
351
logger .debug ("session id [{}] is gone" , uploadId );
350
352
assertThat (wrongChunk , greaterThan (0 ));
351
- Streams .readFully (exchange .getRequestBody ());
352
353
exchange .sendResponseHeaders (HttpStatus .SC_GONE , -1 );
353
354
return ;
354
355
}
@@ -367,7 +368,6 @@ public void testWriteLargeBlob() throws IOException {
367
368
countInits .set (nbErrors );
368
369
countUploads .set (nbErrors * totalChunks );
369
370
370
- Streams .readFully (exchange .getRequestBody ());
371
371
exchange .sendResponseHeaders (HttpStatus .SC_GONE , -1 );
372
372
return ;
373
373
}
@@ -377,14 +377,12 @@ public void testWriteLargeBlob() throws IOException {
377
377
assertTrue (Strings .hasLength (range ));
378
378
379
379
if (countUploads .decrementAndGet () % 2 == 0 ) {
380
- final ByteArrayOutputStream requestBody = new ByteArrayOutputStream ();
381
- final long bytesRead = Streams .copy (exchange .getRequestBody (), requestBody );
382
- assertThat (Math .toIntExact (bytesRead ), anyOf (equalTo (defaultChunkSize ), equalTo (lastChunkSize )));
380
+ assertThat (Math .toIntExact (requestBody .length ()), anyOf (equalTo (defaultChunkSize ), equalTo (lastChunkSize )));
383
381
384
382
final int rangeStart = getContentRangeStart (range );
385
383
final int rangeEnd = getContentRangeEnd (range );
386
- assertThat (rangeEnd + 1 - rangeStart , equalTo (Math .toIntExact (bytesRead )));
387
- assertArrayEquals ( Arrays . copyOfRange (data , rangeStart , rangeEnd + 1 ), requestBody . toByteArray ( ));
384
+ assertThat (rangeEnd + 1 - rangeStart , equalTo (Math .toIntExact (requestBody . length () )));
385
+ assertThat ( new BytesArray (data , rangeStart , rangeEnd - rangeStart + 1 ), is ( requestBody ));
388
386
389
387
final Integer limit = getContentRangeLimit (range );
390
388
if (limit != null ) {
@@ -399,8 +397,6 @@ public void testWriteLargeBlob() throws IOException {
399
397
}
400
398
}
401
399
402
- // read all the request body, otherwise the SDK client throws a non-retryable StorageException
403
- Streams .readFully (exchange .getRequestBody ());
404
400
if (randomBoolean ()) {
405
401
exchange .sendResponseHeaders (HttpStatus .SC_INTERNAL_SERVER_ERROR , -1 );
406
402
}
0 commit comments