81
81
import java .util .concurrent .atomic .AtomicLong ;
82
82
import java .util .function .BiConsumer ;
83
83
import java .util .function .BiPredicate ;
84
+ import java .util .function .Supplier ;
84
85
85
86
public class AzureBlobStore implements BlobStore {
86
87
private static final Logger logger = LogManager .getLogger (AzureBlobStore .class );
@@ -227,40 +228,41 @@ public DeleteResult deleteBlobDirectory(String path) throws IOException {
227
228
final AtomicInteger blobsDeleted = new AtomicInteger (0 );
228
229
final AtomicLong bytesDeleted = new AtomicLong (0 );
229
230
230
- try {
231
- final BlobServiceClient client = client ();
232
- SocketAccess .doPrivilegedVoidException (() -> {
233
- final BlobContainerClient blobContainerClient = client .getBlobContainerClient (container );
234
- final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient ().getBlobContainerAsyncClient (container );
235
- final Queue <String > directories = new ArrayDeque <>();
236
- directories .offer (path );
237
- String directoryName ;
238
- List <Mono <Void >> deleteTasks = new ArrayList <>();
239
- while ((directoryName = directories .poll ()) != null ) {
240
- final BlobListDetails blobListDetails = new BlobListDetails ()
241
- .setRetrieveMetadata (true );
242
-
243
- final ListBlobsOptions options = new ListBlobsOptions ()
244
- .setPrefix (directoryName )
245
- .setDetails (blobListDetails );
246
-
247
- for (BlobItem blobItem : blobContainerClient .listBlobsByHierarchy ("/" , options , null )) {
248
- if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
249
- directories .offer (blobItem .getName ());
250
- } else {
251
- BlobAsyncClient blobAsyncClient = blobContainerAsyncClient .getBlobAsyncClient (blobItem .getName ());
252
- deleteTasks .add (blobAsyncClient .delete ());
253
- bytesDeleted .addAndGet (blobItem .getProperties ().getContentLength ());
254
- blobsDeleted .incrementAndGet ();
255
- }
231
+ final BlobServiceClient client = client ();
232
+ SocketAccess .doPrivilegedVoidException (() -> {
233
+ final BlobContainerClient blobContainerClient = client .getBlobContainerClient (container );
234
+ final BlobContainerAsyncClient blobContainerAsyncClient = asyncClient ().getBlobContainerAsyncClient (container );
235
+ final Queue <String > directories = new ArrayDeque <>();
236
+ directories .offer (path );
237
+ String directoryName ;
238
+ List <Mono <Void >> deleteTasks = new ArrayList <>();
239
+ while ((directoryName = directories .poll ()) != null ) {
240
+ final BlobListDetails blobListDetails = new BlobListDetails ()
241
+ .setRetrieveMetadata (true );
242
+
243
+ final ListBlobsOptions options = new ListBlobsOptions ()
244
+ .setPrefix (directoryName )
245
+ .setDetails (blobListDetails );
246
+
247
+ for (BlobItem blobItem : blobContainerClient .listBlobsByHierarchy ("/" , options , null )) {
248
+ if (blobItem .isPrefix () != null && blobItem .isPrefix ()) {
249
+ directories .offer (blobItem .getName ());
250
+ } else {
251
+ BlobAsyncClient blobAsyncClient = blobContainerAsyncClient .getBlobAsyncClient (blobItem .getName ());
252
+ final Mono <Void > deleteTask = blobAsyncClient .delete ()
253
+ // Ignore not found blobs, as it's possible that due to network errors a request
254
+ // for an already deleted blob is retried, causing an error.
255
+ .onErrorResume (this ::isNotFoundError , throwable -> Mono .empty ())
256
+ .onErrorMap (throwable -> new IOException ("Error deleting blob " + blobItem .getName (), throwable ));
257
+ deleteTasks .add (deleteTask );
258
+ bytesDeleted .addAndGet (blobItem .getProperties ().getContentLength ());
259
+ blobsDeleted .incrementAndGet ();
256
260
}
257
261
}
262
+ }
258
263
259
- executeDeleteTasks (deleteTasks );
260
- });
261
- } catch (Exception e ) {
262
- throw new IOException ("Deleting directory [" + path + "] failed" , e );
263
- }
264
+ executeDeleteTasks (deleteTasks , () -> "Deleting directory [" + path + "] failed" );
265
+ });
264
266
265
267
return new DeleteResult (blobsDeleted .get (), bytesDeleted .get ());
266
268
}
@@ -270,31 +272,43 @@ void deleteBlobList(List<String> blobs) throws IOException {
270
272
return ;
271
273
}
272
274
273
- try {
274
- BlobServiceAsyncClient asyncClient = asyncClient ();
275
- SocketAccess . doPrivilegedVoidException (() -> {
276
- List < Mono < Void >> deleteTasks = new ArrayList <>( blobs . size () );
277
- final BlobContainerAsyncClient blobContainerClient = asyncClient . getBlobContainerAsyncClient ( container );
278
- for ( String blob : blobs ) {
279
- final Mono < Void > deleteTask = blobContainerClient . getBlobAsyncClient ( blob )
280
- . delete ()
281
- // Ignore not found blobs
282
- . onErrorResume ( e -> ( e instanceof BlobStorageException ) && (( BlobStorageException ) e ). getStatusCode () == 404 ,
283
- throwable -> Mono . empty ());
284
- deleteTasks .add (deleteTask );
285
- }
275
+ BlobServiceAsyncClient asyncClient = asyncClient ();
276
+ SocketAccess . doPrivilegedVoidException (() -> {
277
+ List < Mono < Void >> deleteTasks = new ArrayList <>( blobs . size ());
278
+ final BlobContainerAsyncClient blobContainerClient = asyncClient . getBlobContainerAsyncClient ( container );
279
+ for ( String blob : blobs ) {
280
+ final Mono < Void > deleteTask = blobContainerClient . getBlobAsyncClient ( blob )
281
+ . delete ( )
282
+ // Ignore not found blobs
283
+ . onErrorResume ( this :: isNotFoundError , throwable -> Mono . empty ())
284
+ . onErrorMap ( throwable -> new IOException ( "Error deleting blob " + blob , throwable ));
285
+
286
+ deleteTasks .add (deleteTask );
287
+ }
286
288
287
- executeDeleteTasks (deleteTasks );
288
- });
289
- } catch (Exception e ) {
290
- throw new IOException ("Unable to delete blobs " + blobs , e );
291
- }
289
+ executeDeleteTasks (deleteTasks , () -> "Unable to delete blobs " + blobs );
290
+ });
292
291
}
293
292
294
- private void executeDeleteTasks (List <Mono <Void >> deleteTasks ) {
295
- // zipDelayError executes all tasks in parallel and delays
296
- // error propagation until all tasks have finished.
297
- Mono .zipDelayError (deleteTasks , results -> null ).block ();
293
+ private boolean isNotFoundError (Throwable e ) {
294
+ return e instanceof BlobStorageException && ((BlobStorageException ) e ).getStatusCode () == 404 ;
295
+ }
296
+
297
+ private void executeDeleteTasks (List <Mono <Void >> deleteTasks , Supplier <String > errorMessageSupplier ) throws IOException {
298
+ try {
299
+ // zipDelayError executes all tasks in parallel and delays
300
+ // error propagation until all tasks have finished.
301
+ Mono .zipDelayError (deleteTasks , results -> null ).block ();
302
+ } catch (Exception e ) {
303
+ final IOException exception = new IOException (errorMessageSupplier .get ());
304
+ for (Throwable suppressed : e .getSuppressed ()) {
305
+ // We're only interested about the blob deletion exceptions and not in the reactor internals exceptions
306
+ if (suppressed instanceof IOException ) {
307
+ exception .addSuppressed (suppressed );
308
+ }
309
+ }
310
+ throw exception ;
311
+ }
298
312
}
299
313
300
314
public InputStream getInputStream (String blob , long position , final @ Nullable Long length ) throws IOException {
0 commit comments