82
82
import java .util .concurrent .atomic .AtomicLong ;
83
83
import java .util .function .BiConsumer ;
84
84
import java .util .function .BiPredicate ;
85
+ import java .util .function .Supplier ;
85
86
86
87
public class AzureBlobStore implements BlobStore {
87
88
private static final Logger logger = LogManager .getLogger (AzureBlobStore .class );
@@ -228,40 +229,41 @@ public DeleteResult deleteBlobDirectory(String path) throws IOException {
228
229
final AtomicInteger blobsDeleted = new AtomicInteger (0 );
229
230
final AtomicLong bytesDeleted = new AtomicLong (0 );
230
231
231
- try {
232
- final BlobServiceClient client = client ();
233
- SocketAccess .doPrivilegedVoidException (() -> {
234
- final BlobContainerClient blobContainerClient = client .getBlobContainerClient (container );
235
- final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient ().getBlobContainerAsyncClient (container );
236
- final Queue <String > directories = new ArrayDeque <>();
237
- directories .offer (path );
238
- String directoryName ;
239
- List <Mono <Void >> deleteTasks = new ArrayList <>();
240
- while ((directoryName = directories .poll ()) != null ) {
241
- final BlobListDetails blobListDetails = new BlobListDetails ()
242
- .setRetrieveMetadata (true );
243
-
244
- final ListBlobsOptions options = new ListBlobsOptions ()
245
- .setPrefix (directoryName )
246
- .setDetails (blobListDetails );
247
-
248
- for (BlobItem blobItem : blobContainerClient .listBlobsByHierarchy ("/" , options , null )) {
249
- if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
250
- directories .offer (blobItem .getName ());
251
- } else {
252
- BlobAsyncClient blobAsyncClient = blobContainerAsyncClient .getBlobAsyncClient (blobItem .getName ());
253
- deleteTasks .add (blobAsyncClient .delete ());
254
- bytesDeleted .addAndGet (blobItem .getProperties ().getContentLength ());
255
- blobsDeleted .incrementAndGet ();
256
- }
232
+ final BlobServiceClient client = client ();
233
+ SocketAccess .doPrivilegedVoidException (() -> {
234
+ final BlobContainerClient blobContainerClient = client .getBlobContainerClient (container );
235
+ final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient ().getBlobContainerAsyncClient (container );
236
+ final Queue <String > directories = new ArrayDeque <>();
237
+ directories .offer (path );
238
+ String directoryName ;
239
+ List <Mono <Void >> deleteTasks = new ArrayList <>();
240
+ while ((directoryName = directories .poll ()) != null ) {
241
+ final BlobListDetails blobListDetails = new BlobListDetails ()
242
+ .setRetrieveMetadata (true );
243
+
244
+ final ListBlobsOptions options = new ListBlobsOptions ()
245
+ .setPrefix (directoryName )
246
+ .setDetails (blobListDetails );
247
+
248
+ for (BlobItem blobItem : blobContainerClient .listBlobsByHierarchy ("/" , options , null )) {
249
+ if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
250
+ directories .offer (blobItem .getName ());
251
+ } else {
252
+ BlobAsyncClient blobAsyncClient = blobContainerAsyncClient .getBlobAsyncClient (blobItem .getName ());
253
+ final Mono <Void > deleteTask = blobAsyncClient .delete ()
254
+ // Ignore not found blobs, as it's possible that due to network errors a request
255
+ // for an already deleted blob is retried, causing an error.
256
+ .onErrorResume (this ::isNotFoundError , throwable -> Mono .empty ())
257
+ .onErrorMap (throwable -> new IOException ("Error deleting blob " + blobItem .getName (), throwable ));
258
+ deleteTasks .add (deleteTask );
259
+ bytesDeleted .addAndGet (blobItem .getProperties ().getContentLength ());
260
+ blobsDeleted .incrementAndGet ();
257
261
}
258
262
}
263
+ }
259
264
260
- executeDeleteTasks (deleteTasks );
261
- });
262
- } catch (Exception e ) {
263
- throw new IOException ("Deleting directory [" + path + "] failed" , e );
264
- }
265
+ executeDeleteTasks (deleteTasks , () -> "Deleting directory [" + path + "] failed" );
266
+ });
265
267
266
268
return new DeleteResult (blobsDeleted .get (), bytesDeleted .get ());
267
269
}
@@ -271,31 +273,43 @@ void deleteBlobList(List<String> blobs) throws IOException {
271
273
return ;
272
274
}
273
275
274
- try {
275
- BlobServiceAsyncClient asyncClient = asyncClient ();
276
- SocketAccess . doPrivilegedVoidException (() -> {
277
- List < Mono < Void >> deleteTasks = new ArrayList <>( blobs . size () );
278
- final BlobContainerAsyncClient blobContainerClient = asyncClient . getBlobContainerAsyncClient ( container );
279
- for ( String blob : blobs ) {
280
- final Mono < Void > deleteTask = blobContainerClient . getBlobAsyncClient ( blob )
281
- . delete ()
282
- // Ignore not found blobs
283
- . onErrorResume ( e -> ( e instanceof BlobStorageException ) && (( BlobStorageException ) e ). getStatusCode () == 404 ,
284
- throwable -> Mono . empty ());
285
- deleteTasks .add (deleteTask );
286
- }
276
+ BlobServiceAsyncClient asyncClient = asyncClient ();
277
+ SocketAccess . doPrivilegedVoidException (() -> {
278
+ List < Mono < Void >> deleteTasks = new ArrayList <>( blobs . size ());
279
+ final BlobContainerAsyncClient blobContainerClient = asyncClient . getBlobContainerAsyncClient ( container );
280
+ for ( String blob : blobs ) {
281
+ final Mono < Void > deleteTask = blobContainerClient . getBlobAsyncClient ( blob )
282
+ . delete ( )
283
+ // Ignore not found blobs
284
+ . onErrorResume ( this :: isNotFoundError , throwable -> Mono . empty ())
285
+ . onErrorMap ( throwable -> new IOException ( "Error deleting blob " + blob , throwable ));
286
+
287
+ deleteTasks .add (deleteTask );
288
+ }
287
289
288
- executeDeleteTasks (deleteTasks );
289
- });
290
- } catch (Exception e ) {
291
- throw new IOException ("Unable to delete blobs " + blobs , e );
292
- }
290
+ executeDeleteTasks (deleteTasks , () -> "Unable to delete blobs " + blobs );
291
+ });
293
292
}
294
293
295
- private void executeDeleteTasks (List <Mono <Void >> deleteTasks ) {
296
- // zipDelayError executes all tasks in parallel and delays
297
- // error propagation until all tasks have finished.
298
- Mono .zipDelayError (deleteTasks , results -> null ).block ();
294
+ private boolean isNotFoundError (Throwable e ) {
295
+ return e instanceof BlobStorageException && ((BlobStorageException ) e ).getStatusCode () == 404 ;
296
+ }
297
+
298
+ private void executeDeleteTasks (List <Mono <Void >> deleteTasks , Supplier <String > errorMessageSupplier ) throws IOException {
299
+ try {
300
+ // zipDelayError executes all tasks in parallel and delays
301
+ // error propagation until all tasks have finished.
302
+ Mono .zipDelayError (deleteTasks , results -> null ).block ();
303
+ } catch (Exception e ) {
304
+ final IOException exception = new IOException (errorMessageSupplier .get ());
305
+ for (Throwable suppressed : e .getSuppressed ()) {
306
+ // We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
307
+ if (suppressed instanceof IOException ) {
308
+ exception .addSuppressed (suppressed );
309
+ }
310
+ }
311
+ throw exception ;
312
+ }
299
313
}
300
314
301
315
public InputStream getInputStream (String blob , long position , final @ Nullable Long length ) throws IOException {
0 commit comments