From 89a61567d4d20deaa2d2def2e301efadf4dcaf3a Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 19 Nov 2019 16:49:41 +0100 Subject: [PATCH 1/4] Make SnapshotsService#getRepositoryData Async Follow up to #49299 removing the blocking step for the snapshot status APIs as well. --- .../get/TransportGetSnapshotsAction.java | 45 ++++++++++++------- .../TransportSnapshotsStatusAction.java | 42 ++++++++++------- .../snapshots/SnapshotsService.java | 15 ++++--- 3 files changed, 65 insertions(+), 37 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index 5047871383234..eaf64863f60bc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesAction; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest; import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse; @@ -36,6 +37,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.regex.Regex; @@ -123,18 +125,21 @@ private void getMultipleReposSnapshotInfo(List repos, String // run concurrently for all repos on GENERIC thread pool for (final RepositoryMetaData repo : repos) { - threadPool.executor(ThreadPool.Names.GENERIC).execute(ActionRunnable.supply(groupedActionListener, () -> { - try { - return GetSnapshotsResponse.Response.snapshots( - repo.name(), getSingleRepoSnapshotInfo(repo.name(), snapshots, ignoreUnavailable, verbose)); - } catch (ElasticsearchException e) { - return GetSnapshotsResponse.Response.error(repo.name(), e); - } - })); + final String repoName = repo.name(); + threadPool.generic().execute(ActionRunnable.wrap( + ActionListener.delegateResponse(groupedActionListener, (groupedListener, e) -> { + if (e instanceof ElasticsearchException) { + groupedListener.onResponse(GetSnapshotsResponse.Response.error(repoName, (ElasticsearchException) e)); + } else { + groupedListener.onFailure(e); + } + }), wrappedListener -> getSingleRepoSnapshotInfo(repoName, snapshots, ignoreUnavailable, verbose, + ActionListener.map(wrappedListener, snInfos -> GetSnapshotsResponse.Response.snapshots(repoName, snInfos))))); } } - private List getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose) { + private void getSingleRepoSnapshotInfo(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose, + ActionListener> listener) { final Map allSnapshotIds = new HashMap<>(); final List currentSnapshots = new ArrayList<>(); for (SnapshotInfo snapshotInfo : snapshotsService.currentSnapshots(repo)) { @@ -143,14 +148,25 @@ private List getSingleRepoSnapshotInfo(String repo, String[] snaps currentSnapshots.add(snapshotInfo); } - final RepositoryData repositoryData; - if (isCurrentSnapshotsOnly(snapshots) == false) { - repositoryData = snapshotsService.getRepositoryData(repo); + final StepListener repositoryDataListener = new StepListener<>(); + if (isCurrentSnapshotsOnly(snapshots)) { + repositoryDataListener.onResponse(null); + } else { + snapshotsService.getRepositoryData(repo, repositoryDataListener); + } + + repositoryDataListener.whenComplete(repositoryData -> listener.onResponse( + loadSnapshotInfos(repo, snapshots, ignoreUnavailable, verbose, allSnapshotIds, currentSnapshots, repositoryData)), + listener::onFailure); + } + + private List loadSnapshotInfos(String repo, String[] snapshots, boolean ignoreUnavailable, boolean verbose, + Map allSnapshotIds, List currentSnapshots, + @Nullable RepositoryData repositoryData) { + if (repositoryData != null) { for (SnapshotId snapshotId : repositoryData.getSnapshotIds()) { allSnapshotIds.put(snapshotId.getName(), snapshotId); } - } else { - repositoryData = null; } final Set toResolve = new HashSet<>(); @@ -193,7 +209,6 @@ private List getSingleRepoSnapshotInfo(String repo, String[] snaps CollectionUtil.timSort(snapshotInfos); } } - return snapshotInfos; } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index 4e5ade741c21e..ddb621f55d833 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.client.node.NodeClient; @@ -102,7 +103,7 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request, List currentSnapshots = snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())); if (currentSnapshots.isEmpty()) { - listener.onResponse(buildResponse(request, currentSnapshots, null)); + buildResponse(request, currentSnapshots, null, listener); return; } @@ -124,20 +125,21 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request, client.executeLocally(TransportNodesSnapshotsStatus.TYPE, new TransportNodesSnapshotsStatus.Request(nodesIds.toArray(Strings.EMPTY_ARRAY)) .snapshots(snapshots).timeout(request.masterNodeTimeout()), - ActionListener.wrap( - nodeSnapshotStatuses -> threadPool.executor(ThreadPool.Names.GENERIC).execute( - ActionRunnable.supply(listener, () -> buildResponse(request, snapshotsService.currentSnapshots( - request.repository(), Arrays.asList(request.snapshots())), nodeSnapshotStatuses))), listener::onFailure)); + ActionListener.wrap(nodeSnapshotStatuses -> threadPool.generic().execute( + ActionRunnable.wrap(listener, + l -> buildResponse( + request, snapshotsService.currentSnapshots(request.repository(), Arrays.asList(request.snapshots())), + nodeSnapshotStatuses, l)) + ), listener::onFailure)); } else { // We don't have any in-progress shards, just return current stats - listener.onResponse(buildResponse(request, currentSnapshots, null)); + buildResponse(request, currentSnapshots, null, listener); } - } - private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, List currentSnapshotEntries, - TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses) - throws IOException { + private void buildResponse(SnapshotsStatusRequest request, List currentSnapshotEntries, + TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses, + ActionListener listener) { // First process snapshot that are currently processed List builder = new ArrayList<>(); Set currentSnapshotNames = new HashSet<>(); @@ -197,8 +199,18 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li // Now add snapshots on disk that are not currently running final String repositoryName = request.repository(); if (Strings.hasText(repositoryName) && request.snapshots() != null && request.snapshots().length > 0) { - final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); - final RepositoryData repositoryData = snapshotsService.getRepositoryData(repositoryName); + loadRepositoryData(request, builder, currentSnapshotNames, repositoryName, listener); + } else { + listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder))); + } + } + + private void loadRepositoryData(SnapshotsStatusRequest request, List builder, Set currentSnapshotNames, + String repositoryName, ActionListener listener) { + final Set requestedSnapshotNames = Sets.newHashSet(request.snapshots()); + final StepListener repositoryDataListener = new StepListener<>(); + snapshotsService.getRepositoryData(repositoryName, repositoryDataListener); + repositoryDataListener.whenComplete(repositoryData -> { final Map matchedSnapshotIds = repositoryData.getSnapshotIds().stream() .filter(s -> requestedSnapshotNames.contains(s.getName())) .collect(Collectors.toMap(SnapshotId::getName, Function.identity())); @@ -253,9 +265,7 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li (endTime == 0 ? threadPool.absoluteTimeInMillis() : endTime) - startTime)); } } - } - - return new SnapshotsStatusResponse(Collections.unmodifiableList(builder)); + listener.onResponse(new SnapshotsStatusResponse(Collections.unmodifiableList(builder))); + }, listener::onFailure); } - } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 9798b1e045f1e..65c4eaeb2609d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -31,7 +31,6 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; @@ -157,12 +156,16 @@ public SnapshotsService(Settings settings, ClusterService clusterService, IndexN * Gets the {@link RepositoryData} for the given repository. * * @param repositoryName repository name - * @return repository data + * @param listener listener to pass {@link RepositoryData} to */ - public RepositoryData getRepositoryData(final String repositoryName) { - Repository repository = repositoriesService.repository(repositoryName); - assert repository != null; // should only be called once we've validated the repository exists - return PlainActionFuture.get(repository::getRepositoryData); + public void getRepositoryData(final String repositoryName, final ActionListener listener) { + try { + Repository repository = repositoriesService.repository(repositoryName); + assert repository != null; // should only be called once we've validated the repository exists + repository.getRepositoryData(listener); + } catch (Exception e) { + listener.onFailure(e); + } } /** From 8fe160f61e376ee8bb6ba5ee7d995ac1b2886d10 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 19 Nov 2019 16:52:46 +0100 Subject: [PATCH 2/4] smaller diff --- .../admin/cluster/snapshots/get/TransportGetSnapshotsAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java index eaf64863f60bc..a488ed413897e 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/get/TransportGetSnapshotsAction.java @@ -209,6 +209,7 @@ private List loadSnapshotInfos(String repo, String[] snapshots, bo CollectionUtil.timSort(snapshotInfos); } } + return snapshotInfos; } From a5675bc06d60872bfce7a6bfdd4642b5915c70fb Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 19 Nov 2019 16:53:05 +0100 Subject: [PATCH 3/4] smaller diff --- .../cluster/snapshots/status/TransportSnapshotsStatusAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index ddb621f55d833..d124d4acb86ec 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -268,4 +268,5 @@ private void loadRepositoryData(SnapshotsStatusRequest request, List Date: Tue, 19 Nov 2019 16:54:04 +0100 Subject: [PATCH 4/4] smaller diff --- .../cluster/snapshots/status/TransportSnapshotsStatusAction.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index d124d4acb86ec..8e7c85760749f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -135,6 +135,7 @@ protected void masterOperation(Task task, final SnapshotsStatusRequest request, // We don't have any in-progress shards, just return current stats buildResponse(request, currentSnapshots, null, listener); } + } private void buildResponse(SnapshotsStatusRequest request, List currentSnapshotEntries,