diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index b81e7f1ea33a3..0274ed2dc7416 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -27,7 +27,6 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.RateLimiter; import org.apache.lucene.util.SetOnce; -import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; @@ -995,12 +994,6 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); - final StepListener snapshotDoneListener = new StepListener<>(); - snapshotDoneListener.whenComplete(listener::onResponse, e -> { - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e)); - listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e - : new IndexShardSnapshotFailedException(store.shardId(), e)); - }); try { logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); @@ -1137,8 +1130,8 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s snapshotId, shardId), e); } snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis()); - snapshotDoneListener.onResponse(null); - }, snapshotDoneListener::onFailure); + listener.onResponse(null); + }, listener::onFailure); if (indexIncrementalFileCount == 0) { allFilesUploadedListener.onResponse(Collections.emptyList()); return; @@ -1181,7 +1174,7 @@ public void onFailure(Exception e) { }); } } catch (Exception e) { - snapshotDoneListener.onFailure(e); + listener.onFailure(e); } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 083382b74b03f..8f59657d1f88b 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -292,8 +292,10 @@ public void onResponse(final Void aVoid) { @Override public void onFailure(Exception e) { + final String failure = ExceptionsHelper.detailedMessage(e); + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - notifyFailedSnapshotShard(snapshot, shardId, ExceptionsHelper.detailedMessage(e)); + notifyFailedSnapshotShard(snapshot, shardId, failure); } }); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 52c71ef9dabc7..9634092cde22d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -1224,6 +1224,12 @@ public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception { disruption.startDisrupting(); logger.info("--> restarting data node, which should cause primary shards to be failed"); internalCluster().restartNode(dataNode, InternalTestCluster.EMPTY_CALLBACK); + + logger.info("--> wait for shard snapshots to show as failed"); + assertBusy(() -> assertThat( + client().admin().cluster().prepareSnapshotStatus("test-repo").setSnapshots("test-snap").get().getSnapshots() + .get(0).getShardsStats().getFailedShards(), greaterThanOrEqualTo(1)), 60L, TimeUnit.SECONDS); + unblockNode("test-repo", dataNode); disruption.stopDisrupting(); // check that snapshot completes