File tree Expand file tree Collapse file tree 1 file changed +26
-0
lines changed
server/src/main/java/org/elasticsearch/persistent Expand file tree Collapse file tree 1 file changed +26
-0
lines changed Original file line number Diff line number Diff line change @@ -156,6 +156,32 @@ public void onTimeout(TimeValue timeout) {
156
156
}
157
157
}
158
158
159
+ public void waitForPersistentTasksStatus (Predicate <PersistentTasksCustomMetaData > predicate ,
160
+ @ Nullable TimeValue timeout , ActionListener <Boolean > listener ) {
161
+ ClusterStateObserver stateObserver = new ClusterStateObserver (clusterService , timeout ,
162
+ logger , threadPool .getThreadContext ());
163
+ if (predicate .test (stateObserver .setAndGetObservedState ().metaData ().custom (PersistentTasksCustomMetaData .TYPE ))) {
164
+ listener .onResponse (true );
165
+ } else {
166
+ stateObserver .waitForNextChange (new ClusterStateObserver .Listener () {
167
+ @ Override
168
+ public void onNewClusterState (ClusterState state ) {
169
+ listener .onResponse (true );
170
+ }
171
+
172
+ @ Override
173
+ public void onClusterServiceClose () {
174
+ listener .onFailure (new NodeClosedException (clusterService .localNode ()));
175
+ }
176
+
177
+ @ Override
178
+ public void onTimeout (TimeValue timeout ) {
179
+ listener .onFailure (new IllegalStateException ("timed out after " + timeout ));
180
+ }
181
+ }, clusterState -> predicate .test (clusterState .metaData ().custom (PersistentTasksCustomMetaData .TYPE )));
182
+ }
183
+ }
184
+
159
185
public interface WaitForPersistentTaskStatusListener <Request extends PersistentTaskRequest >
160
186
extends ActionListener <PersistentTask <Request >> {
161
187
default void onTimeout (TimeValue timeout ) {
You can’t perform that action at this time.
0 commit comments