21
21
import org .elasticsearch .action .ActionListener ;
22
22
import org .elasticsearch .action .admin .cluster .node .tasks .cancel .CancelTasksRequest ;
23
23
import org .elasticsearch .action .admin .cluster .node .tasks .cancel .CancelTasksResponse ;
24
+ import org .elasticsearch .client .Client ;
24
25
import org .elasticsearch .cluster .ClusterState ;
25
26
import org .elasticsearch .cluster .ClusterStateObserver ;
26
27
import org .elasticsearch .cluster .node .DiscoveryNode ;
34
35
import org .elasticsearch .tasks .TaskId ;
35
36
import org .elasticsearch .threadpool .ThreadPool ;
36
37
import org .elasticsearch .persistent .PersistentTasksCustomMetaData .PersistentTask ;
37
- import org .elasticsearch .security .InternalClient ;
38
38
39
39
import java .util .function .Predicate ;
40
40
41
+ import static org .elasticsearch .ClientHelper .PERSISTENT_TASK_ORIGIN ;
42
+ import static org .elasticsearch .ClientHelper .executeAsyncWithOrigin ;
43
+
41
44
/**
42
45
* This service is used by persistent actions to propagate changes in the action state and notify about completion
43
46
*/
44
47
public class PersistentTasksService extends AbstractComponent {
45
48
46
- private final InternalClient client ;
49
+ private final Client client ;
47
50
private final ClusterService clusterService ;
48
51
private final ThreadPool threadPool ;
49
52
50
- public PersistentTasksService (Settings settings , ClusterService clusterService , ThreadPool threadPool , InternalClient client ) {
53
+ public PersistentTasksService (Settings settings , ClusterService clusterService , ThreadPool threadPool , Client client ) {
51
54
super (settings );
52
55
this .client = client ;
53
56
this .clusterService = clusterService ;
@@ -63,8 +66,8 @@ public <Params extends PersistentTaskParams> void startPersistentTask(String tas
63
66
StartPersistentTaskAction .Request createPersistentActionRequest =
64
67
new StartPersistentTaskAction .Request (taskId , taskName , params );
65
68
try {
66
- client . execute ( StartPersistentTaskAction .INSTANCE , createPersistentActionRequest , ActionListener . wrap (
67
- o -> listener .onResponse ((PersistentTask <Params >) o .getTask ()), listener ::onFailure ));
69
+ executeAsyncWithOrigin ( client , PERSISTENT_TASK_ORIGIN , StartPersistentTaskAction .INSTANCE , createPersistentActionRequest ,
70
+ ActionListener . wrap ( o -> listener .onResponse ((PersistentTask <Params >) o .getTask ()), listener ::onFailure ));
68
71
} catch (Exception e ) {
69
72
listener .onFailure (e );
70
73
}
@@ -77,7 +80,7 @@ public void sendCompletionNotification(String taskId, long allocationId, Excepti
77
80
ActionListener <PersistentTask <?>> listener ) {
78
81
CompletionPersistentTaskAction .Request restartRequest = new CompletionPersistentTaskAction .Request (taskId , allocationId , failure );
79
82
try {
80
- client . execute ( CompletionPersistentTaskAction .INSTANCE , restartRequest ,
83
+ executeAsyncWithOrigin ( client , PERSISTENT_TASK_ORIGIN , CompletionPersistentTaskAction .INSTANCE , restartRequest ,
81
84
ActionListener .wrap (o -> listener .onResponse (o .getTask ()), listener ::onFailure ));
82
85
} catch (Exception e ) {
83
86
listener .onFailure (e );
@@ -93,7 +96,8 @@ void sendTaskManagerCancellation(long taskId, ActionListener<CancelTasksResponse
93
96
cancelTasksRequest .setTaskId (new TaskId (localNode .getId (), taskId ));
94
97
cancelTasksRequest .setReason ("persistent action was removed" );
95
98
try {
96
- client .admin ().cluster ().cancelTasks (cancelTasksRequest , listener );
99
+ executeAsyncWithOrigin (client .threadPool ().getThreadContext (), PERSISTENT_TASK_ORIGIN , cancelTasksRequest , listener ,
100
+ client .admin ().cluster ()::cancelTasks );
97
101
} catch (Exception e ) {
98
102
listener .onFailure (e );
99
103
}
@@ -109,8 +113,8 @@ void updateStatus(String taskId, long allocationId, Task.Status status, ActionLi
109
113
UpdatePersistentTaskStatusAction .Request updateStatusRequest =
110
114
new UpdatePersistentTaskStatusAction .Request (taskId , allocationId , status );
111
115
try {
112
- client . execute ( UpdatePersistentTaskStatusAction .INSTANCE , updateStatusRequest , ActionListener . wrap (
113
- o -> listener .onResponse (o .getTask ()), listener ::onFailure ));
116
+ executeAsyncWithOrigin ( client , PERSISTENT_TASK_ORIGIN , UpdatePersistentTaskStatusAction .INSTANCE , updateStatusRequest ,
117
+ ActionListener . wrap ( o -> listener .onResponse (o .getTask ()), listener ::onFailure ));
114
118
} catch (Exception e ) {
115
119
listener .onFailure (e );
116
120
}
@@ -122,8 +126,8 @@ void updateStatus(String taskId, long allocationId, Task.Status status, ActionLi
122
126
public void cancelPersistentTask (String taskId , ActionListener <PersistentTask <?>> listener ) {
123
127
RemovePersistentTaskAction .Request removeRequest = new RemovePersistentTaskAction .Request (taskId );
124
128
try {
125
- client . execute ( RemovePersistentTaskAction . INSTANCE , removeRequest , ActionListener . wrap ( o -> listener . onResponse ( o . getTask ()) ,
126
- listener ::onFailure ));
129
+ executeAsyncWithOrigin ( client , PERSISTENT_TASK_ORIGIN , RemovePersistentTaskAction . INSTANCE , removeRequest ,
130
+ ActionListener . wrap ( o -> listener . onResponse ( o . getTask ()), listener ::onFailure ));
127
131
} catch (Exception e ) {
128
132
listener .onFailure (e );
129
133
}
0 commit comments