Skip to content

Commit 8a06a9d

Browse files
Make Repository.getRepositoryData an Async API (elastic#49299)
This API call in most implementations is fairly IO heavy and slow so it is more natural to be async in the first place. Concretely though, this change is a prerequisite of elastic#49060 since determining the repository generation from the cluster state introduces situations where this call would have to wait for other operations to finish. Doing so in a blocking manner would break `SnapshotResiliencyTests` and waste a thread. Also, this sets up the possibility to in the future make use of async IO where provided by the underlying Repository implementation. In a follow-up `SnapshotsService#getRepositoryData` will be made async as well (did not do it here, since it's another huge change to do so). Note: This change for now does not alter the threading behaviour in any way (since `Repository#getRepositoryData` isn't forking) and is purely mechanical.
1 parent bc29c98 commit 8a06a9d

File tree

18 files changed

+739
-701
lines changed

18 files changed

+739
-701
lines changed

server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java

+90-82
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.Version;
2525
import org.elasticsearch.action.ActionListener;
2626
import org.elasticsearch.action.ActionRunnable;
27+
import org.elasticsearch.action.StepListener;
2728
import org.elasticsearch.action.support.ActionFilters;
2829
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
2930
import org.elasticsearch.cluster.ClusterState;
@@ -41,6 +42,7 @@
4142
import org.elasticsearch.repositories.RepositoriesService;
4243
import org.elasticsearch.repositories.Repository;
4344
import org.elasticsearch.repositories.RepositoryCleanupResult;
45+
import org.elasticsearch.repositories.RepositoryData;
4446
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
4547
import org.elasticsearch.snapshots.SnapshotsService;
4648
import org.elasticsearch.threadpool.ThreadPool;
@@ -167,97 +169,103 @@ private void cleanupRepo(String repositoryName, ActionListener<RepositoryCleanup
167169
return;
168170
}
169171
final BlobStoreRepository blobStoreRepository = (BlobStoreRepository) repository;
170-
final long repositoryStateId = repository.getRepositoryData().getGenId();
171-
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
172-
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
173-
new ClusterStateUpdateTask() {
172+
final StepListener<RepositoryData> repositoryDataListener = new StepListener<>();
173+
repository.getRepositoryData(repositoryDataListener);
174+
repositoryDataListener.whenComplete(repositoryData -> {
175+
final long repositoryStateId = repositoryData.getGenId();
176+
logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId);
177+
clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']',
178+
new ClusterStateUpdateTask() {
174179

175-
private boolean startedCleanup = false;
180+
private boolean startedCleanup = false;
176181

177-
@Override
178-
public ClusterState execute(ClusterState currentState) {
179-
final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
180-
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
181-
throw new IllegalStateException(
182-
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
183-
+ repositoryCleanupInProgress + "]");
184-
}
185-
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
186-
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
187-
throw new IllegalStateException("Cannot cleanup [" + repositoryName
188-
+ "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
189-
}
190-
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
191-
if (snapshots != null && !snapshots.entries().isEmpty()) {
192-
throw new IllegalStateException(
193-
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
182+
@Override
183+
public ClusterState execute(ClusterState currentState) {
184+
final RepositoryCleanupInProgress repositoryCleanupInProgress =
185+
currentState.custom(RepositoryCleanupInProgress.TYPE);
186+
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
187+
throw new IllegalStateException(
188+
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
189+
+ repositoryCleanupInProgress + "]");
190+
}
191+
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
192+
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
193+
throw new IllegalStateException("Cannot cleanup [" + repositoryName
194+
+ "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
195+
}
196+
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
197+
if (snapshots != null && !snapshots.entries().isEmpty()) {
198+
throw new IllegalStateException(
199+
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
200+
}
201+
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
202+
new RepositoryCleanupInProgress(
203+
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
194204
}
195-
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
196-
new RepositoryCleanupInProgress(
197-
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
198-
}
199-
200-
@Override
201-
public void onFailure(String source, Exception e) {
202-
after(e, null);
203-
}
204205

205-
@Override
206-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
207-
startedCleanup = true;
208-
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
209-
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
210-
l -> blobStoreRepository.cleanup(
211-
repositoryStateId,
212-
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
213-
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
214-
}
215-
216-
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
217-
if (failure == null) {
218-
logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
219-
} else {
220-
logger.debug(() -> new ParameterizedMessage(
221-
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
206+
@Override
207+
public void onFailure(String source, Exception e) {
208+
after(e, null);
222209
}
223-
assert failure != null || result != null;
224-
if (startedCleanup == false) {
225-
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
226-
listener.onFailure(failure);
227-
return;
210+
211+
@Override
212+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
213+
startedCleanup = true;
214+
logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId);
215+
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
216+
l -> blobStoreRepository.cleanup(
217+
repositoryStateId,
218+
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION),
219+
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))));
228220
}
229-
clusterService.submitStateUpdateTask(
230-
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
231-
new ClusterStateUpdateTask() {
232-
@Override
233-
public ClusterState execute(ClusterState currentState) {
234-
return removeInProgressCleanup(currentState);
235-
}
236221

237-
@Override
238-
public void onFailure(String source, Exception e) {
239-
if (failure != null) {
240-
e.addSuppressed(failure);
222+
private void after(@Nullable Exception failure, @Nullable RepositoryCleanupResult result) {
223+
if (failure == null) {
224+
logger.debug("Finished repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId);
225+
} else {
226+
logger.debug(() -> new ParameterizedMessage(
227+
"Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure);
228+
}
229+
assert failure != null || result != null;
230+
if (startedCleanup == false) {
231+
logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure);
232+
listener.onFailure(failure);
233+
return;
234+
}
235+
clusterService.submitStateUpdateTask(
236+
"remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']',
237+
new ClusterStateUpdateTask() {
238+
@Override
239+
public ClusterState execute(ClusterState currentState) {
240+
return removeInProgressCleanup(currentState);
241+
}
242+
243+
@Override
244+
public void onFailure(String source, Exception e) {
245+
if (failure != null) {
246+
e.addSuppressed(failure);
247+
}
248+
logger.warn(() ->
249+
new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
250+
listener.onFailure(e);
241251
}
242-
logger.warn(() ->
243-
new ParameterizedMessage("[{}] failed to remove repository cleanup task", repositoryName), e);
244-
listener.onFailure(e);
245-
}
246252

247-
@Override
248-
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
249-
if (failure == null) {
250-
logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
251-
repositoryName, repositoryStateId, result);
252-
listener.onResponse(result);
253-
} else {
254-
logger.warn(() -> new ParameterizedMessage("Failed to run repository cleanup operations on [{}][{}]",
255-
repositoryName, repositoryStateId), failure);
256-
listener.onFailure(failure);
253+
@Override
254+
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
255+
if (failure == null) {
256+
logger.info("Done with repository cleanup on [{}][{}] with result [{}]",
257+
repositoryName, repositoryStateId, result);
258+
listener.onResponse(result);
259+
} else {
260+
logger.warn(() -> new ParameterizedMessage(
261+
"Failed to run repository cleanup operations on [{}][{}]",
262+
repositoryName, repositoryStateId), failure);
263+
listener.onFailure(failure);
264+
}
257265
}
258-
}
259-
});
260-
}
261-
});
266+
});
267+
}
268+
});
269+
}, listener::onFailure);
262270
}
263271
}

server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -483,15 +483,21 @@ private void restore(IndexShard indexShard, Repository repository, SnapshotRecov
483483
translogState.totalOperations(0);
484484
translogState.totalOperationsOnStart(0);
485485
indexShard.prepareForIndexRecovery();
486-
ShardId snapshotShardId = shardId;
486+
final ShardId snapshotShardId;
487487
final String indexName = restoreSource.index();
488488
if (!shardId.getIndexName().equals(indexName)) {
489489
snapshotShardId = new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, shardId.id());
490+
} else {
491+
snapshotShardId = shardId;
490492
}
491-
final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
492-
assert indexShard.getEngineOrNull() == null;
493-
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
494-
indexShard.recoveryState(), restoreListener);
493+
repository.getRepositoryData(ActionListener.wrap(
494+
repositoryData -> {
495+
final IndexId indexId = repositoryData.resolveIndexId(indexName);
496+
assert indexShard.getEngineOrNull() == null;
497+
repository.restoreShard(indexShard.store(), restoreSource.snapshot().getSnapshotId(), indexId, snapshotShardId,
498+
indexShard.recoveryState(), restoreListener);
499+
}, restoreListener::onFailure
500+
));
495501
} catch (Exception e) {
496502
restoreListener.onFailure(e);
497503
}

server/src/main/java/org/elasticsearch/repositories/FilterRepository.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,8 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
6868
}
6969

7070
@Override
71-
public RepositoryData getRepositoryData() {
72-
return in.getRepositoryData();
71+
public void getRepositoryData(ActionListener<RepositoryData> listener) {
72+
in.getRepositoryData(listener);
7373
}
7474

7575
@Override

server/src/main/java/org/elasticsearch/repositories/Repository.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ default Repository create(RepositoryMetaData metaData, Function<String, Reposito
105105
* and the indices across all snapshots found in the repository. Throws a {@link RepositoryException}
106106
* if there was an error in reading the data.
107107
*/
108-
RepositoryData getRepositoryData();
108+
void getRepositoryData(ActionListener<RepositoryData> listener);
109109

110110
/**
111111
* Starts snapshotting process

0 commit comments

Comments
 (0)