28
28
import org .elasticsearch .action .index .IndexRequest ;
29
29
import org .elasticsearch .action .index .IndexResponse ;
30
30
import org .elasticsearch .action .support .ActionFilters ;
31
- import org .elasticsearch .action .support .master . TransportMasterNodeAction ;
31
+ import org .elasticsearch .action .support .HandledTransportAction ;
32
32
import org .elasticsearch .client .Client ;
33
33
import org .elasticsearch .client .OriginSettingClient ;
34
34
import org .elasticsearch .cluster .ClusterState ;
35
- import org .elasticsearch .cluster .block .ClusterBlockException ;
36
- import org .elasticsearch .cluster .block .ClusterBlockLevel ;
37
35
import org .elasticsearch .cluster .metadata .IndexMetaData ;
38
- import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
39
36
import org .elasticsearch .cluster .service .ClusterService ;
40
37
import org .elasticsearch .common .UUIDs ;
41
38
import org .elasticsearch .common .inject .Inject ;
42
- import org .elasticsearch .common .io .stream .StreamInput ;
43
39
import org .elasticsearch .common .settings .Settings ;
44
40
import org .elasticsearch .common .xcontent .ToXContent ;
45
41
import org .elasticsearch .common .xcontent .XContentBuilder ;
56
52
57
53
import static org .elasticsearch .index .reindex .ReindexTask .REINDEX_ORIGIN ;
58
54
59
- public class TransportStartReindexJobAction
60
- extends TransportMasterNodeAction <StartReindexJobAction .Request , StartReindexJobAction .Response > {
55
+ public class TransportStartReindexJobAction extends HandledTransportAction <StartReindexJobAction .Request , StartReindexJobAction .Response > {
61
56
57
+ private final ThreadPool threadPool ;
58
+ private final ClusterService clusterService ;
62
59
private final PersistentTasksService persistentTasksService ;
63
60
private final Client taskClient ;
64
61
65
62
@ Inject
66
63
public TransportStartReindexJobAction (TransportService transportService , ThreadPool threadPool ,
67
- ActionFilters actionFilters , IndexNameExpressionResolver indexNameExpressionResolver ,
68
- ClusterService clusterService , PersistentTasksService persistentTasksService , Client client ) {
69
- super (StartReindexJobAction .NAME , transportService , clusterService , threadPool , actionFilters , StartReindexJobAction .Request ::new ,
70
- indexNameExpressionResolver );
64
+ ActionFilters actionFilters , ClusterService clusterService ,
65
+ PersistentTasksService persistentTasksService , Client client ) {
66
+ super (StartReindexJobAction .NAME , transportService , actionFilters , StartReindexJobAction .Request ::new );
67
+ this .threadPool = threadPool ;
68
+ this .clusterService = clusterService ;
71
69
this .persistentTasksService = persistentTasksService ;
72
70
this .taskClient = new OriginSettingClient (client , REINDEX_ORIGIN );
73
71
}
74
72
75
73
@ Override
76
- protected String executor () {
77
- return ThreadPool .Names .SAME ;
78
- }
79
-
80
- @ Override
81
- protected StartReindexJobAction .Response read (StreamInput in ) throws IOException {
82
- return new StartReindexJobAction .Response (in );
83
- }
84
-
85
- @ Override
86
- protected void masterOperation (Task task , StartReindexJobAction .Request request , ClusterState state ,
87
- ActionListener <StartReindexJobAction .Response > listener ) {
74
+ protected void doExecute (Task task , StartReindexJobAction .Request request , ActionListener <StartReindexJobAction .Response > listener ) {
88
75
// TODO: If the connection is lost to the master, this action might be retried creating two tasks.
89
76
// Eventually prevent this (perhaps by pre-generating UUID).
90
77
String generatedId = UUIDs .randomBase64UUID ();
@@ -93,7 +80,8 @@ protected void masterOperation(Task task, StartReindexJobAction.Request request,
93
80
boolean storeTaskResult = request .getWaitForCompletion () == false ;
94
81
ReindexJob job = new ReindexJob (storeTaskResult , threadPool .getThreadContext ().getHeaders ());
95
82
96
- boolean reindexIndexExists = state .routingTable ().hasIndex (ReindexTask .REINDEX_INDEX );
83
+ ClusterState clusterState = clusterService .state ();
84
+ boolean reindexIndexExists = clusterState .routingTable ().hasIndex (ReindexTask .REINDEX_INDEX );
97
85
98
86
createReindexTaskDoc (generatedId , request .getReindexRequest (), reindexIndexExists , new ActionListener <>() {
99
87
@ Override
@@ -162,11 +150,6 @@ public void onFailure(Exception e) {
162
150
});
163
151
}
164
152
165
- @ Override
166
- protected ClusterBlockException checkBlock (StartReindexJobAction .Request request , ClusterState state ) {
167
- return state .blocks ().globalBlockedException (ClusterBlockLevel .METADATA_WRITE );
168
- }
169
-
170
153
private void createReindexTaskDoc (String taskId , ReindexRequest reindexRequest , boolean indexExists , ActionListener <Void > listener ) {
171
154
if (indexExists ) {
172
155
IndexRequest indexRequest = new IndexRequest (ReindexTask .REINDEX_INDEX ).id (taskId ).opType (DocWriteRequest .OpType .CREATE );
0 commit comments