51
51
import java .net .InetSocketAddress ;
52
52
import java .net .SocketTimeoutException ;
53
53
import java .nio .charset .StandardCharsets ;
54
+ import java .util .Arrays ;
54
55
import java .util .Locale ;
55
56
import java .util .Objects ;
56
57
import java .util .concurrent .atomic .AtomicBoolean ;
@@ -192,21 +193,31 @@ public void testWriteBlobWithRetries() throws Exception {
192
193
final int maxRetries = randomInt (5 );
193
194
final CountDown countDown = new CountDown (maxRetries + 1 );
194
195
195
- final byte [] bytes = randomByteArrayOfLength (randomIntBetween (1 , 512 ));
196
+ final byte [] bytes = randomByteArrayOfLength (randomIntBetween (1 , frequently () ? 512 : 1 << 20 )); // rarely up to 1mb
196
197
httpServer .createContext ("/bucket/write_blob_max_retries" , exchange -> {
197
- final BytesReference body = Streams .readFully (exchange .getRequestBody ());
198
- if (countDown .countDown ()) {
199
- if (Objects .deepEquals (bytes , BytesReference .toBytes (body ))) {
200
- exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
201
- } else {
202
- exchange .sendResponseHeaders (HttpStatus .SC_BAD_REQUEST , -1 );
198
+ if ("PUT" .equals (exchange .getRequestMethod ()) && exchange .getRequestURI ().getQuery () == null ) {
199
+ if (countDown .countDown ()) {
200
+ final BytesReference body = Streams .readFully (exchange .getRequestBody ());
201
+ if (Objects .deepEquals (bytes , BytesReference .toBytes (body ))) {
202
+ exchange .sendResponseHeaders (HttpStatus .SC_OK , -1 );
203
+ } else {
204
+ exchange .sendResponseHeaders (HttpStatus .SC_BAD_REQUEST , -1 );
205
+ }
206
+ exchange .close ();
207
+ return ;
208
+ }
209
+
210
+ if (randomBoolean ()) {
211
+ if (randomBoolean ()) {
212
+ Streams .readFully (exchange .getRequestBody (), new byte [randomIntBetween (1 , bytes .length - 1 )]);
213
+ } else {
214
+ Streams .readFully (exchange .getRequestBody ());
215
+ exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
216
+ HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
217
+ }
203
218
}
204
219
exchange .close ();
205
- return ;
206
220
}
207
- exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
208
- HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
209
- exchange .close ();
210
221
});
211
222
212
223
final BlobContainer blobContainer = createBlobContainer (maxRetries , null , true , null );
@@ -217,17 +228,21 @@ public void testWriteBlobWithRetries() throws Exception {
217
228
}
218
229
219
230
public void testWriteBlobWithReadTimeouts () {
231
+ final byte [] bytes = randomByteArrayOfLength (randomIntBetween (10 , 128 ));
220
232
final TimeValue readTimeout = TimeValue .timeValueMillis (randomIntBetween (100 , 500 ));
221
233
final BlobContainer blobContainer = createBlobContainer (1 , readTimeout , true , null );
222
234
223
235
// HTTP server does not send a response
224
236
httpServer .createContext ("/bucket/write_blob_timeout" , exchange -> {
225
237
if (randomBoolean ()) {
226
- Streams .readFully (exchange .getRequestBody ());
238
+ if (randomBoolean ()) {
239
+ Streams .readFully (exchange .getRequestBody (), new byte [randomIntBetween (1 , bytes .length - 1 )]);
240
+ } else {
241
+ Streams .readFully (exchange .getRequestBody ());
242
+ }
227
243
}
228
244
});
229
245
230
- final byte [] bytes = randomByteArrayOfLength (randomIntBetween (1 , 128 ));
231
246
Exception exception = expectThrows (IOException .class , () -> {
232
247
try (InputStream stream = new InputStreamIndexInput (new ByteArrayIndexInput ("desc" , bytes ), bytes .length )) {
233
248
blobContainer .writeBlob ("write_blob_timeout" , stream , bytes .length , false );
@@ -249,16 +264,18 @@ public void testWriteLargeBlob() throws Exception {
249
264
final ByteSizeValue bufferSize = new ByteSizeValue (5 , ByteSizeUnit .MB );
250
265
final BlobContainer blobContainer = createBlobContainer (null , readTimeout , true , bufferSize );
251
266
252
- final int parts = randomIntBetween (1 , 2 );
267
+ final int parts = randomIntBetween (1 , 5 );
253
268
final long lastPartSize = randomLongBetween (10 , 512 );
254
269
final long blobSize = (parts * bufferSize .getBytes ()) + lastPartSize ;
255
270
256
- final int maxRetries = 2 ; // we want all requests to fail at least once
257
- final CountDown countDownInitiate = new CountDown (maxRetries );
258
- final AtomicInteger countDownUploads = new AtomicInteger (maxRetries * (parts + 1 ));
259
- final CountDown countDownComplete = new CountDown (maxRetries );
271
+ final int nbErrors = 2 ; // we want all requests to fail at least once
272
+ final CountDown countDownInitiate = new CountDown (nbErrors );
273
+ final AtomicInteger countDownUploads = new AtomicInteger (nbErrors * (parts + 1 ));
274
+ final CountDown countDownComplete = new CountDown (nbErrors );
260
275
261
276
httpServer .createContext ("/bucket/write_large_blob" , exchange -> {
277
+ final long contentLength = Long .parseLong (exchange .getRequestHeaders ().getFirst ("Content-Length" ));
278
+
262
279
if ("POST" .equals (exchange .getRequestMethod ())
263
280
&& exchange .getRequestURI ().getQuery ().equals ("uploads" )) {
264
281
// initiate multipart upload request
@@ -275,11 +292,14 @@ public void testWriteLargeBlob() throws Exception {
275
292
exchange .close ();
276
293
return ;
277
294
}
278
- } else if ("PUT" .equals (exchange .getRequestMethod ())) {
295
+ } else if ("PUT" .equals (exchange .getRequestMethod ())
296
+ && exchange .getRequestURI ().getQuery ().contains ("uploadId=TEST" )
297
+ && exchange .getRequestURI ().getQuery ().contains ("partNumber=" )) {
279
298
// upload part request
280
299
MD5DigestCalculatingInputStream md5 = new MD5DigestCalculatingInputStream (exchange .getRequestBody ());
281
300
BytesReference bytes = Streams .readFully (md5 );
282
301
assertThat ((long ) bytes .length (), anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
302
+ assertThat (contentLength , anyOf (equalTo (lastPartSize ), equalTo (bufferSize .getBytes ())));
283
303
284
304
if (countDownUploads .decrementAndGet () % 2 == 0 ) {
285
305
exchange .getResponseHeaders ().add ("ETag" , Base16 .encodeAsString (md5 .getMd5Digest ()));
@@ -289,10 +309,10 @@ public void testWriteLargeBlob() throws Exception {
289
309
}
290
310
291
311
} else if ("POST" .equals (exchange .getRequestMethod ())
292
- && exchange .getRequestURI ().getQuery ().equals ("uploadId=TEST" )) {
312
+ && exchange .getRequestURI ().getQuery ().equals ("uploadId=TEST" )) {
293
313
// complete multipart upload request
294
- Streams .readFully (exchange .getRequestBody ());
295
314
if (countDownComplete .countDown ()) {
315
+ Streams .readFully (exchange .getRequestBody ());
296
316
byte [] response = ("<?xml version=\" 1.0\" encoding=\" UTF-8\" ?>\n " +
297
317
"<CompleteMultipartUploadResult>\n " +
298
318
" <Bucket>bucket</Bucket>\n " +
@@ -308,8 +328,13 @@ public void testWriteLargeBlob() throws Exception {
308
328
309
329
// sends an error back or let the request time out
310
330
if (useTimeout == false ) {
311
- exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
312
- HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
331
+ if (randomBoolean () && contentLength > 0 ) {
332
+ Streams .readFully (exchange .getRequestBody (), new byte [randomIntBetween (1 , Math .toIntExact (contentLength - 1 ))]);
333
+ } else {
334
+ Streams .readFully (exchange .getRequestBody ());
335
+ exchange .sendResponseHeaders (randomFrom (HttpStatus .SC_INTERNAL_SERVER_ERROR , HttpStatus .SC_BAD_GATEWAY ,
336
+ HttpStatus .SC_SERVICE_UNAVAILABLE , HttpStatus .SC_GATEWAY_TIMEOUT ), -1 );
337
+ }
313
338
exchange .close ();
314
339
}
315
340
});
@@ -323,9 +348,6 @@ public void testWriteLargeBlob() throws Exception {
323
348
324
349
/**
325
350
* A resettable InputStream that only serves zeros.
326
- *
327
- * Ideally it should be wrapped into a BufferedInputStream but it seems that the AWS SDK is calling InputStream{@link #reset()}
328
- * before calling InputStream{@link #mark(int)}, which is not permitted by the {@link #reset()} method contract.
329
351
**/
330
352
private static class ZeroInputStream extends InputStream {
331
353
@@ -336,17 +358,32 @@ private static class ZeroInputStream extends InputStream {
336
358
337
359
private ZeroInputStream (final long length ) {
338
360
this .length = length ;
339
- this .reads = new AtomicLong (length );
361
+ this .reads = new AtomicLong (0 );
340
362
this .mark = -1 ;
341
363
}
342
364
343
365
@ Override
344
366
public int read () throws IOException {
345
367
ensureOpen ();
346
- if (reads .decrementAndGet () < 0 ) {
368
+ return (reads .incrementAndGet () <= length ) ? 0 : -1 ;
369
+ }
370
+
371
+ @ Override
372
+ public int read (byte [] b , int off , int len ) throws IOException {
373
+ ensureOpen ();
374
+ if (len == 0 ) {
375
+ return 0 ;
376
+ }
377
+
378
+ final int available = available ();
379
+ if (available == 0 ) {
347
380
return -1 ;
348
381
}
349
- return 0 ;
382
+
383
+ final int toCopy = Math .min (len , available );
384
+ Arrays .fill (b , off , off + toCopy , (byte ) 0 );
385
+ reads .addAndGet (toCopy );
386
+ return toCopy ;
350
387
}
351
388
352
389
@ Override
@@ -368,7 +405,14 @@ public synchronized void reset() throws IOException {
368
405
@ Override
369
406
public int available () throws IOException {
370
407
ensureOpen ();
371
- return Math .toIntExact (length - reads .get ());
408
+ if (reads .get () >= length ) {
409
+ return 0 ;
410
+ }
411
+ try {
412
+ return Math .toIntExact (length - reads .get ());
413
+ } catch (ArithmeticException e ) {
414
+ return Integer .MAX_VALUE ;
415
+ }
372
416
}
373
417
374
418
@ Override
0 commit comments