23
23
import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
24
24
import org .elasticsearch .action .ActionListener ;
25
25
import org .elasticsearch .action .ActionRunnable ;
26
+ import org .elasticsearch .action .StepListener ;
26
27
import org .elasticsearch .action .support .ActionFilters ;
27
28
import org .elasticsearch .action .support .master .TransportMasterNodeAction ;
28
29
import org .elasticsearch .cluster .ClusterState ;
@@ -97,7 +98,7 @@ protected void masterOperation(final SnapshotsStatusRequest request,
97
98
List <SnapshotsInProgress .Entry > currentSnapshots =
98
99
snapshotsService .currentSnapshots (request .repository (), Arrays .asList (request .snapshots ()));
99
100
if (currentSnapshots .isEmpty ()) {
100
- listener . onResponse ( buildResponse (request , currentSnapshots , null ) );
101
+ buildResponse (request , currentSnapshots , null , listener );
101
102
return ;
102
103
}
103
104
@@ -119,20 +120,22 @@ protected void masterOperation(final SnapshotsStatusRequest request,
119
120
transportNodesSnapshotsStatus .execute (
120
121
new TransportNodesSnapshotsStatus .Request (nodesIds .toArray (Strings .EMPTY_ARRAY ))
121
122
.snapshots (snapshots ).timeout (request .masterNodeTimeout ()),
122
- ActionListener .wrap (
123
- nodeSnapshotStatuses -> threadPool .executor (ThreadPool .Names .GENERIC ).execute (
124
- ActionRunnable .supply (listener , () -> buildResponse (request , snapshotsService .currentSnapshots (
125
- request .repository (), Arrays .asList (request .snapshots ())), nodeSnapshotStatuses ))), listener ::onFailure ));
123
+ ActionListener .wrap (nodeSnapshotStatuses -> threadPool .generic ().execute (
124
+ ActionRunnable .wrap (listener ,
125
+ l -> buildResponse (
126
+ request , snapshotsService .currentSnapshots (request .repository (), Arrays .asList (request .snapshots ())),
127
+ nodeSnapshotStatuses , l ))
128
+ ), listener ::onFailure ));
126
129
} else {
127
130
// We don't have any in-progress shards, just return current stats
128
- listener . onResponse ( buildResponse (request , currentSnapshots , null ) );
131
+ buildResponse (request , currentSnapshots , null , listener );
129
132
}
130
133
131
134
}
132
135
133
- private SnapshotsStatusResponse buildResponse (SnapshotsStatusRequest request , List <SnapshotsInProgress .Entry > currentSnapshotEntries ,
134
- TransportNodesSnapshotsStatus .NodesSnapshotStatus nodeSnapshotStatuses )
135
- throws IOException {
136
+ private void buildResponse (SnapshotsStatusRequest request , List <SnapshotsInProgress .Entry > currentSnapshotEntries ,
137
+ TransportNodesSnapshotsStatus .NodesSnapshotStatus nodeSnapshotStatuses ,
138
+ ActionListener < SnapshotsStatusResponse > listener ) {
136
139
// First process snapshot that are currently processed
137
140
List <SnapshotStatus > builder = new ArrayList <>();
138
141
Set <String > currentSnapshotNames = new HashSet <>();
@@ -192,8 +195,18 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
192
195
// Now add snapshots on disk that are not currently running
193
196
final String repositoryName = request .repository ();
194
197
if (Strings .hasText (repositoryName ) && request .snapshots () != null && request .snapshots ().length > 0 ) {
195
- final Set <String > requestedSnapshotNames = Sets .newHashSet (request .snapshots ());
196
- final RepositoryData repositoryData = snapshotsService .getRepositoryData (repositoryName );
198
+ loadRepositoryData (request , builder , currentSnapshotNames , repositoryName , listener );
199
+ } else {
200
+ listener .onResponse (new SnapshotsStatusResponse (Collections .unmodifiableList (builder )));
201
+ }
202
+ }
203
+
204
+ private void loadRepositoryData (SnapshotsStatusRequest request , List <SnapshotStatus > builder , Set <String > currentSnapshotNames ,
205
+ String repositoryName , ActionListener <SnapshotsStatusResponse > listener ) {
206
+ final Set <String > requestedSnapshotNames = Sets .newHashSet (request .snapshots ());
207
+ final StepListener <RepositoryData > repositoryDataListener = new StepListener <>();
208
+ snapshotsService .getRepositoryData (repositoryName , repositoryDataListener );
209
+ repositoryDataListener .whenComplete (repositoryData -> {
197
210
final Map <String , SnapshotId > matchedSnapshotIds = repositoryData .getSnapshotIds ().stream ()
198
211
.filter (s -> requestedSnapshotNames .contains (s .getName ()))
199
212
.collect (Collectors .toMap (SnapshotId ::getName , Function .identity ()));
@@ -248,9 +261,8 @@ private SnapshotsStatusResponse buildResponse(SnapshotsStatusRequest request, Li
248
261
(endTime == 0 ? threadPool .absoluteTimeInMillis () : endTime ) - startTime ));
249
262
}
250
263
}
251
- }
252
-
253
- return new SnapshotsStatusResponse (Collections .unmodifiableList (builder ));
264
+ listener .onResponse (new SnapshotsStatusResponse (Collections .unmodifiableList (builder )));
265
+ }, listener ::onFailure );
254
266
}
255
267
256
268
}
0 commit comments