From 6e4bd15239207f21f184cc9779c40ad177d918e7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 31 Aug 2019 16:51:03 +0200 Subject: [PATCH 1/5] bck --- .../blobstore/BlobStoreRepository.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 9d32cc61d421b..38d0ce4a857db 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1249,6 +1249,7 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId IndexShardSnapshotStatus snapshotStatus, Store store) throws IOException { final BlobContainer shardContainer = shardContainer(indexId, shardId); final String file = fileInfo.physicalName(); + checkAborted(fileInfo, shardId, snapshotId, snapshotStatus); store.incRef(); try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { for (int i = 0; i < fileInfo.numberOfParts(); i++) { @@ -1263,23 +1264,15 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId inputStream = new FilterInputStream(inputStream) { @Override public int read() throws IOException { - checkAborted(); + checkAborted(fileInfo, shardId, snapshotId, snapshotStatus); return super.read(); } @Override public int read(byte[] b, int off, int len) throws IOException { - checkAborted(); + checkAborted(fileInfo, shardId, snapshotId, snapshotStatus); return super.read(b, off, len); } - - private void checkAborted() { - if (snapshotStatus.isAborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, - snapshotId, fileInfo.physicalName()); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - } }; shardContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true); } @@ -1294,6 +1287,15 @@ private void checkAborted() { } } + private void checkAborted(BlobStoreIndexShardSnapshot.FileInfo fileInfo, ShardId shardId, SnapshotId snapshotId, + IndexShardSnapshotStatus snapshotStatus) { + if (snapshotStatus.isAborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, + snapshotId, fileInfo.physicalName()); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + } + private static void failStoreIfCorrupted(Store store, Exception e) { if (Lucene.isCorruptionException(e)) { try { From f44fa904ee63f5056132e57aa17619462c1c6cd7 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 31 Aug 2019 18:01:47 +0200 Subject: [PATCH 2/5] step 1 --- .../blobstore/BlobStoreRepository.java | 54 ++++++++++--------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 38d0ce4a857db..64555ad7328d5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -110,6 +110,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -898,12 +899,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); final StepListener snapshotDoneListener = new StepListener<>(); + snapshotDoneListener.whenComplete(v -> store.decRef(), e -> store.decRef()); snapshotDoneListener.whenComplete(listener::onResponse, e -> { snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e)); listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e : new IndexShardSnapshotFailedException(store.shardId(), e)); }); try { + store.incRef(); logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); final BlobContainer shardContainer = shardContainer(indexId, shardId); @@ -928,18 +931,14 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s store.incRef(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should try { - // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should - try { - logger.trace( - "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); - metadataFromStore = store.getMetadata(snapshotIndexCommit); - fileNames = snapshotIndexCommit.getFileNames(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); - } - } finally { - store.decRef(); + logger.trace( + "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); + metadataFromStore = store.getMetadata(snapshotIndexCommit); + fileNames = snapshotIndexCommit.getFileNames(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); } int indexIncrementalFileCount = 0; int indexTotalNumberOfFiles = 0; @@ -1048,17 +1047,26 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final GroupedActionListener filesListener = new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + final AtomicBoolean alreadyFailed = new AtomicBoolean(); for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { executor.execute(new ActionRunnable<>(filesListener) { @Override protected void doRun() { try { - snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + if (alreadyFailed.get() == false) { + snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store); + } filesListener.onResponse(null); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e); } } + + @Override + public void onFailure(Exception e) { + alreadyFailed.set(true); + super.onFailure(e); + } }); } } catch (Exception e) { @@ -1249,7 +1257,6 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId IndexShardSnapshotStatus snapshotStatus, Store store) throws IOException { final BlobContainer shardContainer = shardContainer(indexId, shardId); final String file = fileInfo.physicalName(); - checkAborted(fileInfo, shardId, snapshotId, snapshotStatus); store.incRef(); try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) { for (int i = 0; i < fileInfo.numberOfParts(); i++) { @@ -1264,15 +1271,23 @@ private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId inputStream = new FilterInputStream(inputStream) { @Override public int read() throws IOException { - checkAborted(fileInfo, shardId, snapshotId, snapshotStatus); + checkAborted(); return super.read(); } @Override public int read(byte[] b, int off, int len) throws IOException { - checkAborted(fileInfo, shardId, snapshotId, snapshotStatus); + checkAborted(); return super.read(b, off, len); } + + private void checkAborted() { + if (snapshotStatus.isAborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, + snapshotId, fileInfo.physicalName()); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } + } }; shardContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true); } @@ -1287,15 +1302,6 @@ public int read(byte[] b, int off, int len) throws IOException { } } - private void checkAborted(BlobStoreIndexShardSnapshot.FileInfo fileInfo, ShardId shardId, SnapshotId snapshotId, - IndexShardSnapshotStatus snapshotStatus) { - if (snapshotStatus.isAborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, - snapshotId, fileInfo.physicalName()); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); - } - } - private static void failStoreIfCorrupted(Store store, Exception e) { if (Lucene.isCorruptionException(e)) { try { From 74b152df59947f566ca6a4410ec356c530b94d47 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 31 Aug 2019 22:18:26 +0200 Subject: [PATCH 3/5] bck --- .../repositories/blobstore/BlobStoreRepository.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 64555ad7328d5..626f2ff274fcd 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -928,13 +928,11 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final List indexCommitPointFiles = new ArrayList<>(); ArrayList filesToSnapshot = new ArrayList<>(); - store.incRef(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should try { - logger.trace( - "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); + logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); metadataFromStore = store.getMetadata(snapshotIndexCommit); fileNames = snapshotIndexCommit.getFileNames(); } catch (IOException e) { From bf9e6089a13a61b9826aa22f702d5218e8bb158c Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 1 Sep 2019 09:52:40 +0200 Subject: [PATCH 4/5] dont mess with store refcnt --- .../blobstore/BlobStoreRepository.java | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 4244b48d806c9..69d0359d0d829 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -899,14 +899,12 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); final StepListener snapshotDoneListener = new StepListener<>(); - snapshotDoneListener.whenComplete(v -> store.decRef(), e -> store.decRef()); snapshotDoneListener.whenComplete(listener::onResponse, e -> { snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e)); listener.onFailure(e instanceof IndexShardSnapshotFailedException ? (IndexShardSnapshotFailedException) e : new IndexShardSnapshotFailedException(store.shardId(), e)); }); try { - store.incRef(); logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name()); final BlobContainer shardContainer = shardContainer(indexId, shardId); @@ -928,15 +926,21 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final List indexCommitPointFiles = new ArrayList<>(); ArrayList filesToSnapshot = new ArrayList<>(); + store.incRef(); final Collection fileNames; final Store.MetadataSnapshot metadataFromStore; - // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should try { - logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); - metadataFromStore = store.getMetadata(snapshotIndexCommit); - fileNames = snapshotIndexCommit.getFileNames(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should + try { + logger.trace( + "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); + metadataFromStore = store.getMetadata(snapshotIndexCommit); + fileNames = snapshotIndexCommit.getFileNames(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); + } + } finally { + store.decRef(); } int indexIncrementalFileCount = 0; int indexTotalNumberOfFiles = 0; From c09a8f6d8e3051704c774fed7b3eb8d3526c09b5 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 2 Sep 2019 09:26:22 +0200 Subject: [PATCH 5/5] CR: add comment --- .../repositories/blobstore/BlobStoreRepository.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 69d0359d0d829..47bbb773b0afc 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1049,6 +1049,7 @@ public void snapshotShard(Store store, MapperService mapperService, SnapshotId s final GroupedActionListener filesListener = new GroupedActionListener<>(allFilesUploadedListener, indexIncrementalFileCount); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); + // Flag to signal that the snapshot has been aborted/failed so we can stop any further blob uploads from starting final AtomicBoolean alreadyFailed = new AtomicBoolean(); for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) { executor.execute(new ActionRunnable<>(filesListener) {