11
11
import org .apache .logging .log4j .Logger ;
12
12
import org .apache .logging .log4j .message .ParameterizedMessage ;
13
13
import org .elasticsearch .ElasticsearchException ;
14
- import org .elasticsearch .ExceptionsHelper ;
15
- import org .elasticsearch .ResourceAlreadyExistsException ;
14
+ import org .elasticsearch .Version ;
16
15
import org .elasticsearch .action .ActionListener ;
17
- import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
18
- import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
19
16
import org .elasticsearch .action .bulk .BackoffPolicy ;
20
17
import org .elasticsearch .action .index .IndexRequestBuilder ;
21
18
import org .elasticsearch .action .index .IndexResponse ;
22
19
import org .elasticsearch .client .Client ;
23
20
import org .elasticsearch .client .OriginSettingClient ;
24
21
import org .elasticsearch .client .Requests ;
25
- import org .elasticsearch .cluster .ClusterState ;
26
22
import org .elasticsearch .cluster .metadata .IndexMetadata ;
27
- import org .elasticsearch .cluster .metadata .MappingMetadata ;
28
- import org .elasticsearch .cluster .service .ClusterService ;
29
23
import org .elasticsearch .common .inject .Inject ;
30
24
import org .elasticsearch .common .settings .Settings ;
31
25
import org .elasticsearch .common .unit .TimeValue ;
32
26
import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
33
27
import org .elasticsearch .common .xcontent .ToXContent ;
34
28
import org .elasticsearch .common .xcontent .XContentBuilder ;
35
29
import org .elasticsearch .common .xcontent .XContentFactory ;
36
- import org .elasticsearch .common .xcontent .XContentType ;
37
- import org .elasticsearch .core .internal .io .Streams ;
30
+ import org .elasticsearch .indices .SystemIndexDescriptor ;
38
31
import org .elasticsearch .threadpool .ThreadPool ;
39
32
40
- import java .io .ByteArrayOutputStream ;
41
33
import java .io .IOException ;
42
- import java .io .InputStream ;
43
- import java .nio .charset .StandardCharsets ;
34
+ import java .io .UncheckedIOException ;
44
35
import java .util .Iterator ;
45
- import java .util .Map ;
46
36
47
37
import static org .elasticsearch .action .admin .cluster .node .tasks .get .GetTaskAction .TASKS_ORIGIN ;
48
38
import static org .elasticsearch .common .unit .TimeValue .timeValueMillis ;
39
+ import static org .elasticsearch .common .xcontent .XContentFactory .jsonBuilder ;
49
40
50
41
/**
51
42
* Service that can store task results.
@@ -55,14 +46,19 @@ public class TaskResultsService {
55
46
private static final Logger logger = LogManager .getLogger (TaskResultsService .class );
56
47
57
48
public static final String TASK_INDEX = ".tasks" ;
58
-
59
49
public static final String TASK_TYPE = "task" ;
60
-
61
- public static final String TASK_RESULT_INDEX_MAPPING_FILE = "task-index-mapping.json" ;
62
-
63
50
public static final String TASK_RESULT_MAPPING_VERSION_META_FIELD = "version" ;
64
51
65
- public static final int TASK_RESULT_MAPPING_VERSION = 3 ;
52
+ public static final SystemIndexDescriptor TASKS_DESCRIPTOR = SystemIndexDescriptor .builder ()
53
+ .setIndexPattern (TASK_INDEX + "*" )
54
+ .setPrimaryIndex (TASK_INDEX )
55
+ .setDescription ("Task Result Index" )
56
+ .setSettings (getTaskResultIndexSettings ())
57
+ .setMappings (getTaskResultIndexMappings ())
58
+ .setVersionMetaKey (TASK_RESULT_MAPPING_VERSION_META_FIELD )
59
+ .setOrigin (TASKS_ORIGIN )
60
+ .setIndexType (TASK_TYPE )
61
+ .build ();
66
62
67
63
/**
68
64
* The backoff policy to use when saving a task result fails. The total wait
@@ -73,76 +69,16 @@ public class TaskResultsService {
73
69
74
70
private final Client client ;
75
71
76
- private final ClusterService clusterService ;
77
-
78
72
private final ThreadPool threadPool ;
79
73
80
74
@ Inject
81
- public TaskResultsService (Client client , ClusterService clusterService , ThreadPool threadPool ) {
75
+ public TaskResultsService (Client client , ThreadPool threadPool ) {
82
76
this .client = new OriginSettingClient (client , TASKS_ORIGIN );
83
- this .clusterService = clusterService ;
84
77
this .threadPool = threadPool ;
85
78
}
86
79
87
80
public void storeResult (TaskResult taskResult , ActionListener <Void > listener ) {
88
-
89
- ClusterState state = clusterService .state ();
90
-
91
- if (state .routingTable ().hasIndex (TASK_INDEX ) == false ) {
92
- CreateIndexRequest createIndexRequest = new CreateIndexRequest ();
93
- createIndexRequest .settings (taskResultIndexSettings ());
94
- createIndexRequest .index (TASK_INDEX );
95
- createIndexRequest .mapping (TASK_TYPE , taskResultIndexMapping (), XContentType .JSON );
96
- createIndexRequest .cause ("auto(task api)" );
97
-
98
- client .admin ().indices ().create (createIndexRequest , new ActionListener <CreateIndexResponse >() {
99
- @ Override
100
- public void onResponse (CreateIndexResponse result ) {
101
- doStoreResult (taskResult , listener );
102
- }
103
-
104
- @ Override
105
- public void onFailure (Exception e ) {
106
- if (ExceptionsHelper .unwrapCause (e ) instanceof ResourceAlreadyExistsException ) {
107
- // we have the index, do it
108
- try {
109
- doStoreResult (taskResult , listener );
110
- } catch (Exception inner ) {
111
- inner .addSuppressed (e );
112
- listener .onFailure (inner );
113
- }
114
- } else {
115
- listener .onFailure (e );
116
- }
117
- }
118
- });
119
- } else {
120
- IndexMetadata metadata = state .getMetadata ().index (TASK_INDEX );
121
- if (getTaskResultMappingVersion (metadata ) < TASK_RESULT_MAPPING_VERSION ) {
122
- // The index already exists but doesn't have our mapping
123
- client .admin ().indices ().preparePutMapping (TASK_INDEX ).setType (TASK_TYPE )
124
- .setSource (taskResultIndexMapping (), XContentType .JSON )
125
- .execute (ActionListener .delegateFailure (listener , (l , r ) -> doStoreResult (taskResult , listener )));
126
- } else {
127
- doStoreResult (taskResult , listener );
128
- }
129
- }
130
- }
131
-
132
- private int getTaskResultMappingVersion (IndexMetadata metadata ) {
133
- MappingMetadata mappingMetadata = metadata .getMappings ().get (TASK_TYPE );
134
- if (mappingMetadata == null ) {
135
- return 0 ;
136
- }
137
- @ SuppressWarnings ("unchecked" ) Map <String , Object > meta = (Map <String , Object >) mappingMetadata .sourceAsMap ().get ("_meta" );
138
- if (meta == null || meta .containsKey (TASK_RESULT_MAPPING_VERSION_META_FIELD ) == false ) {
139
- return 1 ; // The mapping was created before meta field was introduced
140
- }
141
- return (int ) meta .get (TASK_RESULT_MAPPING_VERSION_META_FIELD );
142
- }
143
-
144
- private void doStoreResult (TaskResult taskResult , ActionListener <Void > listener ) {
145
- IndexRequestBuilder index = client .prepareIndex (TASK_INDEX , TASK_TYPE , taskResult .getTask ().getTaskId ().toString ());
81
+ IndexRequestBuilder index = client .prepareIndex (TASK_INDEX , TASK_TYPE ).setId (taskResult .getTask ().getTaskId ().toString ());
146
82
try (XContentBuilder builder = XContentFactory .contentBuilder (Requests .INDEX_CONTENT_TYPE )) {
147
83
taskResult .toXContent (builder , ToXContent .EMPTY_PARAMS );
148
84
index .setSource (builder );
@@ -173,24 +109,106 @@ public void onFailure(Exception e) {
173
109
});
174
110
}
175
111
176
- private Settings taskResultIndexSettings () {
112
+ private static Settings getTaskResultIndexSettings () {
177
113
return Settings .builder ()
178
114
.put (IndexMetadata .INDEX_NUMBER_OF_SHARDS_SETTING .getKey (), 1 )
179
115
.put (IndexMetadata .INDEX_AUTO_EXPAND_REPLICAS_SETTING .getKey (), "0-1" )
180
116
.put (IndexMetadata .SETTING_PRIORITY , Integer .MAX_VALUE )
181
117
.build ();
182
118
}
183
119
184
- public String taskResultIndexMapping () {
185
- try (InputStream is = getClass ().getResourceAsStream (TASK_RESULT_INDEX_MAPPING_FILE )) {
186
- ByteArrayOutputStream out = new ByteArrayOutputStream ();
187
- Streams .copy (is , out );
188
- return out .toString (StandardCharsets .UTF_8 .name ());
189
- } catch (Exception e ) {
190
- logger .error (() -> new ParameterizedMessage (
191
- "failed to create tasks results index template [{}]" , TASK_RESULT_INDEX_MAPPING_FILE ), e );
192
- throw new IllegalStateException ("failed to create tasks results index template [" + TASK_RESULT_INDEX_MAPPING_FILE + "]" , e );
193
- }
120
+ private static XContentBuilder getTaskResultIndexMappings () {
121
+ try {
122
+ final XContentBuilder builder = jsonBuilder ();
123
+
124
+ builder .startObject ();
125
+ {
126
+ builder .startObject (TASK_TYPE );
127
+ {
128
+ builder .startObject ("_meta" );
129
+ builder .field (TASK_RESULT_MAPPING_VERSION_META_FIELD , Version .CURRENT .toString ());
130
+ builder .endObject ();
131
+
132
+ builder .field ("dynamic" , "strict" );
133
+ builder .startObject ("properties" );
134
+ {
135
+ builder .startObject ("completed" );
136
+ builder .field ("type" , "boolean" );
137
+ builder .endObject ();
138
+
139
+ builder .startObject ("task" );
140
+ {
141
+ builder .startObject ("properties" );
142
+ {
143
+ builder .startObject ("action" );
144
+ builder .field ("type" , "keyword" );
145
+ builder .endObject ();
146
+
147
+ builder .startObject ("cancellable" );
148
+ builder .field ("type" , "boolean" );
149
+ builder .endObject ();
150
+
151
+ builder .startObject ("id" );
152
+ builder .field ("type" , "long" );
153
+ builder .endObject ();
154
+
155
+ builder .startObject ("parent_task_id" );
156
+ builder .field ("type" , "keyword" );
157
+ builder .endObject ();
158
+
159
+ builder .startObject ("node" );
160
+ builder .field ("type" , "keyword" );
161
+ builder .endObject ();
162
+
163
+ builder .startObject ("running_time_in_nanos" );
164
+ builder .field ("type" , "long" );
165
+ builder .endObject ();
166
+
167
+ builder .startObject ("start_time_in_millis" );
168
+ builder .field ("type" , "long" );
169
+ builder .endObject ();
170
+
171
+ builder .startObject ("type" );
172
+ builder .field ("type" , "keyword" );
173
+ builder .endObject ();
174
+
175
+ builder .startObject ("status" );
176
+ builder .field ("type" , "object" );
177
+ builder .field ("enabled" , false );
178
+ builder .endObject ();
179
+
180
+ builder .startObject ("description" );
181
+ builder .field ("type" , "text" );
182
+ builder .endObject ();
183
+
184
+ builder .startObject ("headers" );
185
+ builder .field ("type" , "object" );
186
+ builder .field ("enabled" , false );
187
+ builder .endObject ();
188
+ }
189
+ builder .endObject ();
190
+ }
191
+ builder .endObject ();
194
192
193
+ builder .startObject ("response" );
194
+ builder .field ("type" , "object" );
195
+ builder .field ("enabled" , false );
196
+ builder .endObject ();
197
+
198
+ builder .startObject ("error" );
199
+ builder .field ("type" , "object" );
200
+ builder .field ("enabled" , false );
201
+ builder .endObject ();
202
+ }
203
+ builder .endObject ();
204
+ }
205
+ builder .endObject ();
206
+ }
207
+
208
+ builder .endObject ();
209
+ return builder ;
210
+ } catch (IOException e ) {
211
+ throw new UncheckedIOException ("Failed to build " + TASK_INDEX + " index mappings" , e );
212
+ }
195
213
}
196
214
}
0 commit comments